package co.vsco.vsn.grpc;

import android.content.Context;
import co.vsco.vsn.grpc.TelegraphGrpc;
import com.google.a.a.a.a.a.a;
import com.vsco.c.C;
import com.vsco.proto.telegraph.l;
import io.grpc.stub.StreamObserver;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.subjects.PublishSubject;

/* loaded from: classes.dex */
/**
 * Process-wide singleton that opens a Telegraph gRPC message stream and republishes
 * each received message on an in-process Rx bus.
 *
 * <p>Consumers subscribe through {@link #register(Action1)}; their callbacks are
 * observed on the Android main thread. The stream is started with
 * {@link #streamMessagesAsync()} and torn down with {@link #cancelMessageStream()}.
 */
public class MessageStreamManager {
    private static final String TAG = MessageStreamManager.class.getSimpleName();

    /** Lazily created singleton; only read and written inside {@link #getInstance}. */
    private static MessageStreamManager instance = null;

    /** Bus onto which every value passed to {@link #send(String)} is pushed. */
    private final PublishSubject<String> messageBus = PublishSubject.create();
    private final TelegraphGrpc telegraphGrpc;

    /**
     * @param str     argument forwarded unchanged to the {@link TelegraphGrpc} constructor
     * @param context currently unused; kept so the creation path in {@link #getInstance} is unchanged
     */
    private MessageStreamManager(String str, Context context) {
        this.telegraphGrpc = new TelegraphGrpc(str);
    }

    /**
     * Returns the shared manager, creating it on the first call.
     *
     * <p>The method is synchronized so that concurrent first calls cannot each build
     * their own instance (and their own {@link TelegraphGrpc}), which would leave
     * subscribers and the stream attached to different message buses.
     *
     * <p>The arguments are only used by the call that actually creates the instance;
     * later calls ignore them and return the existing instance.
     *
     * @param str     argument forwarded to the {@link TelegraphGrpc} constructor on first use
     * @param context any context; its application context is taken on first use
     * @return the single shared {@code MessageStreamManager}
     */
    public static synchronized MessageStreamManager getInstance(String str, Context context) {
        if (instance == null) {
            instance = new MessageStreamManager(str, context.getApplicationContext());
        }
        return instance;
    }

    /**
     * Publishes {@code str} to the message bus.
     *
     * <p>Decompiler note: this method was originally private; it is left public here
     * so the visible interface of this file does not change.
     *
     * @param str value to emit to the bus
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void send(String str) {
        this.messageBus.onNext(str);
    }

    /**
     * Cancels the message stream on the underlying {@link TelegraphGrpc} and then
     * calls its {@code unsubscribe()}.
     */
    public void cancelMessageStream() {
        this.telegraphGrpc.cancelMessageStream();
        this.telegraphGrpc.unsubscribe();
    }

    /**
     * Subscribes {@code action1} to the message bus, observed on the Android main thread.
     *
     * <p>The type parameter {@code T} is not used by the method; it is retained only
     * to keep the declared signature unchanged.
     *
     * @param action1 callback invoked with each value passed to {@link #send(String)}
     * @return the subscription, which the caller can use to unsubscribe
     */
    public <T> Subscription register(Action1<String> action1) {
        return this.messageBus.observeOn(AndroidSchedulers.mainThread()).subscribe(action1);
    }

    /**
     * Starts the asynchronous message stream and forwards received messages to the bus.
     *
     * <p>On stream completion or error the stream is cancelled through
     * {@link #cancelMessageStream()}.
     *
     * @throws TelegraphGrpc.TelegraphException declared by this method; presumably raised by
     *         {@code fetchMessagesStreamingAsync} when the stream cannot be started (confirm
     *         against {@link TelegraphGrpc})
     */
    public synchronized void streamMessagesAsync() throws TelegraphGrpc.TelegraphException {
        this.telegraphGrpc.fetchMessagesStreamingAsync(new StreamObserver<l>() { // from class: co.vsco.vsn.grpc.MessageStreamManager.1
            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                C.i(MessageStreamManager.TAG, "Message stream completed by server.");
                MessageStreamManager.this.cancelMessageStream();
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                C.exe(MessageStreamManager.TAG, th.getMessage(), th);
                // NOTE(review): `a.a` is an obfuscated helper; presumably it prints the
                // stack trace of `th` -- confirm against the unobfuscated dependency.
                a.a(th);
                MessageStreamManager.this.cancelMessageStream();
            }

            @Override // io.grpc.stub.StreamObserver
            public void onNext(l lVar) {
                C.i(MessageStreamManager.TAG, "Message received in stream.");
                // Only the first element of the response is forwarded; responses for
                // which j() reports no elements are dropped.
                if (lVar.j() > 0) {
                    MessageStreamManager.this.send(lVar.a(0).k().e);
                }
            }
        });
    }
}
