package io.reactivex.netty;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.reactivex.netty.ConnectConfiguration;
import io.reactivex.netty.RemoteObservableConfiguration;
import io.reactivex.netty.RemoteRxEvent;
import io.reactivex.netty.RemoteRxServer;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.codec.Decoder;
import io.reactivex.netty.codec.Encoder;
import io.reactivex.netty.ingress.IngressPolicies;
import io.reactivex.netty.ingress.IngressPolicy;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfiguratorComposite;
import io.reactivex.netty.slotting.SlottingStrategies;
import io.reactivex.netty.slotting.SlottingStrategy;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/reactivex/netty/RemoteObservable.class */
/**
 * Static entry points for consuming an observable served by a remote process
 * ({@code connect}) and for publishing a local observable over TCP ({@code serve}),
 * plus the helpers used to move a {@link Throwable} across the wire as bytes.
 *
 * <p>The class is a pure utility holder and cannot be instantiated.
 */
public class RemoteObservable {
    /** Number of bytes used for the length prefix that frames every message. */
    private static final int FRAME_LENGTH_FIELD_BYTES = 4;

    /** Largest frame the client-side frame decoder is configured to accept: 524288 bytes (512 KiB). */
    private static final int MAX_FRAME_BYTES = 524288;

    private RemoteObservable() {
    }

    /**
     * Builds a connection handle for the remote observable described by the configuration.
     *
     * <p>Nothing is connected until the returned connection's observable is subscribed to.
     * Each subscription creates its own {@code RemoteUnsubscribe} (named after the
     * configuration), registers it on the subscriber, and then opens a TCP connection
     * through {@link #createTcpConnectionToServer}.
     *
     * @param connectConfiguration host, port, name, decoder and error-handling settings
     * @param <T> type of the decoded items
     * @return the lazily-connecting observable bundled with a metrics object shared by all
     *         of its subscriptions
     */
    public static <T> RemoteRxConnection<T> connect(final ConnectConfiguration<T> connectConfiguration) {
        final ConnectionMetrics connectionMetrics = new ConnectionMetrics();
        Observable<T> remote = Observable.create(new Observable.OnSubscribe<T>() {
            public void call(Subscriber<? super T> subscriber) {
                RemoteUnsubscribe remoteUnsubscribe = new RemoteUnsubscribe(connectConfiguration.getName());
                // Registered before connecting so the subscriber already owns the handle
                // by the time the connection is attempted.
                subscriber.add(remoteUnsubscribe);
                createTcpConnectionToServer(connectConfiguration, remoteUnsubscribe, connectionMetrics).subscribe(subscriber);
            }
        });
        return new RemoteRxConnection<>(remote, connectionMetrics);
    }

    /**
     * Convenience overload that builds a configuration from host, port and decoder only
     * and returns just the observable of the resulting connection.
     *
     * @param str remote host
     * @param i remote port
     * @param decoder converts the received payload bytes into items
     * @param <T> type of the decoded items
     * @return observable that connects to the remote server when subscribed to
     */
    public static <T> Observable<T> connect(String str, int i, Decoder<T> decoder) {
        // NOTE(review): the builder is used as a raw type here (generic arguments are not
        // visible in this file), so the result is an unchecked conversion to Observable<T>.
        return connect(new ConnectConfiguration.Builder().host(str).port(i).decoder(decoder).build()).getObservable();
    }

    /**
     * Opens a TCP client connection, sends the subscribe event, and relays the decoded
     * remote events through a {@link PublishSubject} that is returned to the caller.
     *
     * <p>The pipeline is subscribed to immediately inside this method; the returned subject
     * only relays what arrives afterwards.
     *
     * <p>Visibility note: the decompiler widened this method from {@code private}; it is kept
     * {@code public} here so existing callers are unaffected.
     *
     * @param connectConfiguration connection and error-handling settings
     * @param remoteUnsubscribe receives the established connection via {@code setConnection}
     * @param connectionMetrics counters incremented per next/error/completed event
     * @param <T> type of the decoded items
     * @return subject relaying the decoded items, errors and completion
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static <T> Observable<T> createTcpConnectionToServer(final ConnectConfiguration<T> connectConfiguration, final RemoteUnsubscribe remoteUnsubscribe, final ConnectionMetrics connectionMetrics) {
        final PublishSubject<T> proxy = PublishSubject.create();
        final Decoder<T> decoder = connectConfiguration.getDecoder();

        // Length-prefixed framing: the encoder prepends a 4-byte length, the decoder reads
        // that length field at offset 0 and strips it from the delivered frame.
        PipelineConfigurator<RemoteRxEvent, RemoteRxEvent> framing = new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>() {
            public void configureNewPipeline(ChannelPipeline channelPipeline) {
                channelPipeline.addLast("frameEncoder", new LengthFieldPrepender(FRAME_LENGTH_FIELD_BYTES));
                channelPipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_BYTES, 0, FRAME_LENGTH_FIELD_BYTES, 0, FRAME_LENGTH_FIELD_BYTES));
            }
        };

        // NOTE(review): the composite is constructed as a raw type (its generic signature is
        // not visible in this file), so the operator chain below is unchecked.
        // NOTE(review): the Subscription returned by subscribe() is not retained; teardown is
        // presumably driven through remoteUnsubscribe - confirm.
        RxNetty.createTcpClient(connectConfiguration.getHost(), connectConfiguration.getPort(), new PipelineConfiguratorComposite(new PipelineConfigurator[]{framing, new RxEventPipelineConfigurator()})).connect().flatMap(new Func1<ObservableConnection<RemoteRxEvent, RemoteRxEvent>, Observable<RemoteRxEvent>>() {
            public Observable<RemoteRxEvent> call(ObservableConnection<RemoteRxEvent, RemoteRxEvent> observableConnection) {
                // Send the subscribe request, hand the connection to the unsubscribe handle,
                // then stream whatever the server sends back.
                observableConnection.writeAndFlush(RemoteRxEvent.subscribed(connectConfiguration.getName(), connectConfiguration.getSubscribeParameters()));
                remoteUnsubscribe.setConnection(observableConnection);
                return observableConnection.getInput();
            }
        }).retry(connectConfiguration.getSubscribeRetryAttempts()).doOnError(new Action1<Throwable>() {
            public void call(Throwable th) {
                // Errors that survive the retries above are reported to the subscribe error handler.
                connectConfiguration.getSubscribeErrorHandler().call(new SubscribeInfo(connectConfiguration.getHost(), connectConfiguration.getPort(), connectConfiguration.getName(), connectConfiguration.getSubscribeParameters()), th);
                if (connectConfiguration.isSuppressSubscribeErrors()) {
                    return;
                }
                proxy.onError(th);
            }
        }).map(new Func1<RemoteRxEvent, Notification<T>>() {
            public Notification<T> call(RemoteRxEvent remoteRxEvent) {
                // Translate each wire event into a Notification so that dematerialize() can
                // replay it as a regular onNext/onError/onCompleted signal.
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.next) {
                    connectionMetrics.incrementNextCount();
                    return Notification.createOnNext(decoder.decode(remoteRxEvent.getData()));
                }
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.error) {
                    connectionMetrics.incrementErrorCount();
                    return Notification.createOnError(RemoteObservable.fromBytesToThrowable(remoteRxEvent.getData()));
                }
                if (remoteRxEvent.getType() != RemoteRxEvent.Type.completed) {
                    throw new RuntimeException("RemoteRxEvent of type:" + remoteRxEvent.getType() + ", not supported.");
                }
                connectionMetrics.incrementCompletedCount();
                return Notification.createOnCompleted();
            }
        }).doOnError(new Action1<Throwable>() {
            public void call(Throwable th) {
                // The decoding error handler is invoked with a null first argument.
                connectConfiguration.getDeocdingErrorHandler().call(null, th);
                if (connectConfiguration.isSuppressDecodingErrors()) {
                    return;
                }
                proxy.onError(th);
            }
        }).dematerialize().subscribe(new Observer<T>() {
            public void onCompleted() {
                proxy.onCompleted();
            }

            public void onError(Throwable th) {
                // NOTE(review): this forwards every error to the proxy regardless of the
                // suppress flags checked in the doOnError callbacks above; if doOnError
                // propagates the error downstream as RxJava documents, suppression has no
                // visible effect - confirm the intended behavior before changing.
                proxy.onError(th);
            }

            public void onNext(T t) {
                proxy.onNext(t);
            }
        });
        return proxy;
    }

    /**
     * Creates a server that publishes the observable on the given port without a name,
     * using the no-slotting strategy and the allow-all ingress policy.
     *
     * @param i port to listen on
     * @param observable source of the items to publish
     * @param encoder converts items to the bytes sent on the wire
     * @param <T> type of the published items
     * @return the configured server
     */
    public static <T> RemoteRxServer serve(int i, Observable<T> observable, Encoder<T> encoder) {
        return new RemoteRxServer(configureServerFromParams(null, i, observable, encoder, SlottingStrategies.noSlotting(), IngressPolicies.allowAll()));
    }

    /**
     * Creates a server that publishes the observable on the given port under the given
     * name, using the no-slotting strategy and the allow-all ingress policy.
     *
     * @param i port to listen on
     * @param str name under which the observable is registered
     * @param observable source of the items to publish
     * @param encoder converts items to the bytes sent on the wire
     * @param <T> type of the published items
     * @return the configured server
     */
    public static <T> RemoteRxServer serve(int i, String str, Observable<T> observable, Encoder<T> encoder) {
        return new RemoteRxServer(configureServerFromParams(str, i, observable, encoder, SlottingStrategies.noSlotting(), IngressPolicies.allowAll()));
    }

    /**
     * Assembles a server builder carrying the port, the ingress policy and a single
     * observable configuration built from the remaining arguments.
     */
    private static <T> RemoteRxServer.Builder configureServerFromParams(String str, int i, Observable<T> observable, Encoder<T> encoder, SlottingStrategy<T> slottingStrategy, IngressPolicy ingressPolicy) {
        return new RemoteRxServer.Builder().port(i).ingressPolicy(ingressPolicy).addObservable(new RemoteObservableConfiguration.Builder().name(str).encoder(encoder).slottingStrategy(slottingStrategy).observable(observable).build());
    }

    /**
     * Serializes a throwable with Java object serialization.
     *
     * <p>Visibility note: the decompiler widened this method from package-private; it is
     * kept {@code public} here so existing callers are unaffected.
     *
     * @param th throwable to serialize
     * @return the serialized bytes
     * @throws RuntimeException wrapping the {@link IOException} if serialization fails
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static byte[] fromThrowableToBytes(Throwable th) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // The object stream is closed (and thereby flushed) before the buffer is read.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(th);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return buffer.toByteArray();
    }

    /**
     * Reconstructs a throwable from bytes produced by Java object serialization.
     *
     * <p>NOTE(review): in this class the bytes come from an event received over the
     * network. Native Java deserialization of data from a peer that is not fully trusted
     * is unsafe; consider restricting the accepted classes or switching formats.
     *
     * @param bArr serialized throwable
     * @return the deserialized throwable
     * @throws RuntimeException wrapping the {@link IOException} or
     *         {@link ClassNotFoundException} if deserialization fails
     */
    static Throwable fromBytesToThrowable(byte[] bArr) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bArr))) {
            return (Throwable) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
