package io.debezium.server.redis;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Collect;
import io.debezium.util.DelayStrategy;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.resps.StreamEntry;

@ThreadSafe
/* loaded from: input_file:io/debezium/server/redis/RedisDatabaseHistory.class */
public final class RedisDatabaseHistory extends AbstractDatabaseHistory {
    private static final String CONFIGURATION_FIELD_PREFIX_STRING = "database.history.redis.";
    Duration initialRetryDelay;
    Duration maxRetryDelay;
    private static final String SINK_PROP_PREFIX = "debezium.sink.redis.";
    private Configuration config;
    private String redisKeyName;
    private String address;
    private String user;
    private String password;
    private boolean sslEnabled;
    private Integer connectionTimeout;
    private Integer socketTimeout;
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisDatabaseHistory.class);
    public static final Field PROP_ADDRESS = Field.create("database.history.redis.address").withDescription("The redis url that will be used to access the database history");
    public static final Field PROP_SSL_ENABLED = Field.create("database.history.redis.ssl.enabled").withDescription("Use SSL for Redis connection").withDefault("false");
    public static final Field PROP_USER = Field.create("database.history.redis.user").withDescription("The redis url that will be used to access the database history");
    public static final Field PROP_PASSWORD = Field.create("database.history.redis.password").withDescription("The redis url that will be used to access the database history");
    public static final Field PROP_KEY = Field.create("database.history.redis.key").withDescription("The redis key that will be used to store the database history").withDefault("metadata:debezium:db_history");
    public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
    public static final Field PROP_RETRY_INITIAL_DELAY = Field.create("database.history.redis.retry.initial.delay.ms").withDescription("Initial retry delay (in ms)").withDefault(DEFAULT_RETRY_INITIAL_DELAY.intValue());
    public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
    public static final Field PROP_RETRY_MAX_DELAY = Field.create("database.history.redis.retry.max.delay.ms").withDescription("Maximum retry delay (in ms)").withDefault(DEFAULT_RETRY_MAX_DELAY.intValue());
    public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
    public static final Field PROP_CONNECTION_TIMEOUT = Field.create("database.history.redis.connection.timeout.ms").withDescription("Connection timeout (in ms)").withDefault(DEFAULT_CONNECTION_TIMEOUT.intValue());
    public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
    public static final Field PROP_SOCKET_TIMEOUT = Field.create("database.history.redis.socket.timeout.ms").withDescription("Socket timeout (in ms)").withDefault(DEFAULT_SOCKET_TIMEOUT.intValue());
    public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(PROP_ADDRESS, new Field[]{PROP_USER, PROP_PASSWORD, PROP_KEY});
    private final DocumentWriter writer = DocumentWriter.defaultWriter();
    private final DocumentReader reader = DocumentReader.defaultReader();
    private final AtomicBoolean running = new AtomicBoolean();
    private Jedis client = null;

    void connect() {
        this.client = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout.intValue(), this.socketTimeout.intValue(), this.sslEnabled).getRedisClient("debezium:db_history");
    }

    public void configure(Configuration configuration, HistoryRecordComparator historyRecordComparator, DatabaseHistoryListener databaseHistoryListener, boolean z) {
        Collection<Field> collection = ALL_FIELDS;
        Logger logger = LOGGER;
        Objects.requireNonNull(logger);
        if (!configuration.validateAndRecord(collection, logger::error)) {
            throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }
        Collection<Field> collection2 = ALL_FIELDS;
        Logger logger2 = LOGGER;
        Objects.requireNonNull(logger2);
        configuration.validateAndRecord(collection2, logger2::error);
        this.config = configuration;
        this.address = this.config.getString(PROP_ADDRESS.name());
        if (this.address == null) {
            this.address = this.config.getString("debezium.sink.redis.address");
            this.user = this.config.getString("debezium.sink.redis.user");
            this.password = this.config.getString("debezium.sink.redis.password");
            this.sslEnabled = Boolean.parseBoolean(this.config.getString("debezium.sink.redis.ssl.enabled"));
        } else {
            this.user = this.config.getString(PROP_USER.name());
            this.password = this.config.getString(PROP_PASSWORD.name());
            this.sslEnabled = Boolean.parseBoolean(this.config.getString(PROP_SSL_ENABLED.name()));
        }
        this.redisKeyName = this.config.getString(PROP_KEY);
        LOGGER.info("rediskeyname:" + this.redisKeyName);
        this.initialRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_INITIAL_DELAY));
        this.maxRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_MAX_DELAY));
        this.connectionTimeout = Integer.valueOf(this.config.getInteger(PROP_CONNECTION_TIMEOUT));
        this.socketTimeout = Integer.valueOf(this.config.getInteger(PROP_SOCKET_TIMEOUT));
        super.configure(configuration, historyRecordComparator, databaseHistoryListener, z);
    }

    public synchronized void start() {
        super.start();
        LOGGER.info("Starting RedisDatabaseHistory");
        connect();
    }

    protected void storeRecord(HistoryRecord historyRecord) throws DatabaseHistoryException {
        if (historyRecord == null) {
            return;
        }
        try {
            String write = this.writer.write(historyRecord.document());
            DelayStrategy exponential = DelayStrategy.exponential(this.initialRetryDelay, this.maxRetryDelay);
            boolean z = false;
            while (!z) {
                try {
                    if (this.client == null) {
                        connect();
                    }
                    this.client.xadd(this.redisKeyName, (StreamEntryID) null, Collections.singletonMap("schema", write));
                    LOGGER.trace("Record written to database history in redis: " + write);
                    z = true;
                } catch (JedisConnectionException e) {
                    LOGGER.warn("Attempting to reconnect to redis ");
                    connect();
                } catch (Exception e2) {
                    LOGGER.warn("Writing to database history stream failed", e2);
                    LOGGER.warn("Will retry");
                }
                if (!z) {
                    exponential.sleepWhen(!z);
                }
            }
        } catch (IOException e3) {
            LOGGER.error("Failed to convert record to string: {}", historyRecord, e3);
            throw new DatabaseHistoryException("Unable to write database history record");
        }
    }

    public void stop() {
        this.running.set(false);
        if (this.client != null) {
            this.client.disconnect();
        }
        super.stop();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v26, types: [java.util.List] */
    protected synchronized void recoverRecords(Consumer<HistoryRecord> consumer) {
        DelayStrategy exponential = DelayStrategy.exponential(this.initialRetryDelay, this.maxRetryDelay);
        boolean z = false;
        ArrayList<StreamEntry> arrayList = new ArrayList();
        while (!z) {
            try {
                if (this.client == null) {
                    connect();
                }
                arrayList = this.client.xrange(this.redisKeyName, (StreamEntryID) null, (StreamEntryID) null);
                z = true;
            } catch (JedisConnectionException e) {
                LOGGER.warn("Attempting to reconnect to redis ");
                connect();
            } catch (Exception e2) {
                LOGGER.warn("Reading from database history stream failed with " + e2);
                LOGGER.warn("Will retry");
            }
            if (!z) {
                exponential.sleepWhen(!z);
            }
        }
        for (StreamEntry streamEntry : arrayList) {
            try {
                consumer.accept(new HistoryRecord(this.reader.read((String) streamEntry.getFields().get("schema"))));
            } catch (IOException e3) {
                LOGGER.error("Failed to convert record to string: {}", streamEntry, e3);
                return;
            }
        }
    }

    public boolean storageExists() {
        return true;
    }

    public boolean exists() {
        return this.client != null && this.client.xlen(this.redisKeyName) > 0;
    }
}
