package io.activej.cube.etcd;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.async.service.ReactiveService;
import io.activej.common.ApplicationSettings;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.ref.RefLong;
import io.activej.common.time.CurrentTimeProvider;
import io.activej.cube.aggregation.AggregationChunkStorage;
import io.activej.cube.exception.CubeException;
import io.activej.etcd.EtcdEventProcessor;
import io.activej.etcd.EtcdListener;
import io.activej.etcd.EtcdUtils;
import io.activej.etcd.codec.key.EtcdKeyCodec;
import io.activej.etcd.codec.kv.EtcdKVCodecs;
import io.activej.etcd.codec.kv.EtcdKVDecoder;
import io.activej.etcd.codec.prefix.EtcdPrefixCodec;
import io.activej.etcd.exception.MalformedEtcdDataException;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import io.activej.reactor.schedule.ScheduledRunnable;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.common.exception.CompactedException;
import io.etcd.jetcd.options.GetOption;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/etcd/CubeCleanerService.class */
public final class CubeCleanerService extends AbstractReactive implements ReactiveService, ReactiveJmxBeanWithStats {
    private static final Logger logger = LoggerFactory.getLogger(CubeCleanerService.class);
    // Smoothing window handed to the PromiseStats instances created in the constructor.
    private static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);
    // Initial value of cleanupOlderThanMillis; looked up via ApplicationSettings ("cleanupOlderThan") with Duration.ofHours(1) passed as the default.
    public static final Duration DEFAULT_CLEANUP_OLDER_THAN = ApplicationSettings.getDuration(CubeCleanerService.class, "cleanupOlderThan", Duration.ofHours(1));
    // Initial value of cleanupRetryMillis; looked up via ApplicationSettings ("cleanupRetry") with Duration.ofMinutes(1) passed as the default.
    public static final Duration DEFAULT_CLEANUP_RETRY = ApplicationSettings.getDuration(CubeCleanerService.class, "cleanupRetry", Duration.ofMinutes(1));
    // Delay used before the watcher is recreated after the watch completes.
    private static final Duration WATCH_RETRY_INTERVAL = ApplicationSettings.getDuration(CubeCleanerService.class, "watchRetryInterval", Duration.ofSeconds(1));
    private final Client client;
    private final AggregationChunkStorage storage;
    // Key prefix that prefixChunk and timestampKey are appended to when building etcd keys.
    private final ByteSequence root;
    // Batches of deleted chunk IDs: appended by the watch listener, consumed head-first by doCleanup().
    private final Queue<DeletedChunksEntry> deletedChunksQueue;
    // doCleanup() wrapped with AsyncRunnables.coalesce.
    private final AsyncRunnable cleanup;
    // Chunk IDs listed by the storage at start() that had no matching etcd key; entries are removed when a put for the chunk is observed.
    private final Set<Long> stalledChunkIds;
    private EtcdPrefixCodec<String> aggregationIdCodec;
    private EtcdKeyCodec<Long> chunkIdCodec;
    private ByteSequence prefixChunk;
    private ByteSequence timestampKey;
    // Currently open etcd watcher; null until start() has created one.
    private Watch.Watcher watcher;
    // Last etcd revision seen (initial read or watch event); a new watcher starts from watchRevision + 1.
    private long watchRevision;
    // Last value received from the watched timestamp key.
    private long watchTimestamp;
    // deleteRevision of the most recent entry whose chunks were deleted from storage.
    private long lastCleanupRevision;
    // Minimum age (ms, relative to an entry's deleteTimestamp) before its chunks are deleted from storage.
    private long cleanupOlderThanMillis;
    // Delay (ms) before cleanup is retried after a failed storage deletion.
    private long cleanupRetryMillis;
    // Set by stop(); checked before running cleanup, scheduling retries, or recreating the watcher.
    private volatile boolean stopped;

    // Pending scheduled cleanup, or null when none is scheduled.
    @Nullable
    private ScheduledRunnable cleanupSchedule;
    // When true, a CompactedException makes the next watcher start from the reported compacted revision.
    private boolean retryFromCompactedRevision;
    private final PromiseStats promiseCleanup;
    private final PromiseStats promiseDeleteChunks;
    private final ExceptionStats watchEtcdExceptionStats;
    private final ExceptionStats malformedDataExceptionStats;
    private Instant watchConnectionLastEstablishedAt;
    private Instant watchLastCompletedAt;
    // Time source; defaults to the reactor and can be replaced through the builder.
    private CurrentTimeProvider now;

    /* loaded from: input_file:io/activej/cube/etcd/CubeCleanerService$Builder.class */
    /**
     * Fluent configurator for the enclosing {@link CubeCleanerService} instance.
     *
     * <p>Every {@code with...} method first calls {@code checkNotBuilt(this)}, then writes
     * directly into a field of the enclosing service and returns this builder.
     * {@link #doBuild()} hands back that same enclosing service.
     */
    public final class Builder extends AbstractBuilder<Builder, CubeCleanerService> {
        private Builder() {
        }

        /** Replaces the time source used by the service (defaults to the reactor). */
        public Builder withCurrentTimeProvider(CurrentTimeProvider currentTimeProvider) {
            checkNotBuilt(this);
            CubeCleanerService.this.now = currentTimeProvider;
            return this;
        }

        /** Sets the key prefix (relative to the root) under which chunk keys are read and watched. */
        public Builder withPrefixChunk(ByteSequence byteSequence) {
            checkNotBuilt(this);
            CubeCleanerService.this.prefixChunk = byteSequence;
            return this;
        }

        /** Sets the key (relative to the root) whose puts carry the transaction timestamp. */
        public Builder withTimestampKey(ByteSequence byteSequence) {
            checkNotBuilt(this);
            CubeCleanerService.this.timestampKey = byteSequence;
            return this;
        }

        /**
         * Sets how old a deletion must be before its chunks are removed from storage.
         * The method name (including its spelling) is kept as-is for compatibility.
         */
        public Builder withCleanupOlderThen(Duration duration) {
            checkNotBuilt(this);
            CubeCleanerService.this.cleanupOlderThanMillis = duration.toMillis();
            return this;
        }

        /** Sets the delay before a failed cleanup is retried. */
        public Builder withCleanupRetry(Duration duration) {
            checkNotBuilt(this);
            CubeCleanerService.this.cleanupRetryMillis = duration.toMillis();
            return this;
        }

        /** Sets the codec used to strip the aggregation ID prefix from chunk keys. */
        public Builder withAggregationIdCodec(EtcdPrefixCodec<String> etcdPrefixCodec) {
            checkNotBuilt(this);
            CubeCleanerService.this.aggregationIdCodec = etcdPrefixCodec;
            return this;
        }

        /** Sets the codec used to decode the chunk ID part of chunk keys. */
        public Builder withChunkIdCodec(EtcdKeyCodec<Long> etcdKeyCodec) {
            checkNotBuilt(this);
            CubeCleanerService.this.chunkIdCodec = etcdKeyCodec;
            return this;
        }

        /** Enables or disables resuming the watch from the compacted revision after a compaction error. */
        public Builder withRetryFromCompactedRevision(boolean z) {
            checkNotBuilt(this);
            CubeCleanerService.this.retryFromCompactedRevision = z;
            return this;
        }

        /**
         * Implements the builder's {@code doBuild} hook. The decompiler had renamed this
         * method and widened its access; the original name and {@code protected} access are
         * restored so that it is the method the builder base class dispatches to.
         *
         * @return the enclosing, already configured service instance
         */
        protected CubeCleanerService doBuild() {
            return CubeCleanerService.this;
        }
    }

    /* loaded from: input_file:io/activej/cube/etcd/CubeCleanerService$DeletedChunksEntry.class */
    /**
     * One batch of chunk IDs whose etcd keys were deleted in a single watch event.
     *
     * <p>Declared as a real {@code record}: a class may not name {@code java.lang.Record} as its
     * superclass explicitly, and {@code equals}/{@code hashCode}/{@code toString} are generated
     * by the compiler instead of being spelled out through {@code ObjectMethods.bootstrap}.
     *
     * @param deleteRevision  etcd revision at which the deletions were observed
     * @param deleteTimestamp timestamp value observed together with the deletions
     * @param chunkIds        IDs of the deleted chunks (stored as passed, not copied)
     */
    public record DeletedChunksEntry(long deleteRevision, long deleteTimestamp, Set<Long> chunkIds) {
    }

    /**
     * Creates a service bound to the reactor of the given chunk storage, with every tunable
     * set to its default. Instances are obtained through {@code create} / {@code builder}.
     *
     * @param client                  etcd client used for the initial read and for watching
     * @param aggregationChunkStorage storage that chunks are listed in and deleted from
     * @param byteSequence            root key prefix in etcd
     */
    private CubeCleanerService(Client client, AggregationChunkStorage aggregationChunkStorage, ByteSequence byteSequence) {
        super(aggregationChunkStorage.getReactor());

        // Collaborators
        this.client = client;
        this.storage = aggregationChunkStorage;
        this.root = byteSequence;
        this.now = this.reactor;

        // Working state
        this.deletedChunksQueue = new ConcurrentLinkedQueue<>();
        this.stalledChunkIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
        this.cleanup = AsyncRunnables.coalesce(this::doCleanup);

        // Defaults that the builder may override
        this.aggregationIdCodec = EtcdUtils.AGGREGATION_ID_CODEC;
        this.chunkIdCodec = EtcdUtils.CHUNK_ID_CODEC;
        this.prefixChunk = EtcdUtils.CHUNK;
        this.timestampKey = EtcdUtils.TIMESTAMP;
        this.cleanupOlderThanMillis = DEFAULT_CLEANUP_OLDER_THAN.toMillis();
        this.cleanupRetryMillis = DEFAULT_CLEANUP_RETRY.toMillis();

        // JMX statistics; the two watch Instant fields keep their default null value
        this.promiseCleanup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseDeleteChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.watchEtcdExceptionStats = ExceptionStats.create();
        this.malformedDataExceptionStats = ExceptionStats.create();
    }

    /**
     * Builds a service with default settings.
     *
     * @return the result of {@code builder(...).build()} for the given arguments
     */
    public static CubeCleanerService create(Client client, AggregationChunkStorage aggregationChunkStorage, ByteSequence byteSequence) {
        Builder defaults = builder(client, aggregationChunkStorage, byteSequence);
        return defaults.build();
    }

    /**
     * Starts configuring a new service.
     *
     * <p>{@code Builder} is an inner (non-static) class that writes into its enclosing
     * service, so it has to be created from a freshly constructed {@code CubeCleanerService};
     * a bare {@code new Builder()} has no enclosing instance in this static method and would
     * silently ignore all three arguments.
     *
     * @param client                  etcd client
     * @param aggregationChunkStorage chunk storage; its reactor becomes the service's reactor
     * @param byteSequence            root key prefix in etcd
     * @return a builder bound to the new service instance
     */
    public static Builder builder(Client client, AggregationChunkStorage aggregationChunkStorage, ByteSequence byteSequence) {
        return new CubeCleanerService(client, aggregationChunkStorage, byteSequence).new Builder();
    }

    /**
     * Starts the service.
     *
     * <p>Reads every key under {@code root + prefixChunk}, remembers the response revision as
     * the watch revision, then lists the chunks in storage. Listed chunk IDs that have no
     * matching etcd key are kept in {@code stalledChunkIds}; unless the listing was empty, a
     * background task to delete them is scheduled after {@code cleanupOlderThanMillis}.
     * Finally the watcher is created and one cleanup run is triggered.
     *
     * @return a promise that completes after the first cleanup run, or fails with the error
     *         of the etcd read or the storage listing
     */
    public Promise<Void> start() {
        ByteSequence chunkKeyPrefix = this.root.concat(this.prefixChunk);
        GetOption prefixScan = GetOption.builder().isPrefix(true).build();
        return Promise.ofCompletionStage(this.client.getKVClient().get(chunkKeyPrefix, prefixScan).exceptionallyCompose(io.activej.etcd.EtcdUtils::convertStatusExceptionStage)).then((getResponse, exc) -> {
            if (exc != null) {
                return Promise.ofException(exc);
            }
            this.watchRevision = getResponse.getHeader().getRevision();
            return this.storage.listChunks().whenResult(set -> {
                List<KeyValue> liveChunkKeys = getResponse.getKvs();
                this.stalledChunkIds.addAll(set);
                // Whatever is still referenced in etcd is not stalled.
                for (KeyValue liveChunkKey : liveChunkKeys) {
                    ByteSequence relativeKey = liveChunkKey.getKey().substring(chunkKeyPrefix.size());
                    this.stalledChunkIds.remove(this.chunkIdCodec.decodeKey(this.aggregationIdCodec.decodePrefix(relativeKey).suffix()));
                }
                if (!set.isEmpty()) {
                    this.reactor.delayBackground(this.cleanupOlderThanMillis, this::deleteStalledChunks);
                } else {
                    logger.info("No stalled chunks found");
                }
            });
        }).whenResult(() -> {
            this.watcher = createWatcher();
        }).then(this::cleanup).toVoid();
    }

    /**
     * Stops the service: raises the {@code stopped} flag, cancels a pending scheduled cleanup
     * (if any) and closes the watcher (if one was created).
     *
     * @return an already completed promise
     */
    public Promise<Void> stop() {
        this.stopped = true;

        ScheduledRunnable pendingCleanup = this.cleanupSchedule;
        if (pendingCleanup != null) {
            pendingCleanup.cancel();
        }

        Watch.Watcher openWatcher = this.watcher;
        if (openWatcher != null) {
            openWatcher.close();
        }

        return Promise.complete();
    }

    /**
     * Triggers a cleanup run through the coalescing wrapper around {@code doCleanup()}.
     * Must be called from the reactor thread.
     *
     * @return the promise returned by the coalesced runnable
     */
    @VisibleForTesting
    Promise<Void> cleanup() {
        Reactive.checkInReactorThread(this.reactor);
        Promise<Void> run = this.cleanup.run();
        return run;
    }

    /**
     * Processes the head of the deleted-chunks queue.
     *
     * <p>Any pending scheduled cleanup is cancelled first. If the head entry is old enough
     * ({@code deleteTimestamp + cleanupOlderThanMillis <= now}) its chunks are deleted, the
     * entry is removed and the method runs again for the next entry. If it is not old enough
     * yet, a background run is scheduled for one millisecond past the moment it becomes due.
     * When deletion fails, a retry is scheduled after {@code cleanupRetryMillis} (unless the
     * service has been stopped) and the failure is propagated.
     *
     * @return a promise completing when there is nothing more to clean up right now
     */
    private Promise<Void> doCleanup() {
        Reactive.checkInReactorThread(this.reactor);

        ScheduledRunnable pending = this.cleanupSchedule;
        if (pending != null) {
            pending.cancel();
            this.cleanupSchedule = null;
        }
        if (this.stopped) {
            return Promise.complete();
        }

        DeletedChunksEntry head = this.deletedChunksQueue.peek();
        if (head == null) {
            logger.trace("No chunks to be cleaned up");
            return Promise.complete();
        }

        long dueAt = head.deleteTimestamp() + this.cleanupOlderThanMillis;
        if (dueAt > this.now.currentTimeMillis()) {
            // Too young: come back right after the entry becomes eligible.
            long wakeUpAt = dueAt + 1;
            if (logger.isTraceEnabled()) {
                logger.trace("There are chunks to be cleaned up later, at {}", Instant.ofEpochMilli(wakeUpAt));
            }
            this.cleanupSchedule = this.reactor.scheduleBackground(wakeUpAt, this::cleanup);
            return Promise.complete();
        }

        return doCleanup(head).whenResult(() -> {
            this.deletedChunksQueue.remove();
        }).whenException(exc -> {
            logger.warn("Failed to cleanup chunks", exc);
            if (!this.stopped) {
                long retryAt = this.now.currentTimeMillis() + this.cleanupRetryMillis;
                if (logger.isTraceEnabled()) {
                    logger.trace("Scheduling next cleanup at {}", Instant.ofEpochMilli(retryAt));
                }
                this.cleanupSchedule = this.reactor.scheduleBackground(retryAt, this::cleanup);
            }
        }).then(() -> doCleanup());
    }

    /**
     * Deletes the chunks of a single queue entry from storage and, on success, records the
     * entry's revision as the last cleaned-up revision. The outcome is fed into the
     * {@code promiseCleanup} statistics.
     *
     * @param deletedChunksEntry entry whose chunks are to be deleted
     * @return promise of the deletion
     */
    private Promise<Void> doCleanup(DeletedChunksEntry deletedChunksEntry) {
        Set<Long> doomedChunkIds = deletedChunksEntry.chunkIds();
        logger.trace("Chunks to be cleaned up: {}", doomedChunkIds);
        return deleteChunksFromStorage(doomedChunkIds).whenResult(() -> {
            this.lastCleanupRevision = deletedChunksEntry.deleteRevision();
            logger.trace("Chunks successfully cleaned up");
        }).whenComplete(this.promiseCleanup.recordStats());
    }

    /**
     * Asks the storage to delete the given chunks. A failure is wrapped in a
     * {@link CubeException} that keeps the original cause; the outcome is fed into the
     * {@code promiseDeleteChunks} statistics.
     *
     * @param set IDs of the chunks to delete
     * @return promise of the deletion
     */
    private Promise<Void> deleteChunksFromStorage(Set<Long> set) {
        return this.storage.deleteChunks(set)
            .mapException(cause -> new CubeException("Failed to delete chunks from storage", cause))
            .whenComplete(this.promiseDeleteChunks.recordStats());
    }

    /**
     * Opens an etcd watch starting at {@code watchRevision + 1} over two key ranges: the
     * timestamp key and the chunk key prefix.
     *
     * <p>For every batch of events the listener records the revision, and - when the batch
     * carries a timestamp and at least one deleted chunk key - queues a
     * {@link DeletedChunksEntry} and triggers cleanup on the reactor. A put of a chunk key
     * removes that chunk from {@code stalledChunkIds}. On error the watcher is closed; when
     * the watch completes, a new watcher is created after {@code WATCH_RETRY_INTERVAL}
     * unless the service has been stopped.
     *
     * <p>The callback methods of the anonymous classes carry their original names again
     * ({@code createEventsAccumulator}, {@code decodeKV}, {@code decodeKey}); the decompiler
     * had renamed them, which left the corresponding interface methods unimplemented.
     *
     * <p>NOTE(review): the listener hops back through {@code reactor.execute(...)}, which
     * suggests callbacks arrive on a non-reactor thread - confirm against EtcdUtils.watch.
     *
     * @return the newly created watcher
     */
    private Watch.Watcher createWatcher() {
        // Resume right after the last revision that has already been seen.
        final long startRevision = this.watchRevision + 1;
        return io.activej.etcd.EtcdUtils.watch(this.client.getWatchClient(), startRevision, new EtcdUtils.WatchRequest[]{new EtcdUtils.WatchRequest(this.root.concat(this.timestampKey), EtcdKVCodecs.ofEmptyKey(io.activej.etcd.EtcdUtils.TOUCH_TIMESTAMP_CODEC), new EtcdEventProcessor<Void, Long, RefLong>() {
            // Accumulator #0: last timestamp put in the batch, or -1 when the batch has none.
            public RefLong createEventsAccumulator() {
                return new RefLong(-1L);
            }

            public void onPut(RefLong refLong, Long l) {
                refLong.set(l.longValue());
            }

            public void onDelete(RefLong refLong, Void key) {
                throw new UnsupportedOperationException();
            }
        }), EtcdUtils.WatchRequest.of(this.root.concat(this.prefixChunk), new EtcdKVDecoder<Long, Long>() {
            public Long decodeKV(io.activej.etcd.codec.kv.KeyValue keyValue) throws MalformedEtcdDataException {
                // Only the key matters: it encodes the chunk ID.
                return decodeKey(keyValue.key());
            }

            public Long decodeKey(ByteSequence byteSequence) throws MalformedEtcdDataException {
                try {
                    return (Long) CubeCleanerService.this.chunkIdCodec.decodeKey(CubeCleanerService.this.aggregationIdCodec.decodePrefix(byteSequence).suffix());
                } catch (MalformedEtcdDataException e) {
                    throw new MalformedEtcdDataException("Failed to decode chunk ID of key '" + byteSequence + "'", e);
                }
            }
        }, new EtcdEventProcessor<Long, Long, Set<Long>>() {
            // Accumulator #1: IDs of the chunks whose keys were deleted in the batch.
            public Set<Long> createEventsAccumulator() {
                return new HashSet<>();
            }

            public void onPut(Set<Long> set, Long l) {
                // A chunk that (re)appears in etcd is referenced, hence not stalled.
                CubeCleanerService.this.stalledChunkIds.remove(l);
            }

            public void onDelete(Set<Long> set, Long l) {
                set.add(l);
            }
        })}, new EtcdListener<Object[]>() {
            public void onConnectionEstablished() {
                CubeCleanerService.logger.trace("Watch connection to etcd server established");
                CubeCleanerService.this.watchConnectionLastEstablishedAt = CubeCleanerService.this.now.currentInstant();
            }

            public void onNext(long revision, Object[] accumulators) {
                CubeCleanerService.this.watchRevision = revision;
                // Accumulators come in the order of the watch requests above.
                RefLong timestampRef = (RefLong) accumulators[0];
                @SuppressWarnings("unchecked") // produced by the Set<Long> accumulator of request #1
                Set<Long> deletedChunkIds = (Set<Long>) accumulators[1];
                long timestamp = timestampRef.get();
                if (timestamp == -1) {
                    if (!deletedChunkIds.isEmpty()) {
                        CubeCleanerService.logger.warn("No transaction timestamp found, skip deleting chunks {}", deletedChunkIds);
                    }
                    return;
                }
                CubeCleanerService.this.watchTimestamp = timestamp;
                if (deletedChunkIds.isEmpty()) {
                    return;
                }
                CubeCleanerService.this.deletedChunksQueue.add(new DeletedChunksEntry(revision, timestamp, deletedChunkIds));
                CubeCleanerService.this.reactor.execute(() -> {
                    // An already scheduled cleanup will pick the new entry up on its own.
                    if (CubeCleanerService.this.cleanupSchedule == null) {
                        CubeCleanerService.this.cleanup();
                    }
                });
            }

            public void onError(Throwable th) {
                if (th instanceof MalformedEtcdDataException) {
                    CubeCleanerService.this.malformedDataExceptionStats.recordException(th, this);
                } else if (th instanceof CompactedException compactedException) {
                    long compactedRevision = compactedException.getCompactedRevision();
                    CubeCleanerService.logger.warn("Watch revision {} was compacted, compacted revision is {}", startRevision, compactedRevision, compactedException);
                    if (CubeCleanerService.this.retryFromCompactedRevision) {
                        CubeCleanerService.logger.trace("Retrying from the compacted revision {}", compactedRevision);
                        // The next watcher starts at watchRevision + 1, i.e. at the compacted revision.
                        CubeCleanerService.this.watchRevision = compactedRevision - 1;
                    }
                } else {
                    CubeCleanerService.logger.warn("Error occurred while watching chunks to be cleaned up", th);
                }
                // Every error, whatever its kind, is also counted in the general watch statistics.
                CubeCleanerService.this.watchEtcdExceptionStats.recordException(th, this);
                CubeCleanerService.this.watcher.close();
            }

            public void onCompleted() {
                CubeCleanerService.logger.warn("Watch has been completed");
                CubeCleanerService.this.watchLastCompletedAt = CubeCleanerService.this.now.currentInstant();
                CubeCleanerService.this.reactor.execute(() -> {
                    CubeCleanerService.this.reactor.delayBackground(CubeCleanerService.WATCH_RETRY_INTERVAL, () -> {
                        if (CubeCleanerService.this.stopped) {
                            return;
                        }
                        CubeCleanerService.logger.trace("Recreating watcher");
                        CubeCleanerService.this.watcher = CubeCleanerService.this.createWatcher();
                    });
                });
            }
        });
    }

    /**
     * Deletes from storage the chunks that were found at startup without a matching etcd key
     * and have not been put into etcd since.
     *
     * <p>Nothing is deleted when the set is empty, or when the last watched timestamp is older
     * than half of {@code cleanupOlderThanMillis}; in the latter case the set is simply
     * cleared. The set is also cleared once the deletion attempt completes, whether it
     * succeeded or failed.
     *
     * <p>The age check reads the injected time provider {@code now}, the same source the rest
     * of the cleanup logic uses (it defaults to the reactor, so default behaviour is
     * unchanged), and a failed deletion is logged together with its cause.
     */
    private void deleteStalledChunks() {
        if (this.stalledChunkIds.isEmpty()) {
            logger.info("No stalled chunks to delete");
            return;
        }
        long sinceLastWatchTimestamp = this.now.currentTimeMillis() - this.watchTimestamp;
        if (sinceLastWatchTimestamp > this.cleanupOlderThanMillis / 2) {
            logger.info("Last watch timestamp was too long ago, stalled chunks will not be deleted");
            this.stalledChunkIds.clear();
            return;
        }
        logger.info("Deleting stalled chunks {}", this.stalledChunkIds);
        this.storage.deleteChunks(this.stalledChunkIds).whenResult(() -> {
            logger.info("Deleted stalled chunks");
        }).whenException(exc -> {
            // Keep the cause in the log; it is the only place this failure surfaces.
            logger.warn("Failed to delete stalled chunks", exc);
        }).whenComplete(this.stalledChunkIds::clear);
    }

    /** @return string form of the chunk key prefix ({@code prefixChunk}) */
    @JmxAttribute
    public String getCubeEtcdPrefix() {
        ByteSequence chunkPrefix = prefixChunk;
        return chunkPrefix.toString();
    }

    /** @return the configured minimum age of a deletion before its chunks are removed */
    @JmxAttribute
    public Duration getCleanupOlderThan() {
        long millis = cleanupOlderThanMillis;
        return Duration.ofMillis(millis);
    }

    /**
     * Changes the minimum age of a deletion before its chunks are removed and immediately
     * triggers a cleanup run so that the new threshold takes effect.
     *
     * @param duration new threshold
     */
    @JmxAttribute
    public void setCleanupOlderThan(Duration duration) {
        long millis = duration.toMillis();
        cleanupOlderThanMillis = millis;
        cleanup();
    }

    /** @return the configured delay before a failed cleanup is retried */
    @JmxAttribute
    public Duration getCleanupRetryInterval() {
        long millis = cleanupRetryMillis;
        return Duration.ofMillis(millis);
    }

    /**
     * Changes the delay before a failed cleanup is retried.
     *
     * @param duration new retry delay
     */
    @JmxAttribute
    public void setCleanupRetryInterval(Duration duration) {
        long millis = duration.toMillis();
        cleanupRetryMillis = millis;
    }

    /** @return the delete revision of the most recent successfully cleaned-up entry */
    @JmxAttribute
    public long getLastCleanupRevision() {
        return lastCleanupRevision;
    }

    /** @return the last etcd revision recorded by the service */
    @JmxAttribute
    public long getWatchRevision() {
        return watchRevision;
    }

    /** @return the last timestamp value received through the watch */
    @JmxAttribute
    public long getWatchTimestamp() {
        return watchTimestamp;
    }

    /** @return {@code true} once {@code stop()} has been called */
    @JmxAttribute
    public boolean isStopped() {
        return stopped;
    }

    /** @return the number of entries currently waiting in the deleted-chunks queue */
    @JmxAttribute
    public int getCurrentDeletedChunksQueueSize() {
        Queue<DeletedChunksEntry> pendingEntries = deletedChunksQueue;
        return pendingEntries.size();
    }

    /** @return statistics of per-entry cleanup runs */
    @JmxAttribute
    public PromiseStats getPromiseCleanup() {
        return promiseCleanup;
    }

    /** @return statistics of storage chunk deletions issued by cleanup */
    @JmxAttribute
    public PromiseStats getPromiseDeleteChunks() {
        return promiseDeleteChunks;
    }

    /** @return statistics of all errors reported by the etcd watch */
    @JmxAttribute
    public ExceptionStats getWatchEtcdExceptionStats() {
        return watchEtcdExceptionStats;
    }

    /** @return statistics of malformed-data errors reported by the etcd watch */
    @JmxAttribute
    public ExceptionStats getMalformedDataExceptionStats() {
        return malformedDataExceptionStats;
    }

    /** @return the moment the watch last completed, or {@code null} if it never has */
    @JmxAttribute
    @Nullable
    public Instant getWatchLastCompletedAt() {
        return watchLastCompletedAt;
    }

    /** @return the moment the watch connection was last established, or {@code null} if it never was */
    @JmxAttribute
    @Nullable
    public Instant getWatchConnectionLastEstablishedAt() {
        return watchConnectionLastEstablishedAt;
    }

    /** @return string form of the etcd root key prefix */
    @JmxAttribute
    public String getEtcdRoot() {
        ByteSequence rootPrefix = root;
        return rootPrefix.toString();
    }
}
