package org.joyqueue.server.retry.remote.handler;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.network.command.BooleanAck;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.codec.JoyQueueHeader;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.Direction;
import org.joyqueue.network.transport.command.handler.CommandHandler;
import org.joyqueue.network.transport.command.provider.ExecutorServiceProvider;
import org.joyqueue.network.transport.exception.TransportException;
import org.joyqueue.server.retry.api.MessageRetry;
import org.joyqueue.server.retry.model.RetryMessageModel;
import org.joyqueue.server.retry.remote.command.GetRetry;
import org.joyqueue.server.retry.remote.command.GetRetryAck;
import org.joyqueue.server.retry.remote.command.GetRetryCount;
import org.joyqueue.server.retry.remote.command.GetRetryCountAck;
import org.joyqueue.server.retry.remote.command.PutRetry;
import org.joyqueue.server.retry.remote.command.UpdateRetry;
import org.joyqueue.server.retry.remote.config.RemoteRetryConfigKey;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.config.PropertySupplier;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/server/retry/remote/handler/RemoteRetryMessageHandler.class */
public class RemoteRetryMessageHandler implements CommandHandler, ExecutorServiceProvider {
    protected static final Logger logger = LoggerFactory.getLogger(RemoteRetryMessageHandler.class);
    private MessageRetry messageRetry;
    private PropertySupplier propertySupplier;
    private ExecutorService threadPool;

    public RemoteRetryMessageHandler(MessageRetry messageRetry, PropertySupplier propertySupplier) {
        this.messageRetry = messageRetry;
        this.propertySupplier = propertySupplier;
        this.threadPool = new ThreadPoolExecutor(((Integer) propertySupplier.getValue(RemoteRetryConfigKey.REMOTE_RETRY_THREADS)).intValue(), ((Integer) propertySupplier.getValue(RemoteRetryConfigKey.REMOTE_RETRY_THREADS)).intValue(), ((Integer) propertySupplier.getValue(RemoteRetryConfigKey.REMOTE_RETRY_THREAD_KEEPALIVE)).intValue(), TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(((Integer) propertySupplier.getValue(RemoteRetryConfigKey.REMOTE_RETRY_THREAD_QUEUE_SIZE)).intValue()), (ThreadFactory) new NamedThreadFactory("joyqueue-retry-remote-threads"));
    }

    public Command handle(Transport transport, Command command) throws TransportException {
        logger.debug("Receive command:[{}]", command);
        long now = SystemClock.now();
        try {
            try {
                switch (command.getHeader().getType()) {
                    case 6:
                        Command execute = execute((PutRetry) command.getPayload());
                        if (SystemClock.now() - now > 100) {
                            logger.info("handle retry more than 100ms, Command:[{}]", command);
                        }
                        return execute;
                    case 7:
                        Command execute2 = execute((GetRetry) command.getPayload());
                        if (SystemClock.now() - now > 100) {
                            logger.info("handle retry more than 100ms, Command:[{}]", command);
                        }
                        return execute2;
                    case 8:
                        Command execute3 = execute((UpdateRetry) command.getPayload());
                        if (SystemClock.now() - now > 100) {
                            logger.info("handle retry more than 100ms, Command:[{}]", command);
                        }
                        return execute3;
                    case 9:
                        Command execute4 = execute((GetRetryCount) command.getPayload());
                        if (SystemClock.now() - now > 100) {
                            logger.info("handle retry more than 100ms, Command:[{}]", command);
                        }
                        return execute4;
                    default:
                        throw new JoyQueueException(JoyQueueCode.CN_COMMAND_UNSUPPORTED.getMessage(new Object[]{Integer.valueOf(command.getHeader().getType())}), JoyQueueCode.CN_COMMAND_UNSUPPORTED.getCode());
                }
            } catch (JoyQueueException e) {
                logger.error("Message retry exception, transport: {}", transport, e);
                Command build = BooleanAck.build(e.getCode(), e.getMessage());
                if (SystemClock.now() - now > 100) {
                    logger.info("handle retry more than 100ms, Command:[{}]", command);
                }
                return build;
            } catch (Exception e2) {
                logger.error("Message retry exception, transport: {}", transport, e2);
                Command build2 = BooleanAck.build(JoyQueueCode.CN_UNKNOWN_ERROR.getCode(), JoyQueueCode.CN_UNKNOWN_ERROR.getMessage(new Object[0]));
                if (SystemClock.now() - now > 100) {
                    logger.info("handle retry more than 100ms, Command:[{}]", command);
                }
                return build2;
            }
        } catch (Throwable th) {
            if (SystemClock.now() - now > 100) {
                logger.info("handle retry more than 100ms, Command:[{}]", command);
            }
            throw th;
        }
    }

    public ExecutorService getExecutorService(Transport transport, Command command) {
        return this.threadPool;
    }

    private Command execute(PutRetry putRetry) throws JoyQueueException {
        logger.debug("add retry message:[{}]", putRetry);
        this.messageRetry.addRetry(putRetry.getMessages());
        return BooleanAck.build();
    }

    private Command execute(GetRetry getRetry) throws JoyQueueException {
        logger.debug("get retry message by condition:[{}]", getRetry);
        List<RetryMessageModel> retry = this.messageRetry.getRetry(getRetry.getTopic(), getRetry.getApp(), getRetry.getCount(), getRetry.getStartId());
        GetRetryAck getRetryAck = new GetRetryAck();
        getRetryAck.setMessages(retry);
        Command command = new Command();
        command.setHeader(new JoyQueueHeader(Direction.RESPONSE, -7));
        command.setPayload(getRetryAck);
        return command;
    }

    private Command execute(UpdateRetry updateRetry) throws JoyQueueException {
        logger.debug("update retry by condition:[{}]", updateRetry);
        int updateType = updateRetry.getUpdateType();
        if (UpdateRetry.SUCCESS == updateType) {
            this.messageRetry.retrySuccess(updateRetry.getTopic(), updateRetry.getApp(), updateRetry.getMessages());
        } else if (UpdateRetry.FAILED == updateType) {
            this.messageRetry.retryError(updateRetry.getTopic(), updateRetry.getApp(), updateRetry.getMessages());
        } else if (UpdateRetry.EXPIRED == updateType) {
            this.messageRetry.retryExpire(updateRetry.getTopic(), updateRetry.getApp(), updateRetry.getMessages());
        }
        return BooleanAck.build();
    }

    private Command execute(GetRetryCount getRetryCount) throws JoyQueueException {
        logger.debug("get retry count by condition:[{}]", getRetryCount);
        int countRetry = this.messageRetry.countRetry(getRetryCount.getTopic(), getRetryCount.getApp());
        GetRetryCountAck getRetryCountAck = new GetRetryCountAck();
        getRetryCountAck.setTopic(getRetryCount.getTopic());
        getRetryCountAck.setApp(getRetryCountAck.getApp());
        getRetryCountAck.setCount(countRetry);
        Command command = new Command();
        command.setHeader(new JoyQueueHeader(Direction.RESPONSE, -9));
        command.setPayload(getRetryCountAck);
        return command;
    }
}
