package io.wondrous.sns.api.tmg.realtime;

import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.support.annotation.RestrictTo;
import android.support.annotation.VisibleForTesting;
import com.google.gson.Gson;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Action;
import io.reactivex.functions.Cancellable;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.wondrous.sns.api.tmg.TmgApiConfig;
import io.wondrous.sns.api.tmg.realtime.internal.CompositeWebsocketListener;
import io.wondrous.sns.api.tmg.realtime.internal.RealtimeSocketListener;
import io.wondrous.sns.api.tmg.realtime.internal.SocketConnectingListener;
import io.wondrous.sns.api.tmg.realtime.internal.SocketEnvelopeMessage;
import io.wondrous.sns.api.tmg.realtime.internal.SocketTopicMessage;
import io.wondrous.sns.oauth.OAuthInterceptor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import org.reactivestreams.Publisher;

@Singleton
/* loaded from: classes4.dex */
public class TmgRealtimeApi {
    private static final int CLOSE_CODE_NORMAL = 1000;
    private static final String TAG = "TmgRealtimeApi";
    private final Gson mGson;

    @Nullable
    private final OAuthInterceptor mOAuthInterceptor;
    private final OkHttpClient mOkHttpClient;
    private final Observable<WebSocket> mSocketTask;
    final Map<String, Flowable<TopicEvent>> mTopicPublishers = new ConcurrentHashMap();

    @VisibleForTesting
    final CompositeWebsocketListener mWebsocketListener = new CompositeWebsocketListener();
    private final Flowable<SocketEnvelopeMessage> mStreamPublisher = Flowable.create(new FlowableOnSubscribe(this) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$0
        private final TmgRealtimeApi arg$1;

        /* JADX INFO: Access modifiers changed from: package-private */
        {
            this.arg$1 = this;
        }

        @Override // io.reactivex.FlowableOnSubscribe
        public void subscribe(FlowableEmitter flowableEmitter) {
            this.arg$1.lambda$new$1$TmgRealtimeApi(flowableEmitter);
        }
    }, BackpressureStrategy.BUFFER).share();

    /* JADX INFO: Access modifiers changed from: package-private */
    @Inject
    public TmgRealtimeApi(@Named("realtime-client") OkHttpClient okHttpClient, final TmgApiConfig tmgApiConfig, TmgRealtimeConfig tmgRealtimeConfig, Gson gson) {
        this.mOkHttpClient = okHttpClient;
        this.mOAuthInterceptor = extractOAuthInterceptor(okHttpClient);
        this.mGson = gson;
        this.mSocketTask = Observable.create(new ObservableOnSubscribe(this, tmgApiConfig) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$1
            private final TmgRealtimeApi arg$1;
            private final TmgApiConfig arg$2;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                this.arg$1 = this;
                this.arg$2 = tmgApiConfig;
            }

            @Override // io.reactivex.ObservableOnSubscribe
            public void subscribe(ObservableEmitter observableEmitter) {
                this.arg$1.lambda$new$4$TmgRealtimeApi(this.arg$2, observableEmitter);
            }
        }).replay(1).refCount(1, tmgRealtimeConfig.getSocketReuseTimeoutInSecs(), TimeUnit.SECONDS);
    }

    private Flowable<TopicEvent> createTopicPublisher(@NonNull final String str) {
        return subscribeToTopic(str).toFlowable(BackpressureStrategy.LATEST).switchMap(new Function(this) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$4
            private final TmgRealtimeApi arg$1;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                this.arg$1 = this;
            }

            @Override // io.reactivex.functions.Function
            public Object apply(Object obj) {
                return this.arg$1.lambda$createTopicPublisher$8$TmgRealtimeApi((RealtimeSubscription) obj);
            }
        }).ofType(SocketTopicMessage.class).filter(new Predicate(str) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$5
            private final String arg$1;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                this.arg$1 = str;
            }

            @Override // io.reactivex.functions.Predicate
            public boolean test(Object obj) {
                boolean equals;
                equals = this.arg$1.equals(((SocketTopicMessage) obj).getTopic());
                return equals;
            }
        }).map(TmgRealtimeApi$$Lambda$6.$instance);
    }

    @Nullable
    private static OAuthInterceptor extractOAuthInterceptor(@NonNull OkHttpClient okHttpClient) {
        for (Interceptor interceptor : okHttpClient.interceptors()) {
            if (interceptor instanceof OAuthInterceptor) {
                return (OAuthInterceptor) interceptor;
            }
        }
        return null;
    }

    private Flowable<SocketEnvelopeMessage> getMessagesStream() {
        return this.mStreamPublisher;
    }

    private Observable<RealtimeSubscription> subscribeToTopic(@NonNull final String str) {
        return getSocket().switchMap(new Function(this, str) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$3
            private final TmgRealtimeApi arg$1;
            private final String arg$2;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                this.arg$1 = this;
                this.arg$2 = str;
            }

            @Override // io.reactivex.functions.Function
            public Object apply(Object obj) {
                return this.arg$1.lambda$subscribeToTopic$7$TmgRealtimeApi(this.arg$2, (WebSocket) obj);
            }
        });
    }

    @RestrictTo({RestrictTo.Scope.LIBRARY})
    public void addStreamSocketListener(RealtimeSocketListener realtimeSocketListener) {
        this.mWebsocketListener.addListener(new StreamWebsocketAdapter(realtimeSocketListener));
    }

    public Flowable<TopicEvent> authenticatedEvents(String str) {
        if (!str.startsWith("/")) {
            str = "/" + str;
        }
        String sub = this.mOAuthInterceptor == null ? null : this.mOAuthInterceptor.getSub();
        if (sub == null) {
            return Flowable.error(new IllegalStateException("Unable to subscribe to privileged realtime topic."));
        }
        return events("/" + sub + str);
    }

    public Flowable<TopicEvent> events(final String str) {
        if (!str.startsWith("/")) {
            str = "/" + str;
        }
        Flowable<TopicEvent> flowable = this.mTopicPublishers.get(str);
        if (flowable != null) {
            return flowable;
        }
        Flowable<TopicEvent> share = createTopicPublisher(str).doOnTerminate(new Action(this, str) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$2
            private final TmgRealtimeApi arg$1;
            private final String arg$2;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                this.arg$1 = this;
                this.arg$2 = str;
            }

            @Override // io.reactivex.functions.Action
            public void run() {
                this.arg$1.lambda$events$5$TmgRealtimeApi(this.arg$2);
            }
        }).share();
        this.mTopicPublishers.put(str, share);
        return share;
    }

    @NonNull
    Observable<WebSocket> getSocket() {
        return this.mSocketTask;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final /* synthetic */ Publisher lambda$createTopicPublisher$8$TmgRealtimeApi(RealtimeSubscription realtimeSubscription) throws Exception {
        return getMessagesStream();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final /* synthetic */ void lambda$events$5$TmgRealtimeApi(String str) throws Exception {
        this.mTopicPublishers.remove(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final /* synthetic */ void lambda$new$0$TmgRealtimeApi(WebSocketStreamCallbacks webSocketStreamCallbacks) throws Exception {
        this.mWebsocketListener.removeListener(webSocketStreamCallbacks);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final /* synthetic */ void lambda$new$1$TmgRealtimeApi(FlowableEmitter flowableEmitter) throws Exception {
        final WebSocketStreamCallbacks webSocketStreamCallbacks = new WebSocketStreamCallbacks(flowableEmitter, this.mGson);
        this.mWebsocketListener.addListener(webSocketStreamCallbacks);
        flowableEmitter.setCancellable(new Cancellable(this, webSocketStreamCallbacks) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$10
            private final TmgRealtimeApi arg$1;
            private final WebSocketStreamCallbacks arg$2;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                this.arg$1 = this;
                this.arg$2 = webSocketStreamCallbacks;
            }

            @Override // io.reactivex.functions.Cancellable
            public void cancel() {
                this.arg$1.lambda$new$0$TmgRealtimeApi(this.arg$2);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final /* synthetic */ void lambda$new$2$TmgRealtimeApi(SocketConnectingListener socketConnectingListener) throws Exception {
        this.mWebsocketListener.removeListener(socketConnectingListener);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final /* synthetic */ void lambda$new$4$TmgRealtimeApi(TmgApiConfig tmgApiConfig, ObservableEmitter observableEmitter) throws Exception {
        final SocketConnectingListener socketConnectingListener = new SocketConnectingListener(observableEmitter, this.mGson);
        socketConnectingListener.setCancellable(new Cancellable(this, socketConnectingListener) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$8
            private final TmgRealtimeApi arg$1;
            private final SocketConnectingListener arg$2;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                this.arg$1 = this;
                this.arg$2 = socketConnectingListener;
            }

            @Override // io.reactivex.functions.Cancellable
            public void cancel() {
                this.arg$1.lambda$new$2$TmgRealtimeApi(this.arg$2);
            }
        });
        this.mWebsocketListener.addListener(socketConnectingListener);
        final WebSocket newWebSocket = this.mOkHttpClient.newWebSocket(new Request.Builder().url(tmgApiConfig.getWebSocketUrl()).build(), this.mWebsocketListener);
        observableEmitter.setCancellable(new Cancellable(newWebSocket) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$9
            private final WebSocket arg$1;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                this.arg$1 = newWebSocket;
            }

            @Override // io.reactivex.functions.Cancellable
            public void cancel() {
                this.arg$1.close(1000, "Client disconnected");
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final /* synthetic */ void lambda$subscribeToTopic$6$TmgRealtimeApi(WebSocket webSocket, String str, ObservableEmitter observableEmitter) throws Exception {
        RealtimeSubscription realtimeSubscription = new RealtimeSubscription(webSocket, str, this.mGson);
        realtimeSubscription.subscribe();
        observableEmitter.setDisposable(realtimeSubscription);
        observableEmitter.onNext(realtimeSubscription);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final /* synthetic */ ObservableSource lambda$subscribeToTopic$7$TmgRealtimeApi(final String str, final WebSocket webSocket) throws Exception {
        return Observable.create(new ObservableOnSubscribe(this, webSocket, str) { // from class: io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi$$Lambda$7
            private final TmgRealtimeApi arg$1;
            private final WebSocket arg$2;
            private final String arg$3;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                this.arg$1 = this;
                this.arg$2 = webSocket;
                this.arg$3 = str;
            }

            @Override // io.reactivex.ObservableOnSubscribe
            public void subscribe(ObservableEmitter observableEmitter) {
                this.arg$1.lambda$subscribeToTopic$6$TmgRealtimeApi(this.arg$2, this.arg$3, observableEmitter);
            }
        });
    }
}
