package org.joyqueue.server.retry.remote;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.TransportClient;
import org.joyqueue.network.transport.codec.JoyQueueHeader;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.Direction;
import org.joyqueue.server.retry.api.MessageRetry;
import org.joyqueue.server.retry.api.RetryPolicyProvider;
import org.joyqueue.server.retry.model.RetryMessageModel;
import org.joyqueue.server.retry.remote.command.GetRetry;
import org.joyqueue.server.retry.remote.command.GetRetryAck;
import org.joyqueue.server.retry.remote.command.GetRetryCount;
import org.joyqueue.server.retry.remote.command.GetRetryCountAck;
import org.joyqueue.server.retry.remote.command.PutRetry;
import org.joyqueue.server.retry.remote.command.UpdateRetry;
import org.joyqueue.server.retry.remote.config.RemoteRetryConfig;
import org.joyqueue.server.retry.remote.config.RemoteRetryConfigKey;
import org.joyqueue.toolkit.concurrent.LoopThread;
import org.joyqueue.toolkit.config.PropertySupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/server/retry/remote/RemoteMessageRetry.class */
public class RemoteMessageRetry implements MessageRetry<Long> {
    private static final Logger logger = LoggerFactory.getLogger(RemoteMessageRetry.class);
    private TransportClient transportClient;
    private RemoteTransportCollection transports;
    private boolean startFlag;
    private RemoteRetryProvider remoteRetryProvider;
    private RetryPolicyProvider retryPolicyProvider;
    private RemoteRetryConfig config;
    private PropertySupplier propertySupplier = null;
    private int remoteRetryLimitThread = ((Integer) RemoteRetryConfigKey.REMOTE_RETRY_LIMIT_THREADS.getValue()).intValue();
    private long remoteRetryUpdateInterval = ((Long) RemoteRetryConfigKey.REMOTE_RETRY_UPDATE_INTERVAL.getValue()).longValue();
    private Semaphore retrySemaphore = null;
    private int maxRetryTimes = 3;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/server/retry/remote/RemoteMessageRetry$BalanceType.class */
    public enum BalanceType {
        ROLL,
        Random
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/server/retry/remote/RemoteMessageRetry$RemoteTransportCollection.class */
    public class RemoteTransportCollection {
        private BalanceType balanceType;
        private TransportClient nettyClient;
        private Map<String, Transport> urlTransportMap = new HashMap();
        private CopyOnWriteArrayList<Transport> transportList = new CopyOnWriteArrayList<>();
        private AtomicInteger rollCounter = new AtomicInteger(0);
        private Random random = new Random();
        private final LoopThread updateThread;

        RemoteTransportCollection(BalanceType balanceType, TransportClient transportClient, long j) {
            this.balanceType = balanceType;
            this.nettyClient = transportClient;
            buildRemoteTransport();
            this.updateThread = LoopThread.builder().sleepTime(j, j).name("Update-Remote-Transport-Thread").onException(th -> {
                RemoteMessageRetry.logger.error(th.getMessage(), th);
            }).doWork(this::buildRemoteTransport).build();
            this.updateThread.start();
        }

        private void buildRemoteTransport() {
            RemoteMessageRetry.logger.info("try to build remote transport.");
            Set<String> remoteUrl = getRemoteUrl();
            if (CollectionUtils.isEmpty(remoteUrl)) {
                RemoteMessageRetry.logger.error("Remote retry url set is empty.");
                return;
            }
            removeInvalid(remoteUrl);
            addRemoteTransport(remoteUrl);
            RemoteMessageRetry.logger.info("finish build remote transport.");
        }

        private void addRemoteTransport(Set<String> set) {
            if (CollectionUtils.isNotEmpty(set)) {
                set.stream().forEach(str -> {
                    try {
                        if (!this.urlTransportMap.containsKey(str)) {
                            Transport createTransport = this.nettyClient.createTransport(str);
                            this.urlTransportMap.put(str, createTransport);
                            this.transportList.add(createTransport);
                            RemoteMessageRetry.logger.info("add transport by url:[{}]", str);
                        }
                    } catch (Exception e) {
                        RemoteMessageRetry.logger.error("create retry remote transport error." + e);
                    }
                });
            }
        }

        private Set<String> getRemoteUrl() {
            return RemoteMessageRetry.this.remoteRetryProvider.getUrls();
        }

        public Transport get() throws JoyQueueException {
            if (this.transportList.size() == 0) {
                throw new JoyQueueException("Have no transport error.", JoyQueueCode.FW_CONNECTION_NOT_EXISTS.getCode());
            }
            if (this.balanceType != BalanceType.ROLL) {
                return this.transportList.get(this.random.nextInt(this.transportList.size()));
            }
            int andIncrement = this.rollCounter.getAndIncrement();
            return andIncrement < this.transportList.size() ? this.transportList.get(andIncrement) : this.rollCounter.compareAndSet(andIncrement + 1, 0) ? this.transportList.get(0) : get();
        }

        public synchronized void removeInvalid(Set<String> set) {
            Set<String> keySet = this.urlTransportMap.keySet();
            if (CollectionUtils.isNotEmpty(keySet)) {
                for (String str : keySet) {
                    if (!set.contains(str)) {
                        this.transportList.remove(this.urlTransportMap.remove(str));
                        RemoteMessageRetry.logger.info("remove remote retry transport by url:[{}]", str);
                    }
                }
            }
        }

        public void stop() {
            this.transportList.stream().forEach(transport -> {
                transport.stop();
            });
        }
    }

    public RemoteMessageRetry(RemoteRetryProvider remoteRetryProvider) {
        this.remoteRetryProvider = remoteRetryProvider;
    }

    public void start() {
        this.retrySemaphore = new Semaphore(this.remoteRetryLimitThread);
        this.transportClient = this.remoteRetryProvider.createTransportClient();
        this.transports = new RemoteTransportCollection(BalanceType.ROLL, this.transportClient, this.remoteRetryUpdateInterval);
        this.startFlag = true;
        logger.info("remote retry manager is started");
    }

    public boolean isStarted() {
        return this.startFlag;
    }

    public void stop() {
        this.transports.stop();
        this.startFlag = false;
        logger.info("remote retry manager is stopped");
    }

    protected void checkStarted() throws JoyQueueException {
        if (!isStarted()) {
            throw new JoyQueueException(JoyQueueCode.CN_SERVICE_NOT_AVAILABLE, new Object[0]);
        }
    }

    public void setRetryPolicyProvider(RetryPolicyProvider retryPolicyProvider) {
        this.retryPolicyProvider = retryPolicyProvider;
    }

    public void addRetry(List<RetryMessageModel> list) throws JoyQueueException {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        checkStarted();
        if (sync(new Command(new JoyQueueHeader(Direction.REQUEST, 6), new PutRetry(list))).getHeader().getStatus() != JoyQueueCode.SUCCESS.getCode()) {
            throw new JoyQueueException(JoyQueueCode.RETRY_ADD, new Object[]{""});
        }
    }

    public void retrySuccess(String str, String str2, Long[] lArr) throws JoyQueueException {
        checkStarted();
        remoteUpdateRetry(str, str2, lArr, UpdateRetry.SUCCESS);
    }

    public void retryError(String str, String str2, Long[] lArr) throws JoyQueueException {
        checkStarted();
        remoteUpdateRetry(str, str2, lArr, UpdateRetry.FAILED);
    }

    public void retryExpire(String str, String str2, Long[] lArr) throws JoyQueueException {
        checkStarted();
        remoteUpdateRetry(str, str2, lArr, UpdateRetry.EXPIRED);
    }

    public List<RetryMessageModel> getRetry(String str, String str2, short s, long j) throws JoyQueueException {
        List<RetryMessageModel> messages;
        checkStarted();
        Semaphore semaphore = this.retrySemaphore;
        try {
            if (semaphore.tryAcquire()) {
                if (str != null) {
                    try {
                        if (!str.trim().isEmpty() && str2 != null && !str2.trim().isEmpty() && s > 0) {
                            Command sync = sync(new Command(new JoyQueueHeader(Direction.REQUEST, 7), new GetRetry().topic(str).app(str2).count(s).startId(j)));
                            if (sync.getHeader().getStatus() != JoyQueueCode.SUCCESS.getCode()) {
                                throw new JoyQueueException(JoyQueueCode.RETRY_GET, new Object[]{""});
                            }
                            if (sync != null && (messages = ((GetRetryAck) sync.getPayload()).getMessages()) != null && messages.size() > 0) {
                                if (semaphore != null) {
                                    semaphore.release();
                                }
                                return messages;
                            }
                            if (semaphore != null) {
                                semaphore.release();
                            }
                        }
                    } catch (Exception e) {
                        logger.error("getRetry exception, topic: {}, app: {}, index: {}, count: {}", new Object[]{str, str2, Long.valueOf(j), Short.valueOf(s), e});
                        if (semaphore != null) {
                            semaphore.release();
                        }
                    }
                }
                ArrayList arrayList = new ArrayList();
                if (semaphore != null) {
                    semaphore.release();
                }
                return arrayList;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("tryAcquire failure:" + semaphore.drainPermits());
            }
            return new ArrayList();
        } catch (Throwable th) {
            if (semaphore != null) {
                semaphore.release();
            }
            throw th;
        }
    }

    public int countRetry(String str, String str2) throws JoyQueueException {
        checkStarted();
        if (str == null) {
            return 0;
        }
        try {
            if (str.trim().isEmpty() || str2 == null || str2.trim().isEmpty()) {
                return 0;
            }
            Command sync = sync(new Command(new JoyQueueHeader(Direction.REQUEST, 9), new GetRetryCount().topic(str).app(str2)));
            if (sync.getHeader().getStatus() != JoyQueueCode.SUCCESS.getCode()) {
                logger.error("countRetry exception, topic: {}, app: {}, code: {}", new Object[]{str, str2, Integer.valueOf(sync.getHeader().getStatus())});
                return 0;
            }
            GetRetryCountAck getRetryCountAck = (GetRetryCountAck) sync.getPayload();
            if (getRetryCountAck != null) {
                return getRetryCountAck.getCount();
            }
            return 0;
        } catch (Exception e) {
            logger.error("countRetry exception, topic: {}, app: {}", new Object[]{str, str2, e});
            return 0;
        }
    }

    protected void remoteUpdateRetry(String str, String str2, Long[] lArr, byte b) throws JoyQueueException {
        if (sync(new Command(new JoyQueueHeader(Direction.REQUEST, 8), new UpdateRetry().topic(str).app(str2).messages(lArr).updateType(b))).getHeader().getStatus() != JoyQueueCode.SUCCESS.getCode()) {
            throw new JoyQueueException(JoyQueueCode.RETRY_UPDATE, new Object[]{""});
        }
    }

    protected Command sync(Command command) throws JoyQueueException {
        return this.transports.get().sync(command, this.config.getTransportTimeout());
    }

    public void setSupplier(PropertySupplier propertySupplier) {
        this.propertySupplier = propertySupplier;
        this.config = new RemoteRetryConfig(propertySupplier);
        this.remoteRetryLimitThread = this.config.getLimitThreads();
        this.remoteRetryUpdateInterval = this.config.getUpdateInterval();
    }
}
