package org.s1.cluster.dds;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.s1.S1SystemError;
import org.s1.cluster.dds.beans.CommandBean;
import org.s1.cluster.dds.beans.MessageBean;
import org.s1.cluster.dds.beans.StorageId;
import org.s1.misc.Closure;
import org.s1.objects.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/s1/cluster/dds/QueueWorker.class */
public class QueueWorker {
    private static final Logger LOG = LoggerFactory.getLogger(QueueWorker.class);
    private OperationLog operationLog;
    private boolean shutdownOnError;
    private String nodeId;
    private int threads;
    private double transactionPriority;
    private Map<String, Double> priorityTable;
    private ExecutorService executor;
    private final LinkedList<MessageBean> list = new LinkedList<>();
    private final LinkedList<MessageBean> listInWork = new LinkedList<>();
    private final Map<String, Map<String, Boolean>> nameThreads = new ConcurrentHashMap();
    private volatile boolean run = false;
    private volatile String status = "stopped";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/s1/cluster/dds/QueueWorker$QueueWorkerThread.class */
    public class QueueWorkerThread implements Runnable {
        private QueueWorkerThread() {
        }

        @Override // java.lang.Runnable
        public void run() {
            int size;
            boolean z;
            if (QueueWorker.LOG.isDebugEnabled()) {
                QueueWorker.LOG.debug("Node " + QueueWorker.this.nodeId + " queue worker " + Thread.currentThread().getName() + " started");
            }
            while (true) {
                synchronized (QueueWorker.this.list) {
                    size = QueueWorker.this.list.size();
                }
                if (size == 0 && !QueueWorker.this.run) {
                    break;
                }
                String str = null;
                List<MessageBean> newArrayList = Objects.newArrayList(new Object[0]);
                synchronized (QueueWorker.this.list) {
                    Iterator it = QueueWorker.this.list.iterator();
                    while (it.hasNext()) {
                        MessageBean messageBean = (MessageBean) it.next();
                        synchronized (QueueWorker.this.nameThreads) {
                            String name = messageBean.getDataSource() == null ? null : messageBean.getDataSource().getName();
                            double d = 0.5d;
                            if (name == null) {
                                name = "transaction";
                                d = QueueWorker.this.transactionPriority;
                            } else if (QueueWorker.this.priorityTable.containsKey(name)) {
                                d = ((Double) QueueWorker.this.priorityTable.get(name)).doubleValue();
                            }
                            if (!QueueWorker.this.nameThreads.containsKey(name)) {
                                QueueWorker.this.nameThreads.put(name, Objects.newHashMap(String.class, Boolean.class, new Object[0]));
                            }
                            z = ((double) (((Map) QueueWorker.this.nameThreads.get(name)).size() / QueueWorker.this.threads)) < d;
                            if (z) {
                                str = name;
                                ((Map) QueueWorker.this.nameThreads.get(str)).put(Thread.currentThread().getName(), true);
                            }
                        }
                        if (z) {
                            newArrayList.add(messageBean);
                            synchronized (QueueWorker.this.listInWork) {
                                QueueWorker.this.listInWork.add(messageBean);
                            }
                            it.remove();
                        }
                    }
                }
                for (MessageBean messageBean2 : newArrayList) {
                    if (QueueWorker.LOG.isTraceEnabled()) {
                        QueueWorker.LOG.trace("Node " + QueueWorker.this.nodeId + " queue worker " + Thread.currentThread().getName() + " processing record: " + messageBean2.toString(true));
                    } else if (QueueWorker.LOG.isDebugEnabled()) {
                        QueueWorker.LOG.debug("Node " + QueueWorker.this.nodeId + " queue worker " + Thread.currentThread().getName() + " processing record: " + messageBean2.toString(false));
                    }
                    QueueWorker.this.runCommand(messageBean2);
                    synchronized (QueueWorker.this.listInWork) {
                        QueueWorker.this.listInWork.remove(messageBean2);
                    }
                }
                if (str != null) {
                    synchronized (QueueWorker.this.nameThreads) {
                        ((Map) QueueWorker.this.nameThreads.get(str)).remove(Thread.currentThread().getName());
                    }
                }
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    throw S1SystemError.wrap(e);
                }
            }
            if (QueueWorker.LOG.isDebugEnabled()) {
                QueueWorker.LOG.debug("Node " + QueueWorker.this.nodeId + " queue worker " + Thread.currentThread().getName() + " stopped");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueueWorker(String str, OperationLog operationLog, boolean z, int i, double d, Map<String, Double> map) {
        this.nodeId = str;
        this.operationLog = operationLog;
        this.shutdownOnError = z;
        if (i > 0) {
            this.threads = i;
        } else {
            this.threads = 10;
        }
        map = map == null ? Objects.newHashMap(new Object[0]) : map;
        for (String str2 : map.keySet()) {
            if (map.get(str2).doubleValue() <= 0.0d || map.get(str2).doubleValue() > 1.0d) {
                map.put(str2, Double.valueOf(0.5d));
            }
        }
        this.priorityTable = map;
        if (d <= 0.0d || d > 1.0d) {
            this.transactionPriority = 0.5d;
        } else {
            this.transactionPriority = d;
        }
    }

    Map<String, Object> getStatistic() {
        Map<String, Object> newHashMap = Objects.newHashMap(new Object[0]);
        newHashMap.put("status", this.status);
        if ("started".equals(this.status)) {
            newHashMap.put("priorityTable", this.priorityTable);
            newHashMap.put("transactionPriority", Double.valueOf(this.transactionPriority));
            newHashMap.put("threads", Integer.valueOf(this.threads));
            newHashMap.put("queueDepth", Integer.valueOf(this.list.size()));
        }
        return newHashMap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void start() {
        int size;
        if (this.status.equals("started")) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        synchronized (this.listInWork) {
            this.listInWork.clear();
        }
        synchronized (this.nameThreads) {
            this.nameThreads.clear();
        }
        this.executor = Executors.newFixedThreadPool(this.threads, new ThreadFactory() { // from class: org.s1.cluster.dds.QueueWorker.1
            private AtomicInteger i = new AtomicInteger(-1);

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "QueueWorkerThread-" + this.i.incrementAndGet());
            }
        });
        this.run = true;
        synchronized (this.list) {
            size = this.list.size();
        }
        for (int i = 0; i < this.threads; i++) {
            this.executor.execute(new QueueWorkerThread());
        }
        this.status = "started";
        LOG.info("Node " + this.nodeId + " queue worker started (" + (System.currentTimeMillis() - currentTimeMillis) + " ms.) with " + this.threads + " threads, write log messages in queue: " + size + ", processing them...");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void stop() {
        int size;
        if (this.status.equals("stopped")) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        this.run = false;
        this.executor.shutdown();
        do {
        } while (!this.executor.isTerminated());
        this.status = "stopped";
        synchronized (this.list) {
            size = this.list.size();
        }
        LOG.info("Node " + this.nodeId + " queue worker stopped (" + (System.currentTimeMillis() - currentTimeMillis) + " ms.), " + size + " log messages left");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void add(final MessageBean messageBean) {
        synchronized (this.list) {
            if (Objects.find(this.list, new Closure<MessageBean, Boolean>() { // from class: org.s1.cluster.dds.QueueWorker.2
                @Override // org.s1.misc.Closure
                public Boolean call(MessageBean messageBean2) {
                    return Boolean.valueOf(messageBean2.getId() == messageBean.getId());
                }
            }) == null) {
                this.list.add(messageBean);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addFirst(List<MessageBean> list) {
        synchronized (this.list) {
            Iterator<MessageBean> it = list.iterator();
            while (it.hasNext()) {
                final MessageBean next = it.next();
                if (Objects.find(this.list, new Closure<MessageBean, Boolean>() { // from class: org.s1.cluster.dds.QueueWorker.3
                    @Override // org.s1.misc.Closure
                    public Boolean call(MessageBean messageBean) {
                        return Boolean.valueOf(messageBean.getId() == next.getId());
                    }
                }) != null) {
                    it.remove();
                }
            }
            this.list.addAll(0, list);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void clear() {
        synchronized (this.list) {
            this.list.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void flush(final StorageId storageId) {
        boolean z = true;
        while (z) {
            synchronized (this.list) {
                z = Objects.find(this.list, new Closure<MessageBean, Boolean>() { // from class: org.s1.cluster.dds.QueueWorker.4
                    @Override // org.s1.misc.Closure
                    public Boolean call(MessageBean messageBean) {
                        return Boolean.valueOf(messageBean.isSameEntity(storageId));
                    }
                }) != null;
            }
            if (!z) {
                synchronized (this.listInWork) {
                    z = Objects.find(this.listInWork, new Closure<MessageBean, Boolean>() { // from class: org.s1.cluster.dds.QueueWorker.5
                        @Override // org.s1.misc.Closure
                        public Boolean call(MessageBean messageBean) {
                            return Boolean.valueOf(messageBean.isSameEntity(storageId));
                        }
                    }) != null;
                }
            }
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                throw S1SystemError.wrap(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void runCommand(MessageBean messageBean) {
        try {
            this.operationLog.addToLocalLog(messageBean);
            runRealWrite(messageBean);
            this.operationLog.markDone(messageBean.getId());
        } catch (Throwable th) {
            onError(messageBean, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void runRealWrite(CommandBean commandBean) {
        if (commandBean.getDataSource() != null) {
            runAtomicWrite(commandBean);
            return;
        }
        List list = (List) Objects.get(commandBean.getParams(), "list");
        if (list == null) {
            list = Objects.newArrayList(new Object[0]);
        }
        Iterator it = list.iterator();
        while (it.hasNext()) {
            runAtomicWrite((CommandBean) it.next());
        }
    }

    private void runAtomicWrite(CommandBean commandBean) {
        DistributedDataSource distributedDataSource = null;
        try {
            distributedDataSource = commandBean.getDataSource().newInstance();
        } catch (Throwable th) {
        }
        if (distributedDataSource != null) {
            distributedDataSource.runWriteCommand(commandBean);
        } else {
            LOG.warn("Cannot initialize DistributedDataSource with name " + commandBean.getDataSource().getName());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onError(MessageBean messageBean, Throwable th) {
        LOG.error("Distributed storage error! \n Message: " + messageBean.toString(true), th);
        if (this.shutdownOnError) {
            LOG.info("System shutdown");
            System.exit(1);
        }
    }
}
