package io.kareldb;

import io.kareldb.kafka.KafkaSchema;
import io.kareldb.schema.Schema;
import io.kareldb.transaction.KarelDbCommitTable;
import io.kareldb.transaction.KarelDbTimestampStorage;
import io.kareldb.transaction.client.KarelDbTransactionManager;
import io.kcache.Cache;
import io.kcache.CacheUpdateHandler;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
import io.kcache.utils.Caches;
import io.kcache.utils.InMemoryCache;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.omid.transaction.RollbackException;
import org.apache.omid.transaction.Transaction;
import org.apache.omid.transaction.TransactionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kareldb/KarelDbEngine.class */
public class KarelDbEngine implements Configurable, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KarelDbEngine.class);
    private KarelDbConfig config;
    private Cache<Long, Long> commits;
    private Cache<Long, Long> timestamps;
    private KarelDbTransactionManager transactionManager;
    private Schema schema;
    private final AtomicBoolean initialized = new AtomicBoolean();
    private static KarelDbEngine INSTANCE;

    public static synchronized KarelDbEngine getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new KarelDbEngine();
        }
        return INSTANCE;
    }

    public static synchronized void closeInstance() {
        if (INSTANCE != null) {
            try {
                INSTANCE.close();
            } catch (IOException e) {
                LOG.warn("Could not close engine", e);
            }
            INSTANCE = null;
        }
    }

    private KarelDbEngine() {
    }

    public void configure(Map<String, ?> map) {
        configure(new KarelDbConfig(map));
    }

    public void configure(KarelDbConfig karelDbConfig) {
        this.config = karelDbConfig;
    }

    public void init() {
        Map originals = this.config.originals();
        String str = (String) originals.get("kafkacache.bootstrap.servers");
        String str2 = (String) originals.getOrDefault("kafkacache.group.id", "kareldb-1");
        if (str != null) {
            originals.put("kafkacache.topic", "_commits");
            originals.put("kafkacache.group.id", str2);
            originals.put("kafkacache.client.id", str2 + "-_commits");
            this.commits = new KafkaCache(new KafkaCacheConfig(originals), Serdes.Long(), Serdes.Long(), (CacheUpdateHandler) null, new InMemoryCache());
        } else {
            this.commits = new InMemoryCache();
        }
        this.commits = Caches.concurrentCache(this.commits);
        this.commits.init();
        if (str != null) {
            originals.put("kafkacache.topic", "_timestamps");
            originals.put("kafkacache.group.id", str2);
            originals.put("kafkacache.client.id", str2 + "-_timestamps");
            this.timestamps = new KafkaCache(new KafkaCacheConfig(originals), Serdes.Long(), Serdes.Long(), (CacheUpdateHandler) null, new InMemoryCache());
        } else {
            this.timestamps = new InMemoryCache();
        }
        this.timestamps = Caches.concurrentCache(this.timestamps);
        this.timestamps.init();
        this.transactionManager = KarelDbTransactionManager.newInstance(new KarelDbCommitTable(this.commits), new KarelDbTimestampStorage(this.timestamps));
        this.schema = (Schema) getConfiguredInstance((String) originals.getOrDefault("kind", KafkaSchema.class.getName()), originals);
        this.schema.init();
        if (!this.initialized.compareAndSet(false, true)) {
            throw new IllegalStateException("Illegal state while initializing engine. Engine was already initialized");
        }
    }

    public boolean isInitialized() {
        return this.initialized.get();
    }

    public void sync() {
        this.commits.sync();
        this.timestamps.sync();
        this.schema.sync();
    }

    public Schema getSchema() {
        return this.schema;
    }

    public KarelDbTransactionManager getTxManager() {
        return this.transactionManager;
    }

    public Transaction beginTx() throws TransactionException {
        return this.transactionManager.begin();
    }

    public void commitTx(Transaction transaction) throws RollbackException, TransactionException {
        this.transactionManager.commit(transaction);
    }

    public void rollbackTx(Transaction transaction) throws TransactionException {
        this.transactionManager.rollback(transaction);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.transactionManager.close();
        this.schema.close();
    }

    public static <T> T getConfiguredInstance(String str, Map<String, ?> map) {
        try {
            Class<?> cls = Class.forName(str);
            if (cls == null) {
                return null;
            }
            Object newInstance = Utils.newInstance(cls);
            if (newInstance instanceof Configurable) {
                ((Configurable) newInstance).configure(map);
            }
            return (T) cls.cast(newInstance);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
