package com.sleepycat.je.rep.subscription;

import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.rep.GroupShutdownException;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.RepInternal;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationSecurityException;
import com.sleepycat.je.rep.impl.RepGroupImpl;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.RepParams;
import com.sleepycat.je.rep.impl.node.ChannelTimeoutTask;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.ReplicaOutputThread;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.net.DataChannelFactory;
import com.sleepycat.je.rep.stream.BaseProtocol;
import com.sleepycat.je.rep.stream.Protocol;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshake;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshakeConfig;
import com.sleepycat.je.rep.stream.SubscriberFeederSyncup;
import com.sleepycat.je.rep.utilint.BinaryProtocol;
import com.sleepycat.je.rep.utilint.NamedChannel;
import com.sleepycat.je.rep.utilint.NamedChannelWithTimeout;
import com.sleepycat.je.rep.utilint.RepUtils;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.utilint.InternalException;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.StoppableThread;
import com.sleepycat.je.utilint.TestHook;
import com.sleepycat.je.utilint.TestHookExecute;
import com.sleepycat.je.utilint.VLSN;
import java.io.IOException;
import java.lang.Thread;
import java.util.Timer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:lib/je-7.5.11.jar:com/sleepycat/je/rep/subscription/SubscriptionThread.class */
public class SubscriptionThread extends StoppableThread {
    private final Logger logger;
    private final SubscriptionConfig config;
    private final SubscriptionStat stats;
    private final BlockingQueue<Long> outputQueue;
    private final BlockingQueue<Object> inputQueue;
    private SubscriptionProcessMessageThread messageProcThread;
    private NamedChannelWithTimeout namedChannel;
    private ChannelTimeoutTask channelTimeoutTask;
    private Protocol protocol;
    private final VLSN reqVLSN;
    private volatile SubscriptionOutputThread outputThread;
    private volatile SubscriptionStatus status;
    private volatile Exception storedException;
    private TestHook<SubscriptionThread> exceptionHandlingTestHook;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/je-7.5.11.jar:com/sleepycat/je/rep/subscription/SubscriptionThread$ConnectionException.class */
    public class ConnectionException extends RuntimeException {
        private final long retrySleepMs;

        ConnectionException(String str, long j, Throwable th) {
            super(str, th);
            this.retrySleepMs = j;
        }

        long getRetrySleepMs() {
            return this.retrySleepMs;
        }

        @Override // java.lang.Throwable
        public String getMessage() {
            return "Failed to connect, will retry after sleeping " + this.retrySleepMs + " ms";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/je-7.5.11.jar:com/sleepycat/je/rep/subscription/SubscriptionThread$SubFeederHandshakeConfig.class */
    /**
     * {@link ReplicaFeederHandshakeConfig} implementation used by
     * {@code initializeConnection()} when running the replica/feeder handshake.
     * All values are taken from the enclosing thread's environment, channel and
     * {@link SubscriptionConfig}.
     */
    public class SubFeederHandshakeConfig implements ReplicaFeederHandshakeConfig {
        /** Node type announced during the handshake. */
        private final NodeType nodeType;
        /** The enclosing thread's environment, viewed as a {@link RepImpl}. */
        private final RepImpl repImpl;

        /**
         * @param nodeType node type to report from {@link #getNodeType()}
         */
        SubFeederHandshakeConfig(NodeType nodeType) {
            this.repImpl = (RepImpl) SubscriptionThread.this.envImpl;
            this.nodeType = nodeType;
        }

        /** @return the replicated environment implementation of the enclosing thread */
        @Override
        public RepImpl getRepImpl() {
            return repImpl;
        }

        /** @return the name/id pair reported by the environment */
        @Override
        public NameIdPair getNameIdPair() {
            return repImpl.getNameIdPair();
        }

        /** @return a new clock built from {@code RepImpl.getClockSkewMs()} on every call */
        @Override
        public RepUtils.Clock getClock() {
            final int skewMs = RepImpl.getClockSkewMs();
            return new RepUtils.Clock(skewMs);
        }

        /** @return the node type given at construction */
        @Override
        public NodeType getNodeType() {
            return nodeType;
        }

        /** @return the enclosing thread's current channel; {@code null} until a channel is opened */
        @Override
        public NamedChannel getNamedChannel() {
            return SubscriptionThread.this.namedChannel;
        }

        /**
         * Builds a fresh group descriptor from the configured group name and the
         * environment's current JE version, copying the configured UUID when one
         * is set.
         *
         * @return a new {@link RepGroupImpl} on every call
         */
        @Override
        public RepGroupImpl getGroup() {
            final SubscriptionConfig subscriptionConfig = SubscriptionThread.this.config;
            // NOTE(review): the meaning of the boolean 'true' argument is defined by
            // RepGroupImpl and is not visible here.
            final RepGroupImpl group = new RepGroupImpl(subscriptionConfig.getGroupName(), true, repImpl.getCurrentJEVersion());
            if (subscriptionConfig.getGroupUUID() != null) {
                group.setUUID(subscriptionConfig.getGroupUUID());
            }
            return group;
        }
    }

    /* loaded from: input_file:lib/je-7.5.11.jar:com/sleepycat/je/rep/subscription/SubscriptionThread$SubscriptionThreadExceptionHandler.class */
    /**
     * Last-resort handler installed by the constructor: anything that escapes
     * {@code run()} is written to the subscription logger at SEVERE level
     * together with its stack trace.
     */
    private class SubscriptionThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
        private SubscriptionThreadExceptionHandler() {
        }

        /**
         * Logs the uncaught throwable; it is not rethrown or stored.
         *
         * @param thread the thread that terminated
         * @param th     the throwable that was not caught
         */
        @Override
        public void uncaughtException(Thread thread, Throwable th) {
            final String stackTrace = LoggerUtils.getStackTrace(th);
            final String report = "Error { " + th.getMessage() + " } in SubscriptionThread {" + thread
                    + " } was uncaught.\nstack trace:\n" + stackTrace;
            SubscriptionThread.this.logger.severe(report);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the main subscription thread. The thread is not started here.
     *
     * @param replicatedEnvironment environment whose {@code RepImpl} (obtained via
     *                              {@code RepInternal.getNonNullRepImpl}) backs this thread
     * @param vlsn                  VLSN the subscription is requested to start from
     * @param subscriptionConfig    subscription settings; also determines the capacity
     *                              of the input and output queues
     * @param subscriptionStat      statistics object updated while streaming
     * @param logger                logger used for all messages of this thread
     */
    public SubscriptionThread(ReplicatedEnvironment replicatedEnvironment, VLSN vlsn, SubscriptionConfig subscriptionConfig, SubscriptionStat subscriptionStat, Logger logger) {
        super(RepInternal.getNonNullRepImpl(replicatedEnvironment), "Subscription Main");
        setUncaughtExceptionHandler(new SubscriptionThreadExceptionHandler());

        this.reqVLSN = vlsn;
        this.config = subscriptionConfig;
        this.stats = subscriptionStat;
        this.logger = logger;

        // Parameterized (diamond) construction instead of raw types, so the
        // queues are type-checked against their declared element types.
        this.inputQueue = new ArrayBlockingQueue<>(subscriptionConfig.getInputMessageQueueSize());
        this.outputQueue = new ArrayBlockingQueue<>(subscriptionConfig.getOutputMessageQueueSize());

        // protocol, namedChannel, storedException and exceptionHandlingTestHook
        // keep their default value (null) until a connection is established or
        // a hook/exception is recorded.
        this.status = SubscriptionStatus.INIT;
    }

    /**
     * @return the most recently recorded subscription status (read from a
     *         volatile field)
     */
    public SubscriptionStatus getStatus() {
        return status;
    }

    /**
     * @return the exception recorded by {@code run()} when the subscription
     *         ended with an error, or {@code null} if none was recorded
     */
    public Exception getStoredException() {
        return storedException;
    }

    /** @return the logger supplied to the constructor */
    @Override
    protected Logger getLogger() {
        return logger;
    }

    /**
     * Main body of the subscription thread.
     *
     * <p>Connects to the feeder (retrying on {@link ConnectionException} up to
     * {@code config.getMaxConnectRetries()} times), creates the auxiliary
     * threads and then streams messages via {@code loopInternal()}. Whatever
     * the outcome, {@link #shutdown()} is invoked before the method returns or
     * propagates a throwable.
     *
     * <p>Each handled failure is stored in {@code storedException} and mapped
     * to a {@link SubscriptionStatus}. The subclass handlers
     * ({@link GroupShutdownException}, {@link InsufficientLogException}) are
     * listed before {@link EnvironmentFailureException} so that the more
     * specific status ({@code GRP_SHUTDOWN}, {@code VLSN_NOT_AVAILABLE}) is
     * reported instead of the generic {@code UNKNOWN_ERROR}.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LoggerUtils.info(this.logger, this.envImpl, "Start subscription from VLSN " + this.reqVLSN + " from feeder at " + this.config.getFeederHost() + ":" + this.config.getFeederPort());
        try {
            final int maxConnectRetries = this.config.getMaxConnectRetries();
            int numRetries = 0;
            while (!isShutdown()) {
                try {
                    initializeConnection();
                    LoggerUtils.fine(this.logger, this.envImpl, "Create auxiliary msg processing and output threads");
                    if (createAuxThread()) {
                        this.status = SubscriptionStatus.SUCCESS;
                        loopInternal();
                    } else {
                        this.status = SubscriptionStatus.UNKNOWN_ERROR;
                    }
                    // One successful connection per run: looping again would open a
                    // second channel and start new auxiliary threads without
                    // stopping the ones created above.
                    break;
                } catch (ConnectionException e) {
                    if (numRetries == maxConnectRetries) {
                        LoggerUtils.info(this.logger, this.envImpl, "Reaching the max retry " + maxConnectRetries + " to connect feeder " + this.config.getFeederHost() + ", shut down subscription\n" + LoggerUtils.getStackTrace(e));
                        this.storedException = e;
                        this.status = SubscriptionStatus.CONNECTION_ERROR;
                        break;
                    }
                    numRetries++;
                    LoggerUtils.fine(this.logger, this.envImpl, "Fail to connect feeder at " + this.config.getFeederHost() + " sleep for " + e.getRetrySleepMs() + " ms and re-connect again");
                    Thread.sleep(e.getRetrySleepMs());
                }
            }
        } catch (ReplicationSecurityException e) {
            this.storedException = e;
            LoggerUtils.warning(this.logger, this.envImpl, "Subscription exited due to security check failure: " + e.getMessage());
            this.status = SubscriptionStatus.SECURITY_CHECK_ERROR;
        } catch (GroupShutdownException e) {
            // Let the message processing thread finish before reporting the
            // group shutdown. The reference may already have been cleared by a
            // concurrent shutdown(), hence the local snapshot and null check.
            final SubscriptionProcessMessageThread procThread = this.messageProcThread;
            if (procThread != null && procThread.isAlive()) {
                try {
                    procThread.join();
                } catch (InterruptedException joinInterrupted) {
                    LoggerUtils.fine(this.logger, this.envImpl, "exception in shutting down msg proc thread " + joinInterrupted.getMessage() + "\n" + LoggerUtils.getStackTrace(joinInterrupted));
                }
            }
            this.storedException = e;
            LoggerUtils.info(this.logger, this.envImpl, "received group shutdown " + e.getMessage() + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.GRP_SHUTDOWN;
        } catch (InsufficientLogException e) {
            this.storedException = e;
            LoggerUtils.info(this.logger, this.envImpl, "unable to subscribe from requested VLSN " + this.reqVLSN + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.VLSN_NOT_AVAILABLE;
        } catch (EnvironmentFailureException e) {
            // Must come after its subclasses handled above.
            this.storedException = e;
            LoggerUtils.warning(this.logger, this.envImpl, "unable to sync up with feeder due to EFE " + e.getMessage() + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.UNKNOWN_ERROR;
        } catch (InternalException e) {
            this.storedException = e;
            LoggerUtils.warning(this.logger, this.envImpl, "internal exception " + e.getMessage() + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.UNKNOWN_ERROR;
        } catch (InterruptedException e) {
            this.storedException = e;
            LoggerUtils.warning(this.logger, this.envImpl, "interrupted exception " + e.getMessage() + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.UNKNOWN_ERROR;
        } finally {
            // Runs on normal exit, after every handler above, and before any
            // unhandled throwable propagates to the uncaught-exception handler.
            shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Installs a test hook. The hook is passed to
     * {@code TestHookExecute.doHookIfSet} from {@code loopInternal()} after a
     * message has been read, and only when assertions are enabled.
     *
     * @param testHook hook to install; may be {@code null} to clear it
     */
    public void setExceptionHandlingTestHook(TestHook<SubscriptionThread> testHook) {
        exceptionHandlingTestHook = testHook;
    }

    /**
     * Overwrites the subscription status.
     *
     * @param subscriptionStatus new status value
     */
    void setStatus(SubscriptionStatus subscriptionStatus) {
        status = subscriptionStatus;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts the subscription down: stops the auxiliary threads, clears both
     * queues, closes the channel, cancels the channel timeout task and finally
     * shuts down this thread.
     *
     * <p>Returns immediately when {@code shutdownDone(logger)} reports
     * {@code true}. Failures while stopping an auxiliary thread are logged and
     * do not prevent the remaining steps from running.
     */
    public void shutdown() {
        if (shutdownDone(this.logger)) {
            return;
        }

        // Stop the message processing thread first. The reference is cleared
        // only after the shutdown attempt, so the thread is actually stopped.
        final SubscriptionProcessMessageThread procThread = this.messageProcThread;
        if (procThread != null) {
            try {
                procThread.shutdownThread(this.logger);
                LoggerUtils.info(this.logger, this.envImpl, "message processing thread has shut down.");
            } catch (Exception e) {
                LoggerUtils.warning(this.logger, this.envImpl, "error in shutdown msg proc thread: " + e.getMessage() + ", continue shutdown the subscription thread.");
            } finally {
                this.messageProcThread = null;
            }
        }

        // Same pattern for the output thread; the volatile field is read once.
        final SubscriptionOutputThread outThread = this.outputThread;
        if (outThread != null) {
            try {
                outThread.shutdownThread(this.logger);
                LoggerUtils.info(this.logger, this.envImpl, "output thread has shut down.");
            } catch (Exception e) {
                LoggerUtils.warning(this.logger, this.envImpl, "error in shutdown output thread: " + e.getMessage() + ", continue shutdown subscription thread.");
            } finally {
                this.outputThread = null;
            }
        }

        this.inputQueue.clear();
        this.outputQueue.clear();
        RepUtils.shutdownChannel(this.namedChannel);
        if (this.channelTimeoutTask != null) {
            this.channelTimeoutTask.cancel();
        }
        shutdownThread(this.logger);
        LoggerUtils.info(this.logger, this.envImpl, "queues cleared and channel closed, subscription thread has completely shut down");
    }

    /**
     * Places an item on the input queue, waiting in one-second slices until
     * there is room or this thread is shut down.
     *
     * <p>After each unsuccessful slice the replay-queue overflow counter is
     * incremented. If the message processing thread is found dead, a
     * {@link GroupShutdownException} is raised because nothing would drain the
     * queue any more.
     *
     * @param obj item to enqueue
     * @throws InterruptedException   if interrupted while waiting for queue space
     * @throws GroupShutdownException if the message processing thread is no longer alive
     */
    void offer(Object obj) throws InterruptedException, GroupShutdownException {
        final RepImpl repImpl = (RepImpl) this.envImpl;
        while (!isShutdown() && !this.inputQueue.offer(obj, 1000L, TimeUnit.MILLISECONDS)) {
            // shutdown() clears messageProcThread; it may have done so while we
            // were blocked above. Read the field once and tolerate null instead
            // of failing with a NullPointerException; the loop condition then
            // ends the wait once isShutdown() reports true.
            final SubscriptionProcessMessageThread consumer = this.messageProcThread;
            if (consumer != null && !consumer.isAlive()) {
                LoggerUtils.info(this.logger, repImpl, "Thread consuming input queue is gone, start shutdown process");
                throw new GroupShutdownException(this.logger, repImpl, this.config.getFeederHost(), this.stats.getHighVLSN(), 0L);
            }
            this.stats.getNumReplayQueueOverflow().increment();
        }
    }

    /**
     * Opens the channel to the feeder and performs everything needed before
     * streaming can start: the replica/feeder handshake, the protocol version
     * check, the VLSN sync-up, reading the first heartbeat and queueing its
     * acknowledgement.
     *
     * @throws InternalException            on a protocol error, including a negotiated
     *                                      version below {@code config.getMinProtocolVersion()}
     * @throws EnvironmentFailureException  rethrown after being logged; this includes the
     *                                      {@link InsufficientLogException} raised when
     *                                      sync-up returns {@code VLSN.NULL_VLSN}
     * @throws ConnectionException          on an I/O failure, so the caller can retry
     * @throws ReplicationSecurityException declared for callers; not raised directly in this method
     */
    private void initializeConnection() throws InternalException, EnvironmentFailureException, ConnectionException, ReplicationSecurityException {
        LoggerUtils.fine(logger, envImpl, "Subscription " + config.getSubNodeName() + " start open channel and handshake with feeder");
        try {
            openChannel();

            // Handshake: the result is the protocol used for the rest of the session.
            final SubFeederHandshakeConfig handshakeConfig = new SubFeederHandshakeConfig(config.getNodeType());
            final ReplicaFeederHandshake handshake = new ReplicaFeederHandshake(handshakeConfig);
            protocol = handshake.execute();

            final int requiredVersion = config.getMinProtocolVersion();
            if (protocol.getVersion() < requiredVersion) {
                throw new BinaryProtocol.ProtocolException("HA protocol version (" + protocol.getVersion() + ") is lower than minimal required version (" + requiredVersion + ")");
            }

            // Sync-up: negotiate the VLSN the feeder will start streaming from.
            LoggerUtils.fine(logger, envImpl, "subscription " + config.getSubNodeName() + " sync-up with feeder at vlsn: " + reqVLSN);
            final SubscriberFeederSyncup syncup = new SubscriberFeederSyncup(namedChannel, protocol, config.getFeederFilter(), (RepImpl) envImpl, config.getStreamMode(), logger);
            final VLSN startVLSN = syncup.execute(reqVLSN);
            LoggerUtils.fine(logger, envImpl, "sync-up with feeder done, start vlsn: " + startVLSN);
            if (startVLSN.equals(VLSN.NULL_VLSN)) {
                throw new InsufficientLogException((RepImpl) envImpl, reqVLSN);
            }
            stats.setStartVLSN(startVLSN);

            // Consume the first heartbeat and acknowledge it.
            protocol.read(namedChannel.getChannel(), BaseProtocol.Heartbeat.class);
            queueAck(ReplicaOutputThread.HEARTBEAT_ACK);

            LoggerUtils.info(logger, envImpl, "Subscription " + config.getSubNodeName() + " successfully connect to feeder at " + config.getFeederHost() + ":" + config.getFeederPort() + ", reqVLSN: " + reqVLSN + ", start VLSN: " + startVLSN);
        } catch (EnvironmentFailureException envFailure) {
            logger.warning("Fail to handshake with feeder: " + envFailure.getMessage());
            throw envFailure;
        } catch (BinaryProtocol.ProtocolException protocolFailure) {
            final String description = "Unable to connect to feeder " + config.getFeederHost() + " due to protocol exception " + protocolFailure.getMessage();
            LoggerUtils.warning(logger, envImpl, description);
            throw new InternalException(description, protocolFailure);
        } catch (IOException ioFailure) {
            throw new ConnectionException("Unable to connect due to " + ioFailure.getMessage() + ",  will retry later.", config.getSleepBeforeRetryMs(), ioFailure);
        }
    }

    /**
     * Clears both queues, then creates and starts the output thread followed by
     * the message processing thread.
     *
     * @return {@code true} when both threads were started; {@code false} when
     *         the output thread reference is found to be {@code null} right
     *         after it was assigned, in which case nothing is started
     */
    private boolean createAuxThread() {
        final RepImpl repImpl = (RepImpl) envImpl;
        inputQueue.clear();
        outputQueue.clear();

        outputThread = new SubscriptionOutputThread(this, repImpl, outputQueue, protocol, namedChannel.getChannel(), config.getAuthenticator(), stats);
        // Re-read the volatile field: it is null here only if something reset
        // it after the assignment above (the log text below attributes this to
        // a shutdown).
        final SubscriptionOutputThread output = outputThread;
        if (output != null) {
            output.start();
            LoggerUtils.fine(logger, envImpl, "output thread created for subscription " + config.getSubNodeName());

            messageProcThread = new SubscriptionProcessMessageThread(repImpl, inputQueue, config, stats, logger);
            messageProcThread.start();
            LoggerUtils.fine(logger, envImpl, "message processing thread created for subscription " + config.getSubNodeName());
            return true;
        }

        LoggerUtils.info(logger, envImpl, "subscription " + config.getSubNodeName() + " just shut down, no need to create auxiliary threads");
        return false;
    }

    /**
     * Connects to the feeder, performs the "Feeder" service handshake and wraps
     * the resulting channel in a {@link NamedChannelWithTimeout} that is stored
     * in {@code namedChannel}.
     *
     * <p>If anything fails after the raw channel has been connected, the
     * channel is closed before the exception propagates so that repeated
     * connection attempts do not accumulate open channels.
     *
     * @return the newly created named channel (also stored in {@code namedChannel})
     * @throws IllegalStateException        if the environment is {@code null}
     * @throws ConnectionException          on an I/O failure or an {@code UNKNOWN_SERVICE}
     *                                      response; the caller may retry
     * @throws ReplicationSecurityException on an {@code INVALID} service response
     * @throws InternalException            on any other service handshake failure
     */
    private NamedChannel openChannel() throws ConnectionException, InternalException, ReplicationSecurityException {
        final RepImpl repImpl = (RepImpl) this.envImpl;
        if (repImpl == null) {
            throw new IllegalStateException("Replication env is unavailable.");
        }

        DataChannel channel = null;
        boolean established = false;
        try {
            final DataChannelFactory.ConnectOptions connectOptions = new DataChannelFactory.ConnectOptions()
                    .setTcpNoDelay(true)
                    .setReceiveBufferSize(this.config.getReceiveBufferSize())
                    .setOpenTimeout((int) this.config.getStreamOpenTimeout(TimeUnit.MILLISECONDS))
                    .setBlocking(true);

            channel = repImpl.getChannelFactory().connect(this.config.getInetSocketAddress(), connectOptions);
            ServiceDispatcher.doServiceHandshake(channel, "Feeder", this.config.getAuthInfo());
            LoggerUtils.fine(this.logger, this.envImpl, "channel opened to service Feeder@" + this.config.getFeederHost() + "[address: " + this.config.getFeederHostAddr() + " port: " + this.config.getFeederPort() + "]");

            // The channel is given the pre-heartbeat timeout as its timeout argument.
            final int timeoutMs = repImpl.getConfigManager().getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT);
            this.channelTimeoutTask = new ChannelTimeoutTask(new Timer(true));
            this.namedChannel = new NamedChannelWithTimeout(repImpl, this.logger, this.channelTimeoutTask, channel, timeoutMs);
            established = true;

            LoggerUtils.info(this.logger, this.envImpl, "Subscription " + this.config.getSubNodeName() + " has successfully created a channel to feeder at " + this.config.getFeederHost() + ":" + this.config.getFeederPort());
            return this.namedChannel;
        } catch (ServiceDispatcher.ServiceConnectFailedException e) {
            if (e.getResponse() == ServiceDispatcher.Response.UNKNOWN_SERVICE) {
                throw new ConnectionException("Service exception: " + e.getMessage() + ", wait longer and will retry later", this.config.getSleepBeforeRetryMs(), e);
            }
            if (e.getResponse() == ServiceDispatcher.Response.INVALID) {
                throw new ReplicationSecurityException("Security check failure:" + e.getMessage(), this.config.getSubNodeName(), e);
            }
            throw new InternalException("Subscription " + this.config.getSubNodeName() + " failed to handshake for service Feeder with feeder " + this.config.getFeederHost(), e);
        } catch (IOException e) {
            throw new ConnectionException("Fail to open channel to feeder due to " + e.getMessage() + ", will retry later", this.config.getSleepBeforeRetryMs(), e);
        } finally {
            // A connected channel that never made it into namedChannel is not
            // reachable by shutdown(), so it has to be released here.
            if (!established && channel != null) {
                try {
                    channel.close();
                } catch (Exception closeFailure) {
                    // Best effort: the original failure is the one worth reporting.
                    LoggerUtils.fine(this.logger, this.envImpl, "Fail to close channel after unsuccessful connect: " + closeFailure.getMessage());
                }
            }
        }
    }

    /**
     * Streaming loop: reads messages from the feeder until this thread is shut
     * down or a read returns {@code null}.
     *
     * <ul>
     *   <li>Heartbeats are acknowledged through the output queue.</li>
     *   <li>A shutdown request is turned into a {@link GroupShutdownException}
     *       that is first placed on the input queue and then thrown.</li>
     *   <li>Every other message is placed on the input queue and the
     *       max-pending-input statistic is updated when the queue size exceeds
     *       the recorded maximum.</li>
     * </ul>
     *
     * @throws GroupShutdownException       on a shutdown request, or when raised by {@code offer}
     * @throws ReplicationSecurityException when reported by {@code checkOutputThread()}
     * @throws InternalException            wrapping any other {@link Exception} raised in the loop
     */
    private void loopInternal() throws InternalException, GroupShutdownException, ReplicationSecurityException {
        final RepImpl repImpl = (RepImpl) envImpl;
        try {
            LoggerUtils.info(logger, envImpl, "Start reading messages from feeder " + config.getFeederHost() + ":" + config.getFeederPort());
            while (!isShutdown()) {
                checkOutputThread();

                final BinaryProtocol.Message message = protocol.read(namedChannel);
                if (message == null) {
                    LoggerUtils.info(logger, envImpl, "Subscription " + config.getSubNodeName() + " has nothing stream, exit loop.");
                    return;
                }

                // Explicit form of an assert statement: the hook only runs when
                // assertions are enabled for this class.
                if (!$assertionsDisabled && !TestHookExecute.doHookIfSet(exceptionHandlingTestHook, this)) {
                    throw new AssertionError();
                }

                stats.getNumMsgReceived().increment();
                final BinaryProtocol.MessageOp op = message.getOp();

                if (op == Protocol.HEARTBEAT) {
                    LoggerUtils.finest(logger, envImpl, "receive heartbeat from " + namedChannel.getNameIdPair());
                    queueAck(ReplicaOutputThread.HEARTBEAT_ACK);
                    continue;
                }

                if (op == Protocol.SHUTDOWN_REQUEST) {
                    LoggerUtils.info(logger, envImpl, "Receive shutdown request from feeder " + config.getFeederHost() + ", shutdown subscriber");
                    final long shutdownTimeMs = ((BaseProtocol.ShutdownRequest) message).getShutdownTimeMs();
                    final GroupShutdownException groupShutdown = new GroupShutdownException(logger, repImpl, config.getFeederHost(), stats.getHighVLSN(), shutdownTimeMs);
                    // Queue the exception for the consumer of the input queue, then
                    // unwind this thread with the same exception.
                    offer(groupShutdown);
                    throw groupShutdown;
                }

                offer(message);
                final long pending = inputQueue.size();
                if (pending > stats.getMaxPendingInput().get().longValue()) {
                    stats.getMaxPendingInput().set(Long.valueOf(pending));
                    LoggerUtils.finest(logger, envImpl, "Max pending request log items:" + pending);
                }
            }
        } catch (GroupShutdownException | ReplicationSecurityException passThrough) {
            // These two are part of the method contract and must not be wrapped.
            throw passThrough;
        } catch (Exception other) {
            throw new InternalException(other.getMessage(), other);
        }
    }

    /**
     * Propagates a security failure recorded by the output thread.
     *
     * <p>Does nothing beyond logging when the output thread reference is
     * {@code null}, and nothing at all when the output thread's exception is
     * not a {@link ReplicationSecurityException}.
     *
     * @throws InternalException            declared for callers; not raised in this method
     * @throws ReplicationSecurityException the exception recorded by the output thread
     */
    private void checkOutputThread() throws InternalException, ReplicationSecurityException {
        final SubscriptionOutputThread output = outputThread;
        if (output == null) {
            LoggerUtils.fine(logger, envImpl, "output thread no longer exists");
            return;
        }
        if (!(output.getException() instanceof ReplicationSecurityException)) {
            return;
        }

        final ReplicationSecurityException securityFailure = (ReplicationSecurityException) output.getException();
        LoggerUtils.warning(logger, envImpl, "Output thread exited due to security check failure: " + securityFailure.getMessage());
        throw securityFailure;
    }

    /**
     * Places an acknowledgement value on the output queue, blocking while the
     * queue is full.
     *
     * @param l value to enqueue
     * @throws IOException if the thread is interrupted while waiting; the
     *                     {@link InterruptedException} is attached as the cause
     */
    private void queueAck(Long l) throws IOException {
        try {
            outputQueue.put(l);
        } catch (InterruptedException interrupted) {
            final IOException wrapped = new IOException("Ack I/O interrupted", interrupted);
            throw wrapped;
        }
    }

    static {
        $assertionsDisabled = !SubscriptionThread.class.desiredAssertionStatus();
    }
}
