package org.boon.slumberdb.stores;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.boon.Boon;
import org.boon.Logger;
import org.boon.Pair;
import org.boon.collections.IntList;
import org.boon.concurrent.Timer;
import org.boon.core.Sys;
import org.boon.slumberdb.service.config.DataStoreConfig;
import org.boon.slumberdb.service.protocol.Action;
import org.boon.slumberdb.service.protocol.requests.BatchSetRequest;
import org.boon.slumberdb.service.protocol.requests.DataStoreRequest;
import org.boon.slumberdb.service.protocol.requests.GetRequest;
import org.boon.slumberdb.service.protocol.requests.ReadBatchRequest;
import org.boon.slumberdb.service.protocol.requests.RemoveRequest;
import org.boon.slumberdb.service.protocol.requests.SetRequest;
import org.boon.slumberdb.service.results.StatCount;

/* loaded from: input_file:org/boon/slumberdb/stores/BaseDataStore.class */
public abstract class BaseDataStore {
    protected final DataStoreSource source;
    protected DataOutputQueue outputDataQueue;
    protected DataStore nextReaderDataStore;
    protected ScheduledExecutorService scheduledExecutorService;
    protected DataStoreConfig dataStoreConfig;
    protected Future<?> future;
    protected LinkedTransferQueue<DataStoreRequest> writeOperationsQueue = new LinkedTransferQueue<>();
    protected LinkedTransferQueue<DataStoreRequest> readOperationsQueue = new LinkedTransferQueue<>();
    protected AtomicBoolean stop = new AtomicBoolean();
    protected List<DataStoreRequest> readOperationsBatch = new ArrayList();
    protected List<DataStoreRequest> writeOperationsBatch = new ArrayList();
    protected Timer timer = Timer.timer();
    private Logger logger = Boon.configurableLogger(getClass());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/boon/slumberdb/stores/BaseDataStore$CallStatus.class */
    public static class CallStatus {
        Action action;
        DataStoreSource source;
        int count;

        private CallStatus() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/boon/slumberdb/stores/BaseDataStore$CallStatusTracker.class */
    public static class CallStatusTracker {
        long numberOfCallsSinceLastSendAttempt;
        long totalNumCalls;
        long lastStatSend;
        Map<Pair<Action, DataStoreSource>, CallStatus> callStatusMap;

        private CallStatusTracker() {
            this.numberOfCallsSinceLastSendAttempt = 0L;
            this.totalNumCalls = 0L;
            this.lastStatSend = 0L;
            this.callStatusMap = new HashMap();
        }

        void addCall(DataStoreRequest dataStoreRequest, DataOutputQueue dataOutputQueue) {
            Pair<Action, DataStoreSource> pair = Pair.pair(dataStoreRequest.action(), dataStoreRequest.source());
            CallStatus callStatus = this.callStatusMap.get(pair);
            if (callStatus == null) {
                CallStatus callStatus2 = new CallStatus();
                callStatus2.action = dataStoreRequest.action();
                callStatus2.source = dataStoreRequest.source();
                callStatus2.count = dataStoreRequest.count();
                this.callStatusMap.put(pair, callStatus2);
            } else {
                callStatus.action = dataStoreRequest.action();
                callStatus.source = dataStoreRequest.source();
                callStatus.count += dataStoreRequest.count();
            }
            this.numberOfCallsSinceLastSendAttempt++;
            if (this.numberOfCallsSinceLastSendAttempt > 100) {
                this.totalNumCalls += this.numberOfCallsSinceLastSendAttempt;
                this.numberOfCallsSinceLastSendAttempt = 0L;
                long time = Timer.timer().time();
                if (time - this.lastStatSend > 20000) {
                    for (CallStatus callStatus3 : this.callStatusMap.values()) {
                        dataOutputQueue.put(new StatCount(time, callStatus3.source, callStatus3.action, "DATA SOURCE ACCESS ", callStatus3.count));
                    }
                    dataOutputQueue.put(new StatCount(time, DataStoreSource.SERVER, Action.NONE, "DATA SOURCE ACCESS " + Thread.currentThread().getName(), this.totalNumCalls));
                    try {
                        dataOutputQueue.put(new StatCount(time, DataStoreSource.SERVER, Action.GET_STATS, "Thread CPU Time " + Thread.currentThread().getName(), Sys.threadCPUTime()));
                    } catch (Throwable th) {
                    }
                    this.lastStatSend = time;
                }
            }
        }

        public void clearStats() {
            this.numberOfCallsSinceLastSendAttempt = 0L;
            this.totalNumCalls = 0L;
            this.lastStatSend = 0L;
            this.callStatusMap.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/boon/slumberdb/stores/BaseDataStore$ReadStatus.class */
    public static class ReadStatus {
        IntList readBatchSize;
        CallStatusTracker tracker;

        private ReadStatus() {
            this.readBatchSize = new IntList();
            this.tracker = new CallStatusTracker();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/boon/slumberdb/stores/BaseDataStore$WriteStatus.class */
    public static class WriteStatus {
        IntList writeBatchSize;
        CallStatusTracker tracker;

        private WriteStatus() {
            this.writeBatchSize = new IntList();
            this.tracker = new CallStatusTracker();
        }

        public void sendBatchSize(DataStoreSource dataStoreSource, DataOutputQueue dataOutputQueue) {
            int max = this.writeBatchSize.max();
            int max2 = this.writeBatchSize.max();
            int max3 = this.writeBatchSize.max();
            int median = this.writeBatchSize.median();
            int standardDeviation = this.writeBatchSize.standardDeviation();
            int variance = this.writeBatchSize.variance();
            long time = Timer.timer().time();
            dataOutputQueue.put(new StatCount(time, DataStoreSource.SERVER, Action.GET_STATS, "Thread request handler User Time", Sys.threadUserTime()));
            dataOutputQueue.put(new StatCount(time, dataStoreSource, Action.SET, "DataStore WRITE BATCH SIZE max", max));
            dataOutputQueue.put(new StatCount(time, dataStoreSource, Action.SET, "DataStore WRITE BATCH SIZE mean", max2));
            dataOutputQueue.put(new StatCount(time, dataStoreSource, Action.SET, "DataStore WRITE BATCH SIZE min", max3));
            dataOutputQueue.put(new StatCount(time, dataStoreSource, Action.SET, "DataStore WRITE BATCH SIZE median", median));
            dataOutputQueue.put(new StatCount(time, dataStoreSource, Action.SET, "DataStore WRITE BATCH SIZE standardDeviation", standardDeviation));
            dataOutputQueue.put(new StatCount(time, dataStoreSource, Action.SET, "DataStore WRITE BATCH SIZE variance", variance));
            this.writeBatchSize.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseDataStore(DataStoreSource dataStoreSource) {
        this.source = dataStoreSource;
    }

    public void clearStats() {
    }

    public void sendStats(long j) {
    }

    public void init(DataStoreConfig dataStoreConfig, DataOutputQueue dataOutputQueue, DataStore dataStore) {
        this.dataStoreConfig = dataStoreConfig;
        this.outputDataQueue = dataOutputQueue;
        this.nextReaderDataStore = dataStore;
    }

    public void set(SetRequest setRequest) {
        this.writeOperationsQueue.offer(setRequest);
    }

    public void addAll(BatchSetRequest batchSetRequest) {
        this.writeOperationsQueue.offer(batchSetRequest);
    }

    public void get(GetRequest getRequest) {
        this.readOperationsQueue.offer(getRequest);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processReadQueue() throws InterruptedException {
        ReadStatus readStatus = new ReadStatus();
        while (true) {
            DataStoreRequest poll = this.readOperationsQueue.poll(this.dataStoreConfig.pollTimeoutMS(), TimeUnit.MILLISECONDS);
            while (true) {
                DataStoreRequest dataStoreRequest = poll;
                if (dataStoreRequest == null) {
                    break;
                }
                readStatus.tracker.addCall(dataStoreRequest, this.outputDataQueue);
                this.readOperationsBatch.add(dataStoreRequest);
                if (this.readOperationsBatch.size() > this.dataStoreConfig.processQueueMaxBatchSize()) {
                    break;
                } else {
                    poll = this.readOperationsQueue.poll();
                }
            }
            if (this.readOperationsBatch.size() > 0) {
                try {
                    recievedReadBatch(new ArrayList(this.readOperationsBatch));
                    this.readOperationsBatch.clear();
                } catch (Throwable th) {
                    this.readOperationsBatch.clear();
                    throw th;
                }
            } else {
                flushReadsIfNeeded();
            }
            if (readStatus.readBatchSize.size() > 1000) {
                long time = Timer.timer().time();
                this.outputDataQueue.put(new StatCount(time, DataStoreSource.SERVER, Action.GET_STATS, "Thread TIME USER  BaseDataStore " + Thread.currentThread().getName(), Sys.threadUserTime()));
                this.outputDataQueue.put(new StatCount(time, DataStoreSource.SERVER, Action.GET_STATS, "Thread TIME CPU  BaseDataStore " + Thread.currentThread().getName(), Sys.threadCPUTime()));
                this.outputDataQueue.put(new StatCount(time, this.source, Action.GET, "BaseDataStore readStatus.readBatchSize.max", readStatus.readBatchSize.max()));
                this.outputDataQueue.put(new StatCount(time, this.source, Action.GET, "BaseDataStore readStatus.readBatchSize.min", readStatus.readBatchSize.min()));
                this.outputDataQueue.put(new StatCount(time, this.source, Action.GET, "BaseDataStore readStatus.readBatchSize.median", readStatus.readBatchSize.median()));
                this.outputDataQueue.put(new StatCount(time, this.source, Action.GET, "BaseDataStore readStatus.readBatchSize.mean", readStatus.readBatchSize.mean()));
                this.outputDataQueue.put(new StatCount(time, this.source, Action.GET, "BaseDataStore readStatus.readBatchSize.standardDeviation", readStatus.readBatchSize.standardDeviation()));
                this.outputDataQueue.put(new StatCount(time, this.source, Action.GET, "BaseDataStore readStatus.readBatchSize.variance", readStatus.readBatchSize.variance()));
                readStatus.readBatchSize.clear();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processWriteQueue() throws InterruptedException {
        WriteStatus writeStatus = new WriteStatus();
        while (true) {
            DataStoreRequest poll = this.writeOperationsQueue.poll(this.dataStoreConfig.pollTimeoutMS(), TimeUnit.MILLISECONDS);
            while (true) {
                DataStoreRequest dataStoreRequest = poll;
                if (dataStoreRequest == null) {
                    break;
                }
                writeStatus.tracker.addCall(dataStoreRequest, this.outputDataQueue);
                this.writeOperationsBatch.add(dataStoreRequest);
                if (this.writeOperationsBatch.size() > this.dataStoreConfig.processQueueMaxBatchSize()) {
                    break;
                } else {
                    poll = this.writeOperationsQueue.poll();
                }
            }
            if (this.writeOperationsBatch.size() > 0) {
                try {
                    writeStatus.writeBatchSize.add(this.writeOperationsBatch.size());
                    recievedWriteBatch(new ArrayList(this.writeOperationsBatch));
                    this.writeOperationsBatch.clear();
                } catch (Throwable th) {
                    this.writeOperationsBatch.clear();
                    throw th;
                }
            } else {
                flushWritesIfNeeded();
            }
            if (writeStatus.writeBatchSize.size() > 1000) {
                writeStatus.sendBatchSize(this.source, this.outputDataQueue);
            }
        }
    }

    protected void flushReadsIfNeeded() throws InterruptedException {
    }

    protected void flushWritesIfNeeded() throws InterruptedException {
    }

    protected abstract void recievedReadBatch(List<DataStoreRequest> list) throws InterruptedException;

    protected abstract void recievedWriteBatch(List<DataStoreRequest> list) throws InterruptedException;

    public void stop() {
        this.stop.set(true);
        this.future.cancel(true);
        this.scheduledExecutorService.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void nextDataStoreGet(GetRequest getRequest) {
        try {
            this.nextReaderDataStore.get(getRequest);
        } catch (Exception e) {
            this.logger.error(e, new Object[]{"Can't call next reader data store", this.nextReaderDataStore, getRequest});
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void nextDataStoreReadBatch(ReadBatchRequest readBatchRequest) {
        try {
            this.nextReaderDataStore.batchRead(readBatchRequest);
        } catch (Exception e) {
            this.logger.error(e, new Object[]{"Can't call next reader data store for batch read", this.nextReaderDataStore});
        }
    }

    public void batchRead(ReadBatchRequest readBatchRequest) {
        this.readOperationsQueue.add(readBatchRequest);
    }

    public void remove(RemoveRequest removeRequest) {
        this.writeOperationsQueue.add(removeRequest);
    }

    public void start() {
        this.scheduledExecutorService = Executors.newScheduledThreadPool(2, new ThreadFactory() { // from class: org.boon.slumberdb.stores.BaseDataStore.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName(" DataQueue Process " + BaseDataStore.this.source);
                return thread;
            }
        });
        this.future = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.boon.slumberdb.stores.BaseDataStore.2
            @Override // java.lang.Runnable
            public void run() {
                if (BaseDataStore.this.stop.get()) {
                    return;
                }
                try {
                    BaseDataStore.this.processWriteQueue();
                } catch (InterruptedException e) {
                } catch (Exception e2) {
                    BaseDataStore.this.logger.fatal(e2);
                }
            }
        }, 0L, this.dataStoreConfig.threadErrorResumeTimeMS(), TimeUnit.MILLISECONDS);
        this.future = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.boon.slumberdb.stores.BaseDataStore.3
            @Override // java.lang.Runnable
            public void run() {
                if (BaseDataStore.this.stop.get()) {
                    return;
                }
                try {
                    BaseDataStore.this.processReadQueue();
                } catch (InterruptedException e) {
                } catch (Exception e2) {
                    BaseDataStore.this.logger.fatal(e2, new Object[]{"Problem with base data store running scheduled job"});
                }
            }
        }, 0L, this.dataStoreConfig.threadErrorResumeTimeMS(), TimeUnit.MILLISECONDS);
    }
}
