package io.debezium.server.redis;

import io.debezium.config.Field;
import io.smallrye.mutiny.Uni;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

/* loaded from: input_file:io/debezium/server/redis/RedisOffsetBackingStore.class */
public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
    private static final String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.redis.";
    private static final String SINK_PROP_PREFIX = "debezium.sink.redis.";
    private String redisKeyName;
    private String address;
    private String user;
    private String password;
    private boolean sslEnabled;
    private Jedis client = null;
    private Map<String, String> config;
    private Integer initialRetryDelay;
    private Integer maxRetryDelay;
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisOffsetBackingStore.class);
    public static final Field PROP_ADDRESS = Field.create("offset.storage.redis.address").withDescription("The redis url that will be used to access the database history");
    public static final Field PROP_SSL_ENABLED = Field.create("offset.storage.redis.ssl.enabled").withDescription("Use SSL for Redis connection").withDefault("false");
    public static final Field PROP_USER = Field.create("offset.storage.redis.user").withDescription("The redis url that will be used to access the database history");
    public static final Field PROP_PASSWORD = Field.create("offset.storage.redis.password").withDescription("The redis url that will be used to access the database history");
    public static final String DEFAULT_REDIS_KEY_NAME = "metadata:debezium:offsets";
    public static final Field PROP_KEY_NAME = Field.create("offset.storage.redis.key").withDescription("The redis key that will be used to store the database history").withDefault(DEFAULT_REDIS_KEY_NAME);
    public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
    public static final Field PROP_RETRY_INITIAL_DELAY = Field.create("offset.storage.redis.retry.initial.delay.ms").withDescription("Initial retry delay (in ms)").withDefault(DEFAULT_RETRY_INITIAL_DELAY.intValue());
    public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
    public static final Field PROP_RETRY_MAX_DELAY = Field.create("offset.storage.redis.retry.max.delay.ms").withDescription("Maximum retry delay (in ms)").withDefault(DEFAULT_RETRY_MAX_DELAY.intValue());

    void connect() {
        this.client = new RedisConnection(this.address, this.user, this.password, this.sslEnabled).getRedisClient("debezium:offsets");
    }

    public void configure(WorkerConfig workerConfig) {
        super.configure(workerConfig);
        this.config = workerConfig.originalsStrings();
        this.address = this.config.get(PROP_ADDRESS.name());
        if (this.address == null) {
            this.address = this.config.get("debezium.sink.redis.address");
            this.user = this.config.get("debezium.sink.redis.user");
            this.password = this.config.get("debezium.sink.redis.password");
            this.sslEnabled = Boolean.parseBoolean(this.config.get("debezium.sink.redis.ssl.enabled"));
        } else {
            this.user = this.config.get(PROP_USER.name());
            this.password = this.config.get(PROP_PASSWORD.name());
            this.sslEnabled = Boolean.parseBoolean(this.config.get(PROP_SSL_ENABLED.name()));
        }
        this.redisKeyName = (String) Optional.ofNullable(this.config.get(PROP_KEY_NAME.name())).orElse(DEFAULT_REDIS_KEY_NAME);
        this.initialRetryDelay = (Integer) Optional.ofNullable(Integer.getInteger(this.config.get(PROP_RETRY_INITIAL_DELAY.name()))).orElse(DEFAULT_RETRY_INITIAL_DELAY);
        this.maxRetryDelay = (Integer) Optional.ofNullable(Integer.getInteger(this.config.get(PROP_RETRY_INITIAL_DELAY.name()))).orElse(DEFAULT_RETRY_MAX_DELAY);
    }

    public synchronized void start() {
        super.start();
        LOGGER.info("Starting RedisOffsetBackingStore");
        connect();
        load();
    }

    public synchronized void stop() {
        super.stop();
        LOGGER.info("Stopped RedisOffsetBackingStore");
    }

    private void load() {
        Map map = (Map) Uni.createFrom().item(() -> {
            return this.client.hgetAll(this.redisKeyName);
        }).onFailure().invoke(th -> {
            LOGGER.warn("Reading from offset store failed with " + th);
            LOGGER.warn("Will retry");
        }).onFailure(JedisConnectionException.class).invoke(th2 -> {
            LOGGER.warn("Attempting to reconnect to redis ");
            connect();
        }).onFailure().retry().withBackOff(Duration.ofMillis(this.initialRetryDelay.intValue()), Duration.ofMillis(this.maxRetryDelay.intValue())).indefinitely().invoke(map2 -> {
            LOGGER.trace("Offsets fetched from redis: " + map2);
        }).await().indefinitely();
        this.data = new HashMap();
        for (Map.Entry entry : map.entrySet()) {
            this.data.put(entry.getKey() != null ? ByteBuffer.wrap(((String) entry.getKey()).getBytes()) : null, entry.getValue() != null ? ByteBuffer.wrap(((String) entry.getValue()).getBytes()) : null);
        }
    }

    protected void save() {
        for (Map.Entry entry : this.data.entrySet()) {
            byte[] array = entry.getKey() != null ? ((ByteBuffer) entry.getKey()).array() : null;
            byte[] array2 = entry.getValue() != null ? ((ByteBuffer) entry.getValue()).array() : null;
            Uni.createFrom().item(() -> {
                return Long.valueOf(this.client.hset(this.redisKeyName.getBytes(), array, array2));
            }).onFailure().invoke(th -> {
                LOGGER.warn("Writing to offset store failed with " + th);
                LOGGER.warn("Will retry");
            }).onFailure(JedisConnectionException.class).invoke(th2 -> {
                LOGGER.warn("Attempting to reconnect to redis ");
                connect();
            }).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(2L)).indefinitely().invoke(l -> {
                LOGGER.trace("Record written to offset store in redis: " + array2);
            }).await().indefinitely();
        }
    }
}
