package io.microraft.store.sqlite;

import io.microraft.RaftEndpoint;
import io.microraft.lifecycle.RaftNodeLifecycleAware;
import io.microraft.model.RaftModelFactory;
import io.microraft.model.log.LogEntry;
import io.microraft.model.log.RaftGroupMembersView;
import io.microraft.model.log.SnapshotChunk;
import io.microraft.model.log.SnapshotEntry;
import io.microraft.persistence.RaftStore;
import io.microraft.persistence.RestoredRaftState;
import io.microraft.store.sqlite.StoreModelSerializer;
import java.io.File;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.jooq.CloseableDSLContext;
import org.jooq.Converter;
import org.jooq.Field;
import org.jooq.GroupField;
import org.jooq.Name;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Record5;
import org.jooq.Select;
import org.jooq.Table;
import org.jooq.TableLike;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.sqlite.SQLiteConfig;

/* loaded from: input_file:io/microraft/store/sqlite/RaftSqliteStore.class */
public final class RaftSqliteStore implements RaftStore, RaftNodeLifecycleAware {
    // ---- kv table: one row keyed by PK that holds the node-level state ----

    /** Key of the row that is inserted into the {@code kv} table when the tables are created. */
    private static final String PK = "pk";
    private static final Table<Record> KV = DSL.table("kv");
    private static final Field<String> KEY = DSL.field("key", SQLDataType.VARCHAR);
    private static final Field<Boolean> LOCAL_ENDPOINT_VOTING = DSL.field("localEndpointVoting", SQLDataType.BOOLEAN);
    private static final Field<Integer> TERM = DSL.field("term", SQLDataType.INTEGER);

    // ---- logEntries table: keyed by logIndex ----

    private static final Table<Record> LOG_ENTRIES = DSL.table("logEntries");
    /** The {@code logIndex} column; used by both the log entry and the snapshot chunk tables. */
    private static final Field<Long> INDEX = DSL.field("logIndex", SQLDataType.BIGINT);

    // ---- snapshotChunks table: keyed by (logIndex, chunkIndex) ----

    private static final Table<Record> SNAPSHOT_CHUNKS = DSL.table("snapshotChunks");
    private static final Field<Integer> CHUNK_INDEX = DSL.field("chunkIndex", SQLDataType.INTEGER);
    private static final Field<Integer> CHUNK_COUNT = DSL.field("chunkCount", SQLDataType.INTEGER);
    /** Alias of the per-snapshot chunk counter computed in {@code completedSnapshots()}. */
    private static final Name COUNT = DSL.name("count");

    // ---- binary columns whose values are converted through the StoreModelSerializer ----

    private final Field<RaftGroupMembersView> initialGroupMembersField;
    private final Field<RaftEndpoint> localEndpointField;
    private final Field<RaftEndpoint> votedForField;
    private final Field<LogEntry> logEntryField;
    private final Field<SnapshotChunk> chunkField;

    private final CloseableDSLContext dsl;
    private final RaftModelFactory raftModelFactory;

    /**
     * Set once a snapshot chunk has been persisted; {@code flush()} checks and clears it before deleting
     * the data that precedes the newest complete snapshot. Starts out {@code false}.
     */
    private boolean tryToCleanUpOldSnapshots;

    /* loaded from: input_file:io/microraft/store/sqlite/RaftSqliteStore$JooqConverterAdapter.class */
    /**
     * Adapts a {@link StoreModelSerializer.Serializer} to a jOOQ {@link Converter} so that a model object
     * can be bound to, and read from, a binary column.
     *
     * <p>{@code null} is passed through unchanged in both directions; the wrapped serializer is only
     * invoked for non-null values.
     *
     * @param <T> the model type stored in the column
     */
    private static final class JooqConverterAdapter<T> implements Converter<byte[], T> {
        private final StoreModelSerializer.Serializer<T> serializer;
        private final Class<T> clazz;

        /**
         * @param serializer performs the actual byte[] to model conversion
         * @param cls        runtime class of the model type, reported by {@link #toType()}
         */
        private JooqConverterAdapter(StoreModelSerializer.Serializer<T> serializer, Class<T> cls) {
            this.serializer = serializer;
            this.clazz = cls;
        }

        /**
         * Converts a database value into its model object.
         *
         * @param bArr the stored bytes, or {@code null} when the column is NULL
         * @return the deserialized object, or {@code null} when {@code bArr} is {@code null}
         */
        @Nullable
        public T from(@Nullable byte[] bArr) {
            if (bArr == null) {
                return null;
            }
            return this.serializer.deserialize(bArr);
        }

        /**
         * Converts a model object into the bytes to store.
         *
         * @param t the model object, or {@code null}
         * @return the serialized bytes, or {@code null} when {@code t} is {@code null}
         */
        public byte[] to(T t) {
            if (t == null) {
                return null;
            }
            return this.serializer.serialize(t);
        }

        /** @return the database-side type, always {@code byte[]} */
        public Class<byte[]> fromType() {
            return byte[].class;
        }

        /** @return the model class given at construction time */
        public Class<T> toType() {
            return this.clazz;
        }

        // The decompiler-emitted "m1to" bridge method was removed: nothing called it, and it cast its
        // argument to JooqConverterAdapter<T> before handing it to to(T), which is not a valid call.
        // The compiler generates the real bridge for to(Object) on its own.
    }

    /**
     * Creates a store on top of an already opened jOOQ context.
     *
     * <p>This constructor only builds the converted column definitions; it does not create any tables.
     * {@link #create(File, RaftModelFactory, StoreModelSerializer)} additionally prepares the schema.
     *
     * @param closeableDSLContext  context used for every statement; closed in {@link #onRaftNodeTerminate()}
     * @param storeModelSerializer supplies the serializers for the binary columns
     * @param raftModelFactory     used to build the snapshot entry while restoring state
     */
    public RaftSqliteStore(CloseableDSLContext closeableDSLContext, StoreModelSerializer storeModelSerializer, RaftModelFactory raftModelFactory) {
        this.dsl = closeableDSLContext;
        this.raftModelFactory = raftModelFactory;
        this.initialGroupMembersField = binaryField("initialGroupMembers", storeModelSerializer.raftGroupMembersViewSerializer(), RaftGroupMembersView.class);
        this.logEntryField = binaryField("logEntry", storeModelSerializer.logEntrySerializer(), LogEntry.class);
        this.localEndpointField = binaryField("localEndpoint", storeModelSerializer.raftEndpointSerializer(), RaftEndpoint.class);
        this.votedForField = binaryField("votedFor", storeModelSerializer.raftEndpointSerializer(), RaftEndpoint.class);
        this.chunkField = binaryField("chunk", storeModelSerializer.snapshotChunkSerializer(), SnapshotChunk.class);
    }

    /**
     * Builds a binary column whose values are converted with the given serializer.
     *
     * <p>Keeping this generic avoids the raw {@code JooqConverterAdapter} instantiations and the unchecked
     * field assignments they caused.
     *
     * @param columnName name of the column
     * @param serializer serializer for the column's model type
     * @param type       runtime class of the column's model type
     * @param <T>        the column's model type
     * @return the typed field definition
     */
    private static <T> Field<T> binaryField(String columnName, StoreModelSerializer.Serializer<T> serializer, Class<T> type) {
        return DSL.field(columnName, SQLDataType.BINARY.asConvertedDataType(new JooqConverterAdapter<>(serializer, type)));
    }

    /**
     * Creates the {@code kv}, {@code logEntries} and {@code snapshotChunks} tables when they are missing,
     * inserts the {@code kv} row keyed by {@link #PK} unless it is already there, and commits.
     */
    private void createTablesIfNotExists() {
        // Node-level state: endpoint, voting flag, initial members, term and vote.
        this.dsl.createTableIfNotExists(KV)
                .column(KEY)
                .column(this.localEndpointField)
                .column(LOCAL_ENDPOINT_VOTING)
                .column(this.initialGroupMembersField)
                .column(TERM)
                .column(this.votedForField)
                .primaryKey(KEY)
                .execute();

        // The persistAndFlush* methods only issue UPDATEs, so the row they modify is created here.
        this.dsl.insertInto(KV)
                .columns(KEY)
                .values(PK)
                .onConflictDoNothing()
                .execute();

        // One row per log entry, keyed by its log index.
        this.dsl.createTableIfNotExists(LOG_ENTRIES)
                .column(INDEX)
                .column(this.logEntryField)
                .primaryKey(INDEX)
                .execute();

        // One row per snapshot chunk, keyed by (log index, chunk index).
        this.dsl.createTableIfNotExists(SNAPSHOT_CHUNKS)
                .columns(INDEX, CHUNK_INDEX)
                .column(CHUNK_COUNT)
                .column(this.chunkField)
                .primaryKey(INDEX, CHUNK_INDEX)
                .execute();

        this.dsl.connection(connection -> connection.commit());
    }

    /**
     * Opens (or creates) the SQLite database at {@code file} and returns a store backed by it.
     *
     * <p>The connection is configured with WAL journaling, exclusive locking and {@code synchronous=EXTRA},
     * auto-commit is switched off, and the tables are created if they do not exist yet.
     *
     * <p>If any step after opening the context fails, the context is closed before the failure is
     * rethrown, so a failed call does not leave the database open.
     *
     * @param file                 location of the SQLite database file
     * @param raftModelFactory     factory passed on to the store
     * @param storeModelSerializer serializer passed on to the store
     * @return a store whose schema is ready for use
     */
    public static RaftSqliteStore create(File file, RaftModelFactory raftModelFactory, StoreModelSerializer storeModelSerializer) {
        SQLiteConfig sQLiteConfig = new SQLiteConfig();
        sQLiteConfig.setPragma(SQLiteConfig.Pragma.JOURNAL_MODE, SQLiteConfig.JournalMode.WAL.getValue());
        sQLiteConfig.setPragma(SQLiteConfig.Pragma.LOCKING_MODE, SQLiteConfig.LockingMode.EXCLUSIVE.getValue());
        sQLiteConfig.setPragma(SQLiteConfig.Pragma.SYNCHRONOUS, "EXTRA");
        CloseableDSLContext using = DSL.using(jdbcUrl(file), sQLiteConfig.toProperties());
        try {
            // Writes are committed explicitly by the persistAndFlush* methods and by flush().
            using.connection(connection -> connection.setAutoCommit(false));
            RaftSqliteStore raftSqliteStore = new RaftSqliteStore(using, storeModelSerializer, raftModelFactory);
            raftSqliteStore.createTablesIfNotExists();
            return raftSqliteStore;
        } catch (RuntimeException | Error failure) {
            // Nobody else holds a reference to the context yet, so release it here.
            try {
                using.close();
            } catch (RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Builds the JDBC URL for the given database file.
     *
     * @param file the database file; its string form is appended to the {@code jdbc:sqlite:} prefix
     * @return the JDBC URL
     */
    private static String jdbcUrl(File file) {
        final String prefix = "jdbc:sqlite:";
        return prefix.concat(String.valueOf(file));
    }

    /**
     * Rolls back whatever has not been committed and closes the underlying jOOQ context.
     *
     * <p>The context is closed even when the rollback fails, so a failing rollback cannot leak it.
     */
    public void onRaftNodeTerminate() {
        try {
            this.dsl.connection(connection -> connection.rollback());
        } finally {
            this.dsl.close();
        }
    }

    /**
     * Stores the local endpoint together with its voting flag in the {@code kv} table and commits.
     *
     * @param raftEndpoint the local endpoint
     * @param z            whether the local endpoint is a voting member
     */
    public void persistAndFlushLocalEndpoint(RaftEndpoint raftEndpoint, boolean z) {
        final Boolean voting = Boolean.valueOf(z);
        this.dsl.update(KV)
                .set(this.localEndpointField, raftEndpoint)
                .set(LOCAL_ENDPOINT_VOTING, voting)
                .execute();
        this.dsl.connection(connection -> connection.commit());
    }

    /**
     * Stores the initial group members in the {@code kv} table and commits.
     *
     * @param raftGroupMembersView the initial member list of the Raft group
     */
    public void persistAndFlushInitialGroupMembers(@Nonnull RaftGroupMembersView raftGroupMembersView) {
        this.dsl.update(KV)
                .set(this.initialGroupMembersField, raftGroupMembersView)
                .execute();
        this.dsl.connection(connection -> connection.commit());
    }

    /**
     * Stores the term and the vote given in that term in the {@code kv} table and commits.
     *
     * @param i            the term
     * @param raftEndpoint the endpoint voted for, or {@code null} to store no vote
     */
    public void persistAndFlushTerm(int i, @Nullable RaftEndpoint raftEndpoint) {
        final Integer term = Integer.valueOf(i);
        this.dsl.update(KV)
                .set(TERM, term)
                .set(this.votedForField, raftEndpoint)
                .execute();
        this.dsl.connection(connection -> connection.commit());
    }

    /**
     * Inserts the log entry under its log index; the insert is ignored when a row with the same key
     * already exists. This method does not commit.
     *
     * @param logEntry the entry to store
     */
    public void persistLogEntry(@Nonnull LogEntry logEntry) {
        final Long index = Long.valueOf(logEntry.getIndex());
        this.dsl.insertInto(LOG_ENTRIES, INDEX, this.logEntryField)
                .values(index, logEntry)
                .onDuplicateKeyIgnore()
                .execute();
    }

    /**
     * Inserts the snapshot chunk under its (log index, chunk index) key; the insert is ignored when a
     * row with the same key already exists. This method does not commit.
     *
     * <p>It also marks the store so that the next {@link #flush()} attempts to clean up old data.
     *
     * @param snapshotChunk the chunk to store
     */
    public void persistSnapshotChunk(@Nonnull SnapshotChunk snapshotChunk) {
        final Long snapshotIndex = Long.valueOf(snapshotChunk.getIndex());
        final Integer chunkIndex = Integer.valueOf(snapshotChunk.getSnapshotChunkIndex());
        final Integer chunkCount = Integer.valueOf(snapshotChunk.getSnapshotChunkCount());
        this.dsl.insertInto(SNAPSHOT_CHUNKS, INDEX, CHUNK_INDEX, CHUNK_COUNT, this.chunkField)
                .values(snapshotIndex, chunkIndex, chunkCount, snapshotChunk)
                .onDuplicateKeyIgnore()
                .execute();
        this.tryToCleanUpOldSnapshots = true;
    }

    /**
     * Looks up the highest log index among the snapshots returned by {@link #completedSnapshots()}.
     *
     * @return that index, or an empty Optional when there is no such snapshot
     */
    Optional<Long> getMaxCommittedSnapshotIndex() {
        final Long highestIndex = this.dsl
                .select(DSL.max(INDEX).as(INDEX))
                .from(completedSnapshots())
                .fetchOne(INDEX);
        // MAX over an empty set is NULL, which surfaces here as a null value.
        return Optional.ofNullable(highestIndex);
    }

    /**
     * Returns a copy of {@code field} whose name is qualified with the name of {@code table}.
     *
     * @param table the table (or alias) supplying the qualifier
     * @param field the field supplying the column name and the Java type
     * @param <T>   the field's Java type
     * @return the field named {@code table.field}
     */
    private static <T> Field<T> qualify(Table<?> table, Field<T> field) {
        final Name qualifiedName = DSL.name(table.$name(), field.$name());
        return DSL.field(qualifiedName, field.getType());
    }

    /**
     * Deletes every log entry whose index is greater than or equal to {@code j}. This method does not
     * commit.
     *
     * @param j the first log index to delete (inclusive)
     */
    public void truncateLogEntriesFrom(long j) {
        final Long firstIndexToDelete = Long.valueOf(j);
        this.dsl.deleteFrom(LOG_ENTRIES)
                .where(INDEX.ge(firstIndexToDelete))
                .execute();
    }

    /**
     * Builds (without executing) a query for the log indices of complete snapshots, i.e. snapshots for
     * which the number of stored chunks equals the {@code chunkCount} recorded on their chunks.
     *
     * <p>The result has one row per matching chunk, so an index can appear more than once.
     *
     * @return the select to embed into, or run from, another statement
     */
    private Select<? extends Record1<Long>> completedSnapshots() {
        final Table<Record> alias = DSL.table("t");
        final Field<Long> chunkSnapshotIndex = qualify(SNAPSHOT_CHUNKS, INDEX);
        final Field<Long> countedSnapshotIndex = qualify(alias, INDEX);

        // Derived table "t": how many chunks are stored per snapshot index.
        final Table<?> storedChunkCounts = this.dsl
                .select(chunkSnapshotIndex, DSL.count(chunkSnapshotIndex).as(COUNT))
                .from(SNAPSHOT_CHUNKS)
                .groupBy(chunkSnapshotIndex)
                .asTable(alias);

        // Join the counts back to the chunks and keep those whose recorded chunk count is reached.
        return this.dsl
                .select(countedSnapshotIndex)
                .from(storedChunkCounts, SNAPSHOT_CHUNKS)
                .where(countedSnapshotIndex.eq(chunkSnapshotIndex))
                .and(DSL.field(COUNT).eq(CHUNK_COUNT));
    }

    /**
     * Deletes the snapshot chunks whose log index is at most {@code j} and which do not belong to a
     * snapshot returned by {@link #completedSnapshots()}. This method does not commit.
     *
     * @param j the highest log index to consider (inclusive)
     */
    public void truncateSnapshotChunksUntil(long j) {
        final Long lastIndexToDelete = Long.valueOf(j);
        final Field<Long> chunkSnapshotIndex = qualify(SNAPSHOT_CHUNKS, INDEX);
        this.dsl.deleteFrom(SNAPSHOT_CHUNKS)
                .where(chunkSnapshotIndex.le(lastIndexToDelete))
                .and(chunkSnapshotIndex.notIn(completedSnapshots()))
                .execute();
    }

    /**
     * Reads every stored snapshot chunk, ordered by log index and then by chunk index.
     *
     * @return the chunks, complete or not
     */
    List<SnapshotChunk> getAllSnapshotChunks() {
        final List<SnapshotChunk> storedChunks = this.dsl
                .select(this.chunkField)
                .from(SNAPSHOT_CHUNKS)
                .orderBy(INDEX, CHUNK_INDEX)
                .fetch(this.chunkField);
        return storedChunks;
    }

    /**
     * Commits the pending writes. If a snapshot chunk has been persisted since the last clean-up, it
     * then deletes the log entries up to and including the newest complete snapshot index, deletes the
     * snapshot chunks below that index, and commits again.
     */
    public void flush() {
        this.dsl.connection(connection -> connection.commit());

        if (!this.tryToCleanUpOldSnapshots) {
            return;
        }
        // Cleared before the clean-up runs, exactly once per batch of persisted chunks.
        this.tryToCleanUpOldSnapshots = false;

        final Optional<Long> newestSnapshot = getMaxCommittedSnapshotIndex();
        if (newestSnapshot.isPresent()) {
            final Long snapshotIndex = newestSnapshot.get();
            this.dsl.deleteFrom(LOG_ENTRIES).where(INDEX.le(snapshotIndex)).execute();
            this.dsl.deleteFrom(SNAPSHOT_CHUNKS).where(INDEX.lt(snapshotIndex)).execute();
            this.dsl.connection(connection -> connection.commit());
        }
    }

    /**
     * Reads the persisted state back from the database.
     *
     * <p>A state is returned once the local endpoint, its voting flag, the initial group members and the
     * term have all been stored. The vote is optional: {@link #persistAndFlushTerm(int, RaftEndpoint)}
     * accepts a {@code null} vote, so a missing vote must not prevent the state from being restored.
     *
     * <p>When the state is not complete yet, the stored columns are checked for combinations that
     * should not occur and an empty Optional is returned.
     *
     * @return the restored state, or an empty Optional when there is no row or the state is incomplete
     * @throws IllegalStateException if the partially stored state is inconsistent
     */
    public Optional<RestoredRaftState> getRestoredRaftState() {
        Record5<RaftEndpoint, Boolean, RaftGroupMembersView, Integer, RaftEndpoint> row = this.dsl
                .select(this.localEndpointField, LOCAL_ENDPOINT_VOTING, this.initialGroupMembersField, TERM, this.votedForField)
                .from(KV)
                .fetchOne();
        if (row == null) {
            return Optional.empty();
        }

        RaftEndpoint localEndpoint = row.get(this.localEndpointField);
        Boolean localEndpointVoting = row.get(LOCAL_ENDPOINT_VOTING);
        RaftGroupMembersView initialGroupMembers = row.get(this.initialGroupMembersField);
        Integer term = row.get(TERM);
        RaftEndpoint votedFor = row.get(this.votedForField); // may legitimately be null

        if (localEndpoint != null && localEndpointVoting != null && initialGroupMembers != null && term != null) {
            // The snapshot is loaded before the log entries, as in the original evaluation order.
            SnapshotEntry snapshotEntry = getMaxCommittedSnapshotIndex().map(this::loadSnapshotEntry).orElse(null);
            List<LogEntry> logEntries = this.dsl
                    .select(this.logEntryField)
                    .from(LOG_ENTRIES)
                    .orderBy(INDEX)
                    .fetch(this.logEntryField);
            return Optional.of(new RestoredRaftState(localEndpoint, localEndpointVoting, initialGroupMembers, term, votedFor, snapshotEntry, logEntries));
        }

        // The endpoint and its voting flag are written by the same statement.
        checkState((localEndpoint == null) == (localEndpointVoting == null), "expected localEndpoint and localEndpointVoting to be both unset, or neither unset");
        // A vote is only ever written together with a term.
        checkState(term != null || votedFor == null, "expected term to be set since votedFor is set");
        // Without initial group members, neither a term nor a local endpoint may be present.
        checkState(initialGroupMembers != null || (term == null && localEndpoint == null), "expected initial group members and local endpoint fields to be set before this node can vote");
        return Optional.empty();
    }

    /**
     * Assembles the snapshot entry stored under the given index from its chunks, ordered by chunk index.
     * The term and the group members are taken from the first chunk.
     *
     * @param snapshotIndex index of a complete snapshot, so at least one chunk is expected to exist
     * @return the snapshot entry built from the stored chunks
     */
    private SnapshotEntry loadSnapshotEntry(Long snapshotIndex) {
        List<SnapshotChunk> chunks = this.dsl
                .select(this.chunkField)
                .from(SNAPSHOT_CHUNKS)
                .where(INDEX.eq(snapshotIndex))
                .orderBy(CHUNK_INDEX)
                .fetch(this.chunkField);
        SnapshotChunk firstChunk = chunks.get(0);
        return this.raftModelFactory.createSnapshotEntryBuilder()
                .setSnapshotChunks(chunks)
                .setIndex(snapshotIndex.longValue())
                .setTerm(firstChunk.getTerm())
                .setGroupMembersView(firstChunk.getGroupMembersView())
                .build();
    }

    /**
     * Fails when an expected condition does not hold.
     *
     * @param z   the condition that must be {@code true}
     * @param str the message of the exception raised otherwise
     * @throws IllegalStateException if {@code z} is {@code false}
     */
    private static void checkState(boolean z, String str) {
        if (z) {
            return;
        }
        throw new IllegalStateException(str);
    }
}
