package io.debezium.connector.oracle.logminer.processor.infinispan;

import io.debezium.DebeziumException;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.LogMinerEventMarshallerImpl;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.TransactionMarshallerImpl;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.configuration.XMLStringConfiguration;
import org.infinispan.commons.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/processor/infinispan/RemoteInfinispanLogMinerEventProcessor.class */
public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor implements CacheProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInfinispanLogMinerEventProcessor.class);
    private static final String HOTROD_CLIENT_LOOKUP_PREFIX = "log.mining.buffer.infinispan.client.";
    private static final String HOTROD_CLIENT_PREFIX = "infinispan.client.";
    public static final String HOTROD_SERVER_LIST = "log.mining.buffer.infinispan.client.hotrod.server_list";
    private final RemoteCacheManager cacheManager;
    private final boolean dropBufferOnStop;
    private final RemoteCache<String, InfinispanTransaction> transactionCache;
    private final RemoteCache<String, LogMinerEvent> eventCache;
    private final RemoteCache<String, String> processedTransactionsCache;
    private final RemoteCache<String, String> schemaChangesCache;

    public RemoteInfinispanLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection, EventDispatcher<OraclePartition, TableId> eventDispatcher, OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext, OracleDatabaseSchema oracleDatabaseSchema, OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics) {
        super(changeEventSourceContext, oracleConnectorConfig, oracleConnection, eventDispatcher, oraclePartition, oracleOffsetContext, oracleDatabaseSchema, oracleStreamingChangeEventSourceMetrics);
        Configuration build = new ConfigurationBuilder().withProperties(getHotrodClientProperties(oracleConnectorConfig)).addContextInitializer(TransactionMarshallerImpl.class.getName()).addContextInitializer(LogMinerEventMarshallerImpl.class.getName()).build();
        LOGGER.info("Using Infinispan in Hotrod client mode");
        this.cacheManager = new RemoteCacheManager(build, true);
        this.dropBufferOnStop = oracleConnectorConfig.isLogMiningBufferDropOnStop();
        this.transactionCache = createCache(CacheProvider.TRANSACTIONS_CACHE_NAME, oracleConnectorConfig, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
        this.processedTransactionsCache = createCache(CacheProvider.PROCESSED_TRANSACTIONS_CACHE_NAME, oracleConnectorConfig, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_PROCESSED_TRANSACTIONS);
        this.schemaChangesCache = createCache(CacheProvider.SCHEMA_CHANGES_CACHE_NAME, oracleConnectorConfig, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
        this.eventCache = createCache(CacheProvider.EVENTS_CACHE_NAME, oracleConnectorConfig, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
        displayCacheStatistics();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.dropBufferOnStop) {
            LOGGER.info("Clearing infinispan caches");
            this.transactionCache.clear();
            this.eventCache.clear();
            this.schemaChangesCache.clear();
            this.processedTransactionsCache.clear();
            this.cacheManager.administration().removeCache(CacheProvider.TRANSACTIONS_CACHE_NAME);
            this.cacheManager.administration().removeCache(CacheProvider.PROCESSED_TRANSACTIONS_CACHE_NAME);
            this.cacheManager.administration().removeCache(CacheProvider.SCHEMA_CHANGES_CACHE_NAME);
            this.cacheManager.administration().removeCache(CacheProvider.EVENTS_CACHE_NAME);
        }
        LOGGER.info("Shutting down infinispan remote caches");
        this.cacheManager.close();
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    /* renamed from: getTransactionCache */
    public BasicCache<String, InfinispanTransaction> mo59getTransactionCache() {
        return this.transactionCache;
    }

    @Override // io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider
    public BasicCache<String, LogMinerEvent> getEventCache() {
        return this.eventCache;
    }

    @Override // io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider
    public BasicCache<String, String> getSchemaChangesCache() {
        return this.schemaChangesCache;
    }

    @Override // io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider
    public BasicCache<String, String> getProcessedTransactionsCache() {
        return this.processedTransactionsCache;
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected Scn getTransactionCacheMinimumScn() {
        Scn scn = Scn.NULL;
        CloseableIterator it = this.transactionCache.values().iterator();
        while (it.hasNext()) {
            try {
                Scn startScn = ((InfinispanTransaction) it.next()).getStartScn();
                if (scn.isNull()) {
                    scn = startScn;
                } else if (startScn.compareTo(scn) < 0) {
                    scn = startScn;
                }
            } catch (Throwable th) {
                if (it != null) {
                    try {
                        it.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (it != null) {
            it.close();
        }
        return scn;
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected Optional<InfinispanTransaction> getOldestTransactionInCache() {
        InfinispanTransaction infinispanTransaction = null;
        if (!this.transactionCache.isEmpty()) {
            CloseableIterator it = this.transactionCache.values().iterator();
            try {
                infinispanTransaction = (InfinispanTransaction) it.next();
                while (it.hasNext()) {
                    InfinispanTransaction infinispanTransaction2 = (InfinispanTransaction) it.next();
                    int compareTo = infinispanTransaction2.getStartScn().compareTo(infinispanTransaction.getStartScn());
                    if (compareTo < 0) {
                        infinispanTransaction = infinispanTransaction2;
                    } else if (compareTo == 0 && infinispanTransaction2.getChangeTime().isBefore(infinispanTransaction.getChangeTime())) {
                        infinispanTransaction = infinispanTransaction2;
                    }
                }
                if (it != null) {
                    it.close();
                }
            } catch (Throwable th) {
                if (it != null) {
                    try {
                        it.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        return Optional.ofNullable(infinispanTransaction);
    }

    private Properties getHotrodClientProperties(OracleConnectorConfig oracleConnectorConfig) {
        Map asMap = oracleConnectorConfig.getConfig().subset(HOTROD_CLIENT_LOOKUP_PREFIX, true).asMap();
        Properties properties = new Properties();
        for (Map.Entry entry : asMap.entrySet()) {
            properties.put("infinispan.client." + ((String) entry.getKey()), entry.getValue());
            if (((String) entry.getKey()).toLowerCase().endsWith("infinispan.client.hotrod.auth_username".toLowerCase())) {
                properties.put("infinispan.client.hotrod.use_auth", "true");
            }
        }
        return properties;
    }

    private <C, V> RemoteCache<C, V> createCache(String str, OracleConnectorConfig oracleConnectorConfig, Field field) {
        Objects.requireNonNull(str);
        RemoteCache<C, V> cache = this.cacheManager.getCache(str);
        if (cache != null) {
            LOGGER.info("Remote cache '{}' already defined.", str);
            return cache;
        }
        String string = oracleConnectorConfig.getConfig().getString(field);
        Objects.requireNonNull(string);
        RemoteCache<C, V> createCache = this.cacheManager.administration().createCache(str, new XMLStringConfiguration(string));
        if (createCache == null) {
            throw new DebeziumException("Failed to create remote Infinispan cache: " + str);
        }
        LOGGER.info("Created remote infinispan cache: {}", str);
        return createCache;
    }
}
