package io.fixprotocol.silverflash.fixp.store;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/* loaded from: input_file:io/fixprotocol/silverflash/fixp/store/CassandraMessageStore.class */
public class CassandraMessageStore implements MessageStore {
    private static final int BUFFER_CAPACITY = 2048;
    private static final String KEYSPACE_NAME = "fixp";
    private static final String MESSAGE_COLNAME = "message";
    private static final String SEQ_NO_COLNAME = "seqNo";
    private static final String SESSION_ID_COLNAME = "sessionId";
    private static final String TABLE_NAME = "messages";
    private BoundStatement boundInsertStatement;
    private BoundStatement boundSelectMaxStatement;
    private BoundStatement boundSelectStatement;
    private Cluster cluster;
    private final String contactPoints;
    private int numberOfThreads = 1;
    private final Executor executor = Executors.newFixedThreadPool(this.numberOfThreads);
    private int maxSimultaneousRequests = 50;
    private Session session;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/fixprotocol/silverflash/fixp/store/CassandraMessageStore$RowConsumer.class */
    public static class RowConsumer implements Consumer<Row> {
        private final ArrayList<ByteBuffer> arrayList;
        private int index = 0;
        private final int initialSize;

        public RowConsumer(MessageStoreResult messageStoreResult) {
            this.arrayList = messageStoreResult.getMessageList();
            this.initialSize = this.arrayList.size();
        }

        @Override // java.util.function.Consumer
        public void accept(Row row) {
            ByteBuffer bytes = row.getBytes(CassandraMessageStore.MESSAGE_COLNAME);
            if (this.index >= this.initialSize) {
                this.arrayList.add(ByteBuffer.allocate(CassandraMessageStore.BUFFER_CAPACITY));
            }
            ByteBuffer byteBuffer = this.arrayList.get(this.index);
            byteBuffer.clear();
            byteBuffer.put(bytes);
            byteBuffer.flip();
            this.index++;
        }
    }

    public CassandraMessageStore(String str) {
        this.contactPoints = str;
    }

    public void buildSchema() throws StoreException {
        if (this.session == null) {
            throw new StoreException("Schema not created; store not open");
        }
        this.session.execute(new SimpleStatement("CREATE KEYSPACE IF NOT EXISTS fixp WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"));
        System.out.format("Keyspace %s available\n", KEYSPACE_NAME);
        this.session.execute(SchemaBuilder.createTable(KEYSPACE_NAME, TABLE_NAME).ifNotExists().addPartitionKey(SESSION_ID_COLNAME, DataType.uuid()).addClusteringColumn(SEQ_NO_COLNAME, DataType.bigint()).addColumn(MESSAGE_COLNAME, DataType.blob()));
        System.out.format("Table %s available\n", TABLE_NAME);
    }

    public void close() throws Exception {
        if (this.session != null) {
            this.session.close();
            this.cluster.close();
            this.session = null;
        }
    }

    public void dropSchema() throws StoreException {
        if (this.session == null) {
            throw new StoreException("Schema not dropped; store not open");
        }
        this.session.execute(SchemaBuilder.dropTable(KEYSPACE_NAME, TABLE_NAME).ifExists());
        System.out.println("Schema dropped");
    }

    public void insertMessage(UUID uuid, long j, ByteBuffer byteBuffer) throws StoreException {
        this.boundInsertStatement.bind(new Object[]{uuid, Long.valueOf(j), byteBuffer});
        this.session.execute(this.boundInsertStatement);
    }

    public CompletableFuture<CassandraMessageStore> open() {
        CompletableFuture<CassandraMessageStore> completableFuture = new CompletableFuture<>();
        this.executor.execute(() -> {
            try {
                this.cluster = new Cluster.Builder().addContactPoints(new String[]{this.contactPoints}).withSocketOptions(new SocketOptions().setTcpNoDelay(true)).build();
                this.cluster.getConfiguration().getProtocolOptions().setCompression(ProtocolOptions.Compression.LZ4);
                this.session = this.cluster.connect("system");
                Metadata metadata = this.cluster.getMetadata();
                System.out.println(String.format("Connected to cluster '%s' on %s.", metadata.getClusterName(), metadata.getAllHosts()));
                buildSchema();
                this.session.execute(new SimpleStatement("USE fixp"));
                System.out.format("Using keyspace %s\n", KEYSPACE_NAME);
                this.boundInsertStatement = prepareInsertStatement();
                this.boundSelectStatement = prepareSelectStatement();
                this.boundSelectMaxStatement = prepareSelectMaxStatement();
            } catch (DriverException | StoreException e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }

    public long retrieveMaxSeqNo(UUID uuid) {
        long j = 0;
        this.boundSelectMaxStatement.bind(new Object[]{uuid});
        Iterator it = this.session.execute(this.boundSelectMaxStatement).all().iterator();
        if (it.hasNext()) {
            j = ((Row) it.next()).getLong(SEQ_NO_COLNAME);
        }
        return j;
    }

    public void retrieveMessagesAsync(MessageStoreResult messageStoreResult, final Consumer<MessageStoreResult> consumer) {
        long fromSeqNo = messageStoreResult.getFromSeqNo();
        this.boundSelectStatement.bind(new Object[]{messageStoreResult.getSessionId(), Long.valueOf(fromSeqNo), Long.valueOf(fromSeqNo + messageStoreResult.getCountRequested())});
        Futures.addCallback(Futures.transform(this.session.executeAsync(this.boundSelectStatement), resultSet -> {
            resultSet.forEach(new RowConsumer(messageStoreResult));
            return messageStoreResult;
        }), new FutureCallback<MessageStoreResult>() { // from class: io.fixprotocol.silverflash.fixp.store.CassandraMessageStore.1
            public void onFailure(Throwable th) {
                th.printStackTrace();
            }

            public void onSuccess(MessageStoreResult messageStoreResult2) {
                consumer.accept(messageStoreResult2);
            }
        }, this.executor);
    }

    public boolean schemaExists() throws StoreException {
        if (this.session == null) {
            throw new StoreException("Keyspaces not available; store not open");
        }
        boolean z = false;
        Iterator it = this.session.execute(QueryBuilder.select().column("keyspace_name").from("system.schema_keyspaces")).iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (KEYSPACE_NAME.equals(((Row) it.next()).getString("keyspace_name"))) {
                z = true;
                break;
            }
        }
        return z;
    }

    private BoundStatement prepareInsertStatement() {
        return new BoundStatement(this.session.prepare("INSERT INTO messages(sessionId, seqNo, message) VALUES (?,?,?);"));
    }

    private BoundStatement prepareSelectMaxStatement() {
        return new BoundStatement(this.session.prepare("SELECT seqNo FROM messages WHERE sessionId= ? ORDER BY seqNo DESC LIMIT 1"));
    }

    private BoundStatement prepareSelectStatement() {
        return new BoundStatement(this.session.prepare("SELECT message FROM messages WHERE sessionId= ? AND seqNo >= ? AND seqNo <= ?"));
    }
}
