package io.pravega.client.connection.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.connection.impl.ClientConnection;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.io.StreamHelpers;
import io.pravega.common.util.CertificateUtils;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.com.google.common.base.Strings;
import io.pravega.shaded.io.netty.buffer.ByteBuf;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.AppendBatchSizeTracker;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.EnhancedByteBufInputStream;
import io.pravega.shared.protocol.netty.InvalidMessageException;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.Reply;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommandType;
import io.pravega.shared.protocol.netty.WireCommands;
import java.beans.ConstructorProperties;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManagerFactory;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/connection/impl/TcpClientConnection.class */
public class TcpClientConnection implements ClientConnection {

    /** Class-wide SLF4J logger; the nested reader logs through it as well. */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TcpClientConnection.class);
    /** Timeout handed to {@code Socket.connect} when the client socket is opened (that API takes milliseconds). */
    static final int CONNECTION_TIMEOUT = 5000;
    /** Size requested for both the send and the receive buffer of the client socket (262144 = 256 * 1024). */
    static final int TCP_BUFFER_SIZE = 262144;
    /** Underlying socket; closed by {@link #close()}. */
    private final Socket socket;
    /** Encoder that every outgoing command and append is written through. */
    private final CommandEncoder encoder;
    /** Background reader of server replies; stopped by {@link #close()}. */
    private final ConnectionReader reader;
    /** Set exactly once by {@link #close()}; checked before sending. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Server endpoint this connection was opened to; only used by {@link #toString()} in this class. */
    private final PravegaNodeUri location;
    /** Optional hook run once when the connection is closed; may be {@code null}. */
    private final Runnable onClose;
    /** Handle of the periodic {@code TimeoutBatch} task scheduled in the constructor; cancelled by {@link #close()}. */
    private final ScheduledFuture<?> timeoutFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:io/pravega/client/connection/impl/TcpClientConnection$ConnectionReader.class */
    public static class ConnectionReader implements Runnable {
        static final ThreadFactory THREAD_FACTORY = ExecutorServiceHelpers.getThreadFactory("ClientSocketReaders", 7);
        private final String name;
        private final InputStream in;
        private final ReplyProcessor callback;
        private final AppendBatchSizeTracker batchSizeTracker;
        private final AtomicBoolean stop = new AtomicBoolean(false);
        private final Thread thread = THREAD_FACTORY.newThread(this);

        public ConnectionReader(String str, InputStream inputStream, ReplyProcessor replyProcessor, AppendBatchSizeTracker appendBatchSizeTracker) {
            this.name = str;
            this.in = inputStream;
            this.callback = replyProcessor;
            this.batchSizeTracker = appendBatchSizeTracker;
        }

        public void start() {
            this.thread.start();
        }

        @Override // java.lang.Runnable
        public void run() {
            IoBuffer ioBuffer = new IoBuffer();
            while (!this.stop.get()) {
                try {
                    WireCommand readCommand = readCommand(this.in, ioBuffer);
                    if (readCommand instanceof WireCommands.DataAppended) {
                        this.batchSizeTracker.recordAck(((WireCommands.DataAppended) readCommand).getEventNumber());
                    }
                    try {
                        this.callback.process((Reply) readCommand);
                    } catch (Exception e) {
                        this.callback.processingFailure(e);
                    }
                } catch (EOFException e2) {
                    TcpClientConnection.log.info("Closing TcpClientConnection.Reader because end of input reached.");
                    stop();
                } catch (SocketException e3) {
                    if (e3.getMessage().equals("Socket closed")) {
                        TcpClientConnection.log.info("Closing TcpConnection.Reader because socket is closed.");
                    } else {
                        TcpClientConnection.log.warn("Error in reading from socket.", e3);
                    }
                    stop();
                } catch (Exception e4) {
                    TcpClientConnection.log.warn("Error processing data from from server " + this.name, e4);
                    stop();
                }
            }
        }

        @VisibleForTesting
        static WireCommand readCommand(InputStream inputStream, IoBuffer ioBuffer) throws IOException {
            ByteBuf buffOfSize = ioBuffer.getBuffOfSize(inputStream, 8);
            int i = buffOfSize.getInt(0);
            WireCommandType type = WireCommands.getType(i);
            if (type == null) {
                throw new InvalidMessageException("Unknown wire command: " + i);
            }
            int i2 = buffOfSize.getInt(4);
            if (i2 < 0 || i2 > 16777215) {
                throw new InvalidMessageException("Event of invalid length: " + i2);
            }
            return type.readFrom(new EnhancedByteBufInputStream(ioBuffer.getBuffOfSize(inputStream, i2)), i2);
        }

        public void stop() {
            if (this.stop.getAndSet(true)) {
                return;
            }
            StreamHelpers.closeQuietly(this.in, TcpClientConnection.log, "Got error while shutting down reader {}. ", this.name);
            this.callback.connectionDropped();
        }
    }

    /* loaded from: input_file:io/pravega/client/connection/impl/TcpClientConnection$TimeoutBatch.class */
    /**
     * Periodic task that calls {@code CommandEncoder.batchTimeout} with the token returned by the previous
     * invocation (initially {@code -1}) and remembers the newly returned token for the next run.
     */
    private static final class TimeoutBatch implements Runnable {
        private final CommandEncoder encoder;
        private final AtomicLong token = new AtomicLong(-1);

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"encoder"})
        public TimeoutBatch(CommandEncoder commandEncoder) {
            this.encoder = commandEncoder;
        }

        /** Feeds the last token to the encoder and stores whatever token it hands back. */
        @Override
        public void run() {
            final long previousToken = this.token.get();
            final long nextToken = this.encoder.batchTimeout(previousToken);
            this.token.set(nextToken);
        }
    }

    private TcpClientConnection(Socket socket, CommandEncoder commandEncoder, ConnectionReader connectionReader, PravegaNodeUri pravegaNodeUri, Runnable runnable, ScheduledExecutorService scheduledExecutorService) {
        this.socket = (Socket) Preconditions.checkNotNull(socket);
        this.encoder = (CommandEncoder) Preconditions.checkNotNull(commandEncoder);
        this.reader = (ConnectionReader) Preconditions.checkNotNull(connectionReader);
        this.location = (PravegaNodeUri) Preconditions.checkNotNull(pravegaNodeUri);
        this.onClose = runnable;
        this.timeoutFuture = scheduledExecutorService.scheduleWithFixedDelay(new TimeoutBatch(commandEncoder), 20L, 20L, TimeUnit.MILLISECONDS);
    }

    /**
     * Asynchronously opens a connection to the given server.
     *
     * <p>The work runs on {@code scheduledExecutorService}: a socket is created and connected, a
     * {@link ConnectionReader} is started on its input stream, and a {@code CommandEncoder} is attached to
     * its output stream.
     *
     * <p>If anything fails after the socket has been created, the socket is closed, {@code runnable} is run
     * (when non-null) and the failure is rethrown from the async task wrapped in a
     * {@link ConnectionFailedException}.
     *
     * @param pravegaNodeUri           server endpoint to connect to
     * @param clientConfig             client settings (TLS, trust store, host name validation)
     * @param replyProcessor           receiver of replies read from the connection
     * @param scheduledExecutorService executor used for connecting and for the periodic batch timeout
     * @param runnable                 hook run when the connection is closed or fails to be set up; may be null
     * @return a future that yields the established connection
     */
    public static CompletableFuture<TcpClientConnection> connect(PravegaNodeUri pravegaNodeUri, ClientConfig clientConfig, ReplyProcessor replyProcessor, ScheduledExecutorService scheduledExecutorService, Runnable runnable) {
        return CompletableFuture.supplyAsync(() -> {
            Socket createClientSocket = createClientSocket(pravegaNodeUri, clientConfig);
            try {
                InputStream inputStream = createClientSocket.getInputStream();
                AppendBatchSizeTrackerImpl appendBatchSizeTrackerImpl = new AppendBatchSizeTrackerImpl();
                ConnectionReader connectionReader = new ConnectionReader(pravegaNodeUri.toString(), inputStream, replyProcessor, appendBatchSizeTrackerImpl);
                connectionReader.start();
                return new TcpClientConnection(createClientSocket, new CommandEncoder(l -> {
                    return appendBatchSizeTrackerImpl;
                }, null, createClientSocket.getOutputStream()), connectionReader, pravegaNodeUri, runnable, scheduledExecutorService);
            } catch (Exception e) {
                StreamHelpers.closeQuietly(createClientSocket, log, "Failed to close socket while failing.", new Object[0]);
                // close() treats the hook as optional; do the same here so that a missing hook cannot
                // replace the real failure with a NullPointerException.
                if (runnable != null) {
                    runnable.run();
                }
                throw Exceptions.sneakyThrow(new ConnectionFailedException(e));
            }
        }, scheduledExecutorService);
    }

    /**
     * Builds a trust manager factory from the trust store identified by {@code str}.
     *
     * @param str trust store reference handed to {@code CertificateUtils.createTrustStore}; may be null or empty
     * @return an initialised factory, or {@code null} when {@code str} is null or empty
     */
    private static TrustManagerFactory createFromCert(String str) throws CertificateException, IOException, NoSuchAlgorithmException, KeyStoreException {
        if (Strings.isNullOrEmpty(str)) {
            return null;
        }
        final KeyStore trustStore = CertificateUtils.createTrustStore(str);
        final String algorithm = TrustManagerFactory.getDefaultAlgorithm();
        final TrustManagerFactory factory = TrustManagerFactory.getInstance(algorithm);
        factory.init(trustStore);
        return factory;
    }

    /**
     * Creates a socket (TLS or plain, depending on {@code clientConfig}), configures its buffers and
     * connects it to the given endpoint with a timeout of {@link #CONNECTION_TIMEOUT}.
     *
     * <p>For TLS the trust managers come from the configured trust store when one is set; otherwise
     * {@code null} is passed to {@code SSLContext.init}. When host name validation is enabled, the
     * "HTTPS" endpoint identification algorithm is applied to the socket.
     *
     * <p>Any failure is rethrown wrapped in a {@link ConnectionFailedException}; a socket that was already
     * created is closed first so that it is not left open.
     *
     * @param pravegaNodeUri endpoint (host and port) to connect to
     * @param clientConfig   client settings controlling TLS, trust store and host name validation
     * @return the connected socket
     */
    private static Socket createClientSocket(PravegaNodeUri pravegaNodeUri, ClientConfig clientConfig) {
        Socket socket = null;
        try {
            if (clientConfig.isEnableTlsToSegmentStore()) {
                TrustManagerFactory createFromCert = createFromCert(clientConfig.getTrustStore());
                SSLContext sSLContext = SSLContext.getInstance("TLS");
                sSLContext.init(null, createFromCert != null ? createFromCert.getTrustManagers() : null, null);
                SSLSocket sSLSocket = (SSLSocket) sSLContext.getSocketFactory().createSocket();
                // Publish the socket before configuring it so the catch block can release it.
                socket = sSLSocket;
                if (clientConfig.isValidateHostName()) {
                    SSLParameters sSLParameters = new SSLParameters();
                    sSLParameters.setEndpointIdentificationAlgorithm("HTTPS");
                    sSLSocket.setSSLParameters(sSLParameters);
                }
            } else {
                socket = new Socket();
            }
            socket.setSendBufferSize(TCP_BUFFER_SIZE);
            socket.setReceiveBufferSize(TCP_BUFFER_SIZE);
            socket.setTcpNoDelay(true);
            socket.connect(new InetSocketAddress(pravegaNodeUri.getEndpoint(), pravegaNodeUri.getPort()), CONNECTION_TIMEOUT);
            return socket;
        } catch (Exception e) {
            if (socket != null) {
                // Configuration or connect failed: release the descriptor instead of leaking it.
                StreamHelpers.closeQuietly(socket, log, "Failed to close socket after a failed connection attempt.");
            }
            throw Exceptions.sneakyThrow(new ConnectionFailedException(e));
        }
    }

    /**
     * Writes a single wire command through the encoder.
     *
     * @param wireCommand command to send
     * @throws ConnectionFailedException if the connection is already closed, or if the write fails
     *                                   (in which case the connection is closed first)
     */
    @Override
    public void send(WireCommand wireCommand) throws ConnectionFailedException {
        checkIfClosed();
        try {
            this.encoder.write(wireCommand);
        } catch (IOException writeFailure) {
            final String description = writeFailure.toString();
            log.warn("Error writing to connection: {}", description);
            close();
            throw new ConnectionFailedException(writeFailure);
        }
    }

    /**
     * Writes a single append through the encoder.
     *
     * @param append append to send
     * @throws ConnectionFailedException if the connection is already closed, or if the write fails
     *                                   (in which case the connection is closed first)
     */
    @Override
    public void send(Append append) throws ConnectionFailedException {
        checkIfClosed();
        try {
            this.encoder.write(append);
        } catch (IOException writeFailure) {
            final String description = writeFailure.toString();
            log.warn("Error writing to connection: {}", description);
            close();
            throw new ConnectionFailedException(writeFailure);
        }
    }

    /**
     * Guard used before sending.
     *
     * @throws ConnectionFailedException if {@link #close()} has already been called
     */
    private void checkIfClosed() throws ConnectionFailedException {
        final boolean alreadyClosed = this.closed.get();
        if (!alreadyClosed) {
            return;
        }
        throw new ConnectionFailedException("Connection already closed");
    }

    /**
     * Closes the connection. Only the first call has an effect: it stops the reader, cancels the periodic
     * batch-timeout task, closes the socket and finally runs the {@code onClose} hook when one was supplied.
     *
     * <p>Stopping the reader invokes the reply processor's {@code connectionDropped()} callback. Should
     * that callback throw, the remaining cleanup is still performed before the exception propagates.
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        try {
            this.reader.stop();
        } finally {
            // Must run even if the reader's callback failed, otherwise the timer keeps firing and the
            // socket stays open.
            this.timeoutFuture.cancel(false);
            StreamHelpers.closeQuietly(this.socket, log, "Error closing TcpClientConnection.socket", new Object[0]);
            if (this.onClose != null) {
                this.onClose.run();
            }
        }
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    @VisibleForTesting
    boolean isClosed() {
        final boolean closedNow = this.closed.get();
        return closedNow;
    }

    /**
     * Writes all appends in order and reports the outcome through the callback instead of throwing.
     *
     * @param list              appends to write, in iteration order
     * @param completedCallback invoked with {@code null} after all appends were written, or with a
     *                          {@link ConnectionFailedException} if a write failed (the connection is
     *                          closed before the callback is invoked in that case)
     */
    @Override
    public void sendAsync(List<Append> list, ClientConnection.CompletedCallback completedCallback) {
        try {
            for (Append append : list) {
                this.encoder.write(append);
            }
            completedCallback.complete(null);
        } catch (IOException writeFailure) {
            final String description = writeFailure.toString();
            log.warn("Error writing to connection: {}", description);
            close();
            completedCallback.complete(new ConnectionFailedException(writeFailure));
        }
    }

    /**
     * @return a short description containing the server location and whether the connection is closed
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("TcpClientConnection [location=");
        text.append(this.location);
        text.append(", isClosed=");
        text.append(this.closed.get());
        text.append("]");
        return text.toString();
    }
}
