package org.boon.slumberdb.stores.mysql;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.boon.Boon;
import org.boon.Lists;
import org.boon.Logger;
import org.boon.slumberdb.config.GlobalConfig;
import org.boon.slumberdb.service.config.DataStoreConfig;
import org.boon.slumberdb.service.protocol.Action;
import org.boon.slumberdb.service.protocol.requests.BatchSetRequest;
import org.boon.slumberdb.service.protocol.requests.DataStoreRequest;
import org.boon.slumberdb.service.protocol.requests.GetRequest;
import org.boon.slumberdb.service.protocol.requests.ReadBatchRequest;
import org.boon.slumberdb.service.protocol.requests.RemoveRequest;
import org.boon.slumberdb.service.protocol.requests.SearchRequest;
import org.boon.slumberdb.service.protocol.requests.SetRequest;
import org.boon.slumberdb.service.results.SingleResult;
import org.boon.slumberdb.service.results.StatCount;
import org.boon.slumberdb.stores.BaseDataStore;
import org.boon.slumberdb.stores.DataOutputQueue;
import org.boon.slumberdb.stores.DataStore;
import org.boon.slumberdb.stores.DataStoreSource;

/* loaded from: input_file:org/boon/slumberdb/stores/mysql/MySQLDataStore.class */
public class MySQLDataStore extends BaseDataStore implements DataStore {
    /** Copy of {@code GlobalConfig.DEBUG} taken in the constructor; gates the extra info logging in this class. */
    private final boolean debug;
    /** Batch-read requests collected by {@code handleBatchRead}; handed to the batch loaders built in {@code start()}. */
    private final LinkedTransferQueue<ReadBatchRequest> batchReadOperations;
    /** One executor per worker created by {@code startWorker}; shut down in {@code stop()}. */
    protected List<ScheduledExecutorService> scheduledExecutorServices;
    /** Handles of the scheduled worker tasks; cancelled in {@code stop()}. */
    protected List<Future<?>> futures;
    /** Read from {@code DataStoreConfig.sqlBatchWrite()} in {@code init}; not otherwise used in this class. */
    int sqlBatchWrite;
    /** Logger named after the runtime class's simple name. */
    private Logger logger;
    /** Timer value recorded at the last time-based read flush attempt. */
    private long lastReadFlushTime;
    /** Timer value recorded at the last time-based write flush attempt. */
    private long lastWriteFlushTime;
    /** Key -> GET requests waiting for that key to come back from the database. */
    private ConcurrentMap<String, List<GetRequest>> readMapKeyToGetOperation;
    /** Keys buffered for the next load batch. */
    private List<String> readList;
    /** Keys buffered for the next remove batch. */
    private List<String> removeList;
    /** Loaded key/value maps consumed by {@code processLoadResults}; passed to the loader workers in {@code start()}. */
    private LinkedTransferQueue<Map<String, String>> loadedResultsFromDBQueue;
    /** Batches of keys to load; passed to the loader workers in {@code start()}. */
    private LinkedTransferQueue<List<String>> loadQueue;
    /**
     * Work for the writer workers. This class enqueues both maps (key -> payload to store)
     * and lists of keys (removals), hence the {@code Object} element type.
     */
    private LinkedTransferQueue<Object> writeQueue;
    /** Keys that are skipped by the set/remove/addAll paths of this store. */
    private Set<String> keyblackList;
    /** Set to true in {@code init} when {@link #keyblackList} is non-empty; lets the hot paths skip the lookup. */
    private boolean hasBlackList;
    /** Key -> payload buffered for the next write batch. */
    private Map<String, String> outputMap;
    /** Number of loader workers (also used for the batch loaders); default 5, overridden in {@code init}. */
    private int numReaders;
    /** Number of writer workers; default 5, overridden in {@code init}. */
    private int numWriters;
    /** Buffered-key count at which a load batch is queued even if the load queue is not empty. */
    private int maxReadBatch;
    /** Buffered-key count that must be exceeded before a load batch is queued eagerly. */
    private int minReadBatch;
    /** Buffered-entry count at which a write/remove batch is queued unconditionally. */
    private int maxWriteBatch;
    /** Interval compared against timer deltas in {@code flushReadsIfNeeded}; presumably milliseconds (set from {@code dbReadFlushQueueIntervalMS()}). */
    private int flushQueueInterval;
    /** Database connection URL, from {@code DataStoreConfig.dbUrl()}. */
    private String url;
    /** Database user, from {@code DataStoreConfig.dbUser()}. */
    private String user;
    /** Database password, from {@code DataStoreConfig.dbPassword()}. */
    private String password;
    /** Table name, from {@code DataStoreConfig.dbTable()}. */
    private String table;

    /**
     * Creates the store with empty buffers/queues and built-in defaults for the
     * worker counts and batch sizes. The defaults are replaced by configuration
     * values when {@code init} is called.
     */
    public MySQLDataStore() {
        super(DataStoreSource.REMOTE_DB);

        // Diagnostics.
        debug = GlobalConfig.DEBUG;
        logger = Boon.configurableLogger(getClass().getSimpleName());

        // Worker bookkeeping.
        scheduledExecutorServices = new ArrayList<>();
        futures = new ArrayList<>();

        // Read path.
        batchReadOperations = new LinkedTransferQueue<>();
        readMapKeyToGetOperation = new ConcurrentHashMap<>();
        readList = new ArrayList<>();
        loadedResultsFromDBQueue = new LinkedTransferQueue<>();
        loadQueue = new LinkedTransferQueue<>();

        // Write path.
        removeList = new ArrayList<>();
        writeQueue = new LinkedTransferQueue<>();
        keyblackList = new HashSet<>();
        outputMap = new ConcurrentHashMap<>();

        // Defaults, overridden by init().
        numReaders = 5;
        numWriters = 5;
        maxReadBatch = 1000;
        minReadBatch = 100;
        maxWriteBatch = 10000;
        flushQueueInterval = 250;
    }

    /**
     * Initializes the base store and then copies the MySQL-specific settings
     * (connection, worker counts, batch sizes, flush interval, key blacklist)
     * out of the configuration.
     *
     * @param dataStoreConfig configuration to read the settings from
     * @param dataOutputQueue forwarded to the base class
     * @param dataStore       forwarded to the base class
     */
    @Override
    public void init(DataStoreConfig dataStoreConfig, DataOutputQueue dataOutputQueue, DataStore dataStore) {
        super.init(dataStoreConfig, dataOutputQueue, dataStore);

        // Connection settings.
        url = dataStoreConfig.dbUrl();
        user = dataStoreConfig.dbUser();
        password = dataStoreConfig.dbPassword();
        table = dataStoreConfig.dbTable();

        // Worker counts.
        numReaders = dataStoreConfig.dbReaderCount();
        numWriters = dataStoreConfig.dbWriterCount();

        // Batch sizing and flush timing.
        maxReadBatch = dataStoreConfig.dbMaxReadBatch();
        minReadBatch = dataStoreConfig.dbMinReadBatch();
        maxWriteBatch = dataStoreConfig.dbMaxWriteBatch();
        sqlBatchWrite = dataStoreConfig.sqlBatchWrite();
        flushQueueInterval = dataStoreConfig.dbReadFlushQueueIntervalMS();

        if (debug) {
            logger.info(dataStoreConfig);
        }

        initBlackListSet(dataStoreConfig);
        if (!keyblackList.isEmpty()) {
            hasBlackList = true;
        }
    }

    /**
     * Adds every key from the configured MySQL key blacklist to {@code keyblackList}.
     * A {@code null} list in the configuration is treated as "no blacklist".
     */
    private void initBlackListSet(DataStoreConfig dataStoreConfig) {
        List<String> configuredKeys = dataStoreConfig.mySQLKeyBlackList();
        if (configuredKeys == null) {
            return;
        }
        keyblackList.addAll(configuredKeys);
    }

    /**
     * Time-based flush of the buffered read keys: once more than
     * {@code flushQueueInterval} has passed since the last flush attempt and there
     * are buffered keys, a copy of them is offered to the load queue. The buffer is
     * cleared only when the offer is accepted; the flush timestamp is updated either way.
     */
    @Override
    protected void flushReadsIfNeeded() throws InterruptedException {
        final long now = timer.time();
        final boolean intervalElapsed = now - lastReadFlushTime > flushQueueInterval;
        if (!intervalElapsed || readList.isEmpty()) {
            return;
        }

        List<String> batch = new ArrayList<>(readList);
        if (!loadQueue.offer(batch)) {
            logger.warn(new Object[]{"MySQL LOAD QUEUE IS FULL", Integer.valueOf(loadQueue.size())});
        } else {
            readList.clear();
        }
        lastReadFlushTime = now;
    }

    /**
     * Time-based flush of the buffered writes and removes.
     *
     * <p>Once more than {@code dbWriteFlushQueueIntervalMS()} has passed since the
     * last flush attempt, a snapshot of the buffered sets (as a map) and a snapshot
     * of the buffered removes (as a list) are each offered to the write queue. A
     * buffer is cleared only when its snapshot was accepted.
     *
     * <p>The flush timestamp is refreshed whenever either buffer was flushed.
     * Previously it was refreshed only for the set buffer, so a workload consisting
     * solely of removes was never throttled by the interval.
     */
    @Override
    protected void flushWritesIfNeeded() throws InterruptedException {
        final long now = timer.time();
        if (now - lastWriteFlushTime <= dataStoreConfig.dbWriteFlushQueueIntervalMS()) {
            return;
        }

        boolean flushAttempted = false;

        if (!outputMap.isEmpty()) {
            if (writeQueue.offer(new LinkedHashMap<String, String>(outputMap))) {
                outputMap.clear();
            } else {
                logger.warn(new Object[]{"MySQL STORE QUEUE IS FULL", Integer.valueOf(writeQueue.size())});
            }
            flushAttempted = true;
        }

        if (!removeList.isEmpty()) {
            if (writeQueue.offer(new ArrayList<String>(removeList))) {
                removeList.clear();
            } else {
                // Removes travel on the write (store) queue, not the load queue.
                logger.warn(new Object[]{"MySQL STORE QUEUE IS FULL", Integer.valueOf(writeQueue.size())});
            }
            flushAttempted = true;
        }

        if (flushAttempted) {
            lastWriteFlushTime = now;
        }
    }

    /**
     * Endless loop that drains loaded key/value maps from
     * {@code loadedResultsFromDBQueue} and answers the GET requests waiting on each key.
     *
     * <p>For every loaded entry the list of waiting requests is swapped for a fresh
     * empty list. Each waiting request either gets a {@link SingleResult} (value
     * present) or is passed to {@code nextDataStoreGet} (value {@code null}).
     * After a non-empty poll, two {@link StatCount}s are emitted whenever the
     * loaded-record counter is a multiple of 1000.
     *
     * @throws InterruptedException if interrupted while polling or publishing;
     *                              this is the only way the loop ends normally
     */
    public void processLoadResults() throws InterruptedException {
        long recordsLoaded = 0;
        long recordsNotFound = 0;

        for (;;) {
            Map<String, String> loaded =
                    loadedResultsFromDBQueue.poll(dataStoreConfig.pollTimeoutMS(), TimeUnit.MILLISECONDS);
            if (loaded == null) {
                // Poll timed out; try again.
                continue;
            }

            for (Map.Entry<String, String> entry : loaded.entrySet()) {
                final String key = entry.getKey();
                List<GetRequest> waiting = readMapKeyToGetOperation.put(key, Lists.safeList(GetRequest.class));
                if (waiting == null) {
                    continue;
                }
                recordsLoaded++;

                final String value = entry.getValue();
                for (GetRequest request : waiting) {
                    if (value == null) {
                        // Not in MySQL: let the next store in the chain try.
                        nextDataStoreGet(request);
                        recordsNotFound++;
                    } else {
                        outputDataQueue.put(new SingleResult(request.messageId(), request.clientId(),
                                DataStoreSource.REMOTE_DB, key, value));
                    }
                }
            }

            if (recordsLoaded % 1000 == 0) {
                outputDataQueue.put(new StatCount(DataStoreSource.REMOTE_DB, Action.GET,
                        "recordsLoaded from MySQL", recordsLoaded));
                outputDataQueue.put(new StatCount(DataStoreSource.REMOTE_DB, Action.GET,
                        "notFound from MySQL", recordsNotFound));
            }
        }
    }

    /**
     * Handles a batch of read-side requests: single-key GETs are buffered first,
     * then batch-read requests are queued for the batch loaders.
     *
     * @param list the requests in this batch; other request types are ignored
     */
    @Override
    protected void recievedReadBatch(List<DataStoreRequest> list) throws InterruptedException {
        // Single-key GETs go through the read buffer / load queue.
        handleGets(list);
        // ReadBatchRequests go straight to the batch-read queue.
        handleBatchRead(list);
    }

    /** Offers every {@link ReadBatchRequest} in the batch to {@code batchReadOperations}. */
    private void handleBatchRead(List<DataStoreRequest> list) {
        for (DataStoreRequest request : list) {
            if (!(request instanceof ReadBatchRequest)) {
                continue;
            }
            batchReadOperations.offer((ReadBatchRequest) request);
        }
    }

    /**
     * Handles a batch of write-side requests, in this order: batch sets,
     * single sets, then removes.
     *
     * @param list the requests in this batch; other request types are ignored
     */
    @Override
    protected void recievedWriteBatch(List<DataStoreRequest> list) throws InterruptedException {
        // Whole-map batch sets are queued as-is.
        handleAddAll(list);
        // Single sets are buffered in outputMap.
        handlePuts(list);
        // Removes are buffered in removeList.
        handleRemove(list);
    }

    /**
     * Buffers the key of every {@link GetRequest} in the batch, registers the request
     * as waiting on that key, and then decides whether to queue the buffered keys
     * for loading.
     *
     * <p>Queueing rules once the buffer holds more than {@code minReadBatch} keys:
     * <ul>
     *   <li>load queue empty: queue a copy of the buffer and clear it (the buffer
     *       is kept if queueing fails);</li>
     *   <li>otherwise, only when the buffer has reached {@code maxReadBatch}: queue a
     *       copy and clear the buffer regardless of whether queueing succeeded.</li>
     * </ul>
     */
    private void handleGets(List<DataStoreRequest> list) throws InterruptedException {
        for (DataStoreRequest request : list) {
            if (!(request instanceof GetRequest)) {
                continue;
            }
            GetRequest get = (GetRequest) request;
            readList.add(get.key());

            List<GetRequest> waiting = readMapKeyToGetOperation.get(get.key());
            if (waiting != null) {
                waiting.add(get);
            } else {
                readMapKeyToGetOperation.put(get.key(), Lists.safeList(new GetRequest[]{get}));
            }
        }

        if (readList.size() <= minReadBatch) {
            return;
        }

        if (loadQueue.size() == 0) {
            // Loaders are idle: hand over what we have right away.
            try {
                List<String> batch = new ArrayList<>(readList);
                loadQueue.put(batch);
                readList.clear();
            } catch (Exception e) {
                logger.error(e, new Object[]{"Unable to add an item to the load queue, this means we can't read from MySQL"});
            }
            return;
        }

        if (readList.size() < maxReadBatch) {
            return;
        }

        // Buffer is full: queue it and always drop the buffered keys afterwards.
        try {
            List<String> batch = new ArrayList<>(readList);
            loadQueue.put(batch);
        } catch (Exception e) {
            logger.error(e, new Object[]{"Unable to add an item to the load queue, this means we can't read from MySQL"});
        } finally {
            readList.clear();
        }
    }

    /**
     * Search is a no-op for this store: the request is ignored and nothing is
     * emitted.
     *
     * @param searchRequest ignored
     */
    @Override // org.boon.slumberdb.stores.DataStore
    public void search(SearchRequest searchRequest) {
        // Intentionally empty.
    }

    /**
     * Accepts a batch set: copies the payload map, drops every blacklisted key from
     * the copy, and offers a new {@link BatchSetRequest} (same message and client id)
     * carrying the filtered map to {@code writeOperationsQueue}.
     *
     * @param batchSetRequest the incoming batch; its own payload map is not modified here
     */
    @Override
    public void addAll(BatchSetRequest batchSetRequest) {
        Map<String, String> permitted = new LinkedHashMap<>(batchSetRequest.payloadAsMap());
        for (String blockedKey : keyblackList) {
            permitted.remove(blockedKey);
        }

        BatchSetRequest filtered =
                new BatchSetRequest(batchSetRequest.messageId(), batchSetRequest.clientId(), permitted);
        writeOperationsQueue.offer(filtered);
    }

    /**
     * Puts the payload map of every request whose action is {@code Action.SET_BATCH}
     * directly on the write queue.
     */
    private void handleAddAll(List<DataStoreRequest> list) {
        for (DataStoreRequest request : list) {
            if (request.action() != Action.SET_BATCH) {
                continue;
            }
            BatchSetRequest batch = (BatchSetRequest) request;
            writeQueue.put(batch.payloadAsMap());
        }
    }

    /**
     * Buffers the key of every non-blacklisted {@link RemoveRequest} in the batch.
     * When the buffer reaches {@code maxWriteBatch}, a snapshot of the buffered keys
     * is put on the write queue and the buffer is cleared.
     *
     * <p>Fix: the overflow path used to enqueue the live {@code outputMap} (the
     * set buffer) instead of the remove keys and then clear {@code removeList},
     * which silently dropped the removals. It now enqueues a copy of
     * {@code removeList}, the same shape {@code flushWritesIfNeeded} uses for removes.
     */
    private void handleRemove(List<DataStoreRequest> list) {
        for (DataStoreRequest dataStoreRequest : list) {
            if (!(dataStoreRequest instanceof RemoveRequest)) {
                continue;
            }
            RemoveRequest removeRequest = (RemoveRequest) dataStoreRequest;
            if (hasBlackList && keyblackList.contains(removeRequest.key())) {
                continue;
            }

            removeList.add(removeRequest.key());
            if (removeList.size() >= maxWriteBatch) {
                try {
                    // Snapshot: the buffer is cleared right after and must not be shared with writers.
                    writeQueue.put(new ArrayList<String>(removeList));
                    removeList.clear();
                } catch (Exception e) {
                    logger.fatal(e, new Object[]{"MySQL Store, Unable to add an item to the store queue for REMOVE, this means we can't write to MySQL", Integer.valueOf(removeList.size()), "queue size", Integer.valueOf(writeQueue.size())});
                }
            }
        }
    }

    /**
     * Buffers the key/payload of every non-blacklisted {@link SetRequest} in the
     * batch, then decides whether to queue the buffer for writing:
     * <ul>
     *   <li>if the write queue is empty, a snapshot of the buffer is queued immediately;</li>
     *   <li>if the buffer has reached {@code maxWriteBatch}, a snapshot is queued
     *       regardless of the queue's state.</li>
     * </ul>
     * The buffer is cleared only after a snapshot was queued successfully.
     *
     * <p>Fix: the overflow path used to enqueue the live {@code outputMap} and then
     * clear it, so the writer received a map that was already (or concurrently
     * being) emptied. It now enqueues a copy, as the first path already did.
     */
    private void handlePuts(List<DataStoreRequest> list) throws InterruptedException {
        for (DataStoreRequest dataStoreRequest : list) {
            if (!(dataStoreRequest instanceof SetRequest)) {
                continue;
            }
            SetRequest setRequest = (SetRequest) dataStoreRequest;
            if (hasBlackList && keyblackList.contains(setRequest.key())) {
                continue;
            }
            outputMap.put(setRequest.key(), setRequest.payload());
        }

        // Writers are idle: hand over whatever is buffered.
        if (outputMap.size() > 0 && writeQueue.size() == 0) {
            try {
                writeQueue.put(new HashMap<String, String>(outputMap));
                outputMap.clear();
            } catch (Exception e) {
                logger.error(e, new Object[]{"MySQL Store, Unable to add an item to the store queue, this means we can't write to MySQL buf size", Integer.valueOf(outputMap.size()), "queue size", Integer.valueOf(writeQueue.size())});
            }
        }

        // Buffer is full: queue it even though writers are busy.
        if (outputMap.size() >= maxWriteBatch) {
            try {
                // Snapshot: the buffer is cleared right after and must not be shared with writers.
                writeQueue.put(new HashMap<String, String>(outputMap));
                outputMap.clear();
            } catch (Exception e) {
                logger.fatal(e, new Object[]{"MySQL Store, Unable to add an item to the store queue, this means we can't write to MySQL", Integer.valueOf(outputMap.size()), "queue size", Integer.valueOf(writeQueue.size())});
            }
        }
    }

    /**
     * Stops the base store, then cancels (with interruption) every scheduled worker
     * task and shuts down every worker executor. Failures on individual tasks or
     * executors are ignored so the remaining ones are still processed.
     */
    @Override
    public void stop() {
        super.stop();

        for (Iterator<Future<?>> pending = futures.iterator(); pending.hasNext(); ) {
            try {
                pending.next().cancel(true);
            } catch (Exception ignored) {
                // Best effort: keep cancelling the remaining tasks.
            }
        }

        for (Iterator<ScheduledExecutorService> executors = scheduledExecutorServices.iterator(); executors.hasNext(); ) {
            try {
                executors.next().shutdown();
            } catch (Exception ignored) {
                // Best effort: keep shutting down the remaining executors.
            }
        }
    }

    /**
     * Starts the base store and then schedules all workers via {@code startWorker}:
     * one load-result processor, {@code numWriters} writers, {@code numReaders}
     * key loaders and {@code numReaders} batch loaders. Every worker logs any
     * exception escaping its run and passes it to {@code handleSQLException}.
     */
    @Override
    public void start() {
        super.start();

        // Dispatches rows coming back from the loaders to the waiting GET requests.
        Runnable resultProcessor = new Runnable() {
            @Override
            public void run() {
                if (!MySQLDataStore.this.stop.get()) {
                    try {
                        processLoadResults();
                    } catch (InterruptedException interrupted) {
                        if (MySQLDataStore.this.stop.get()) {
                            if (debug) {
                                logger.info("Thread stopped with stop flag.");
                            }
                        } else {
                            // Not stopping: clear the interrupt status.
                            Thread.interrupted();
                        }
                    } catch (Exception failure) {
                        failure.printStackTrace();
                        logger.error(failure, new Object[]{"MySQLDataStore ", failure.getMessage()});
                        handleSQLException(failure);
                    }
                }
            }
        };
        startWorker("Process Load Results", resultProcessor);

        // Writers drain the write queue.
        for (int writerIndex = 0; writerIndex < numWriters; writerIndex++) {
            final MySQLDataStorePutManager writer = new MySQLDataStorePutManager(
                    outputDataQueue, dataStoreConfig, MySQLDataStore.this.stop, writeQueue, url, user, password, table);
            startWorker("Writer " + writerIndex, new Runnable() {
                @Override
                public void run() {
                    try {
                        writer.run();
                    } catch (Exception failure) {
                        logger.error(failure, new Object[]{"MySQLDataStore ", failure.getMessage()});
                        handleSQLException(failure);
                    }
                }
            });
        }

        // Key loaders drain the load queue and publish to the loaded-results queue.
        for (int readerIndex = 0; readerIndex < numReaders; readerIndex++) {
            final MySQLDataStoreLoader loader = new MySQLDataStoreLoader(
                    dataStoreConfig, outputDataQueue, MySQLDataStore.this.stop, loadedResultsFromDBQueue, loadQueue,
                    url, user, password, table);
            startWorker("Reader " + readerIndex, new Runnable() {
                @Override
                public void run() {
                    try {
                        loader.run();
                    } catch (Exception failure) {
                        failure.printStackTrace();
                        logger.error(failure, new Object[]{"MySQLDataStore Batch Loader", failure.getMessage()});
                        handleSQLException(failure);
                    }
                }
            });
        }

        // Batch loaders drain the batch-read queue; their count also follows numReaders.
        for (int batchIndex = 0; batchIndex < numReaders; batchIndex++) {
            final MySQLBatchLoader batchLoader = new MySQLBatchLoader(
                    dataStoreConfig, MySQLDataStore.this.stop, outputDataQueue, batchReadOperations,
                    url, user, password, table);
            startWorker("Batch Reader " + batchIndex, new Runnable() {
                @Override
                public void run() {
                    try {
                        batchLoader.run();
                    } catch (Exception failure) {
                        failure.printStackTrace();
                        logger.error(failure, new Object[]{"MySQLDataStore Batch Loader", failure.getMessage()});
                        handleSQLException(failure);
                    }
                }
            });
        }
    }

    /**
     * Finds the first {@link SQLException} in the cause chain of {@code th} and logs
     * every exception chained to it via {@link SQLException#getNextException()}.
     * The located exception itself is not logged here.
     *
     * <p>Fixes two defects of the previous version:
     * <ul>
     *   <li>a {@code NullPointerException} when the cause chain ended without a
     *       {@code SQLException} (or when {@code th} was {@code null});</li>
     *   <li>an endless loop when a chained exception existed, because the walk
     *       always re-read the next exception from the head instead of advancing.</li>
     * </ul>
     *
     * @param th the failure to inspect; {@code null} and non-SQL failures are ignored
     */
    public void handleSQLException(Throwable th) {
        // Unwrap until a SQLException is found or the cause chain ends.
        Throwable current = th;
        while (current != null && !(current instanceof SQLException)) {
            current = current.getCause();
        }
        if (current == null) {
            return;
        }

        // Walk the chain, advancing from the exception just logged.
        SQLException chained = ((SQLException) current).getNextException();
        while (chained != null) {
            logger.error(chained, new Object[]{"MySQLDataStore Nested SQL Exception", chained.getMessage()});
            chained = chained.getNextException();
        }
    }

    /**
     * Creates a dedicated single-thread scheduled executor for one worker and
     * schedules {@code runnable} on it at a fixed rate (initial delay 0, period
     * {@code threadErrorResumeTimeMS()} milliseconds). The executor and the returned
     * future are recorded so {@code stop()} can cancel and shut them down.
     *
     * <p>Fix: the thread name used to be built from {@code getClass()} evaluated
     * inside the anonymous {@link ThreadFactory}, whose simple name is the empty
     * string, so names came out as {@code " <worker>"}. The name is now built from
     * the store's own class.
     *
     * @param str      worker label appended to the store's simple class name to form the thread name
     * @param runnable the worker body to schedule
     */
    private void startWorker(final String str, Runnable runnable) {
        // Resolved here, in the store's scope, rather than inside the anonymous factory.
        final String threadName = getClass().getSimpleName() + " " + str;

        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable task) {
                Thread thread = new Thread(task);
                thread.setName(threadName);
                return thread;
            }
        });
        scheduledExecutorServices.add(executor);
        futures.add(executor.scheduleAtFixedRate(
                runnable, 0L, dataStoreConfig.threadErrorResumeTimeMS(), TimeUnit.MILLISECONDS));
    }
}
