package io.activej.cube.etcd;

import io.activej.async.exception.AsyncCloseException;
import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.async.util.LogUtils;
import io.activej.common.ApplicationSettings;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.collection.CollectionUtils;
import io.activej.common.collection.CollectorUtils;
import io.activej.common.exception.MalformedDataException;
import io.activej.common.time.CurrentTimeProvider;
import io.activej.common.tuple.Tuple2;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.cube.CubeState;
import io.activej.cube.CubeStructure;
import io.activej.cube.aggregation.AggregationChunk;
import io.activej.cube.aggregation.json.JsonCodecs;
import io.activej.cube.aggregation.ot.AggregationDiff;
import io.activej.cube.ot.CubeDiff;
import io.activej.datastream.supplier.BlockingPutQueue;
import io.activej.etcd.EtcdEventProcessor;
import io.activej.etcd.EtcdUtils;
import io.activej.etcd.TxnOps;
import io.activej.etcd.codec.key.EtcdKeyCodecs;
import io.activej.etcd.codec.kv.EtcdKVCodec;
import io.activej.etcd.codec.kv.EtcdKVCodecs;
import io.activej.etcd.codec.prefix.EtcdPrefixCodec;
import io.activej.etcd.exception.MalformedEtcdDataException;
import io.activej.etcd.state.AbstractEtcdStateManager;
import io.activej.etl.LogDiff;
import io.activej.etl.LogPositionDiff;
import io.activej.etl.LogState;
import io.activej.jmx.api.ConcurrentJmxBean;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.multilog.LogPosition;
import io.activej.ot.StateManager;
import io.activej.promise.Promise;
import io.activej.reactor.Reactive;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Response;
import io.etcd.jetcd.options.DeleteOption;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/etcd/CubeEtcdStateManager.class */
public final class CubeEtcdStateManager extends AbstractEtcdStateManager<LogState<CubeDiff, CubeState>, LogDiff<CubeDiff>> implements StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>>, ConcurrentJmxBean {
    private static final Logger logger = LoggerFactory.getLogger(CubeEtcdStateManager.class);
    public static final Duration WATCH_RETRY_INTERVAL = ApplicationSettings.getDuration(CubeEtcdStateManager.class, "watchRetryInterval", Duration.ofSeconds(1));
    public static final int STATE_SUBSCRIBER_BUFFER_SIZE = ApplicationSettings.getInt(CubeEtcdStateManager.class, "stateSubscriberBufferSize", 128).intValue();
    private final CubeStructure cubeStructure;
    private final Set<StateTransitionListener> listeners;
    private EtcdPrefixCodec<String> aggregationIdCodec;
    private Function<String, EtcdKVCodec<Long, AggregationChunk>> chunkCodecsFactory;
    private ByteSequence prefixPos;
    private ByteSequence prefixChunk;
    private ByteSequence timestampKey;
    private ScheduledExecutorService scheduledExecutor;
    private CurrentTimeProvider now;
    private volatile boolean nextWatchScheduled;
    private volatile boolean stopping;
    private final ExceptionStats watchEtcdExceptionStats;
    private final ExceptionStats malformedDataExceptionStats;
    private Instant watchConnectionLastEstablishedAt;
    private Instant watchStateLastUpdatedAt;
    private Instant watchLastCompletedAt;

    /* loaded from: input_file:io/activej/cube/etcd/CubeEtcdStateManager$Builder.class */
    public final class Builder extends AbstractBuilder<Builder, CubeEtcdStateManager> {
        private final ByteSequence root;
        private final EtcdUtils.CheckoutRequest<?, ?>[] checkoutRequests;
        private final EtcdUtils.WatchRequest<?, ?, ?>[] watchRequests;

        private Builder(ByteSequence byteSequence, EtcdUtils.CheckoutRequest<?, ?>[] checkoutRequestArr, EtcdUtils.WatchRequest<?, ?, ?>[] watchRequestArr) {
            this.root = byteSequence;
            this.checkoutRequests = checkoutRequestArr;
            this.watchRequests = watchRequestArr;
        }

        public Builder withChunkCodecsFactory(Function<String, EtcdKVCodec<Long, AggregationChunk>> function) {
            checkNotBuilt(this);
            CubeEtcdStateManager.this.chunkCodecsFactory = function;
            return this;
        }

        public Builder withPrefixPos(ByteSequence byteSequence) {
            checkNotBuilt(this);
            CubeEtcdStateManager.this.prefixPos = byteSequence;
            return this;
        }

        public Builder withPrefixChunk(ByteSequence byteSequence) {
            checkNotBuilt(this);
            CubeEtcdStateManager.this.prefixChunk = byteSequence;
            return this;
        }

        public Builder withTimestampKey(ByteSequence byteSequence) {
            checkNotBuilt(this);
            CubeEtcdStateManager.this.timestampKey = byteSequence;
            return this;
        }

        public Builder withAggregationIdCodec(EtcdPrefixCodec<String> etcdPrefixCodec) {
            checkNotBuilt(this);
            CubeEtcdStateManager.this.aggregationIdCodec = etcdPrefixCodec;
            return this;
        }

        public Builder withCurrentTimeProvider(CurrentTimeProvider currentTimeProvider) {
            checkNotBuilt(this);
            CubeEtcdStateManager.this.now = currentTimeProvider;
            return this;
        }

        public Builder withScheduledExecutor(ScheduledExecutorService scheduledExecutorService) {
            checkNotBuilt(this);
            CubeEtcdStateManager.this.scheduledExecutor = scheduledExecutorService;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public CubeEtcdStateManager m38doBuild() {
            if (CubeEtcdStateManager.this.chunkCodecsFactory == null) {
                Map map = (Map) CubeEtcdStateManager.this.cubeStructure.getAggregationStructures().entrySet().stream().collect(CollectorUtils.entriesToLinkedHashMap(aggregationStructure -> {
                    return new AggregationChunkJsonEtcdKVCodec(JsonCodecs.ofPrimaryKey(aggregationStructure));
                }));
                CubeEtcdStateManager cubeEtcdStateManager = CubeEtcdStateManager.this;
                Objects.requireNonNull(map);
                cubeEtcdStateManager.chunkCodecsFactory = (v1) -> {
                    return r1.get(v1);
                };
            }
            this.checkoutRequests[0] = EtcdUtils.CheckoutRequest.ofMapEntry(this.root.concat(CubeEtcdStateManager.this.prefixPos), EtcdKVCodecs.ofMapEntry(EtcdKeyCodecs.ofString(), CubeEtcdOTUplink.logPositionEtcdCodec()), CollectorUtils.entriesToLinkedHashMap());
            this.checkoutRequests[1] = EtcdUtils.CheckoutRequest.of(this.root.concat(CubeEtcdStateManager.this.prefixChunk), EtcdKVCodecs.ofPrefixedEntry(CubeEtcdStateManager.this.aggregationIdCodec, CubeEtcdStateManager.this.chunkCodecsFactory), Collectors.groupingBy((v0) -> {
                return v0.value1();
            }, Collectors.mapping((v0) -> {
                return v0.value2();
            }, Collectors.toSet())));
            this.watchRequests[0] = EtcdUtils.WatchRequest.ofMapEntry(this.root.concat(CubeEtcdStateManager.this.prefixPos), EtcdKVCodecs.ofMapEntry(EtcdKeyCodecs.ofString(), CubeEtcdOTUplink.logPositionEtcdCodec()), new EtcdEventProcessor<String, Map.Entry<String, LogPosition>, Map<String, LogPositionDiff>>() { // from class: io.activej.cube.etcd.CubeEtcdStateManager.Builder.1
                /* renamed from: createEventsAccumulator, reason: merged with bridge method [inline-methods] */
                public Map<String, LogPositionDiff> m39createEventsAccumulator() {
                    return new LinkedHashMap();
                }

                public void onPut(Map<String, LogPositionDiff> map2, Map.Entry<String, LogPosition> entry) {
                    map2.put(entry.getKey(), new LogPositionDiff((LogPosition) null, entry.getValue()));
                }

                public void onDelete(Map<String, LogPositionDiff> map2, String str) {
                    throw new UnsupportedOperationException();
                }
            });
            this.watchRequests[1] = EtcdUtils.WatchRequest.of(this.root.concat(CubeEtcdStateManager.this.prefixChunk), EtcdKVCodecs.ofPrefixedEntry(CubeEtcdStateManager.this.aggregationIdCodec, CubeEtcdStateManager.this.chunkCodecsFactory), new EtcdEventProcessor<Tuple2<String, Long>, Tuple2<String, AggregationChunk>, Map<String, AggregationDiff>>() { // from class: io.activej.cube.etcd.CubeEtcdStateManager.Builder.2
                /* renamed from: createEventsAccumulator, reason: merged with bridge method [inline-methods] */
                public Map<String, AggregationDiff> m40createEventsAccumulator() {
                    return new LinkedHashMap();
                }

                public void onPut(Map<String, AggregationDiff> map2, Tuple2<String, AggregationChunk> tuple2) {
                    map2.compute((String) tuple2.value1(), (str, aggregationDiff) -> {
                        return aggregationDiff == null ? AggregationDiff.of(Set.of((AggregationChunk) tuple2.value2()), Set.of()) : AggregationDiff.of(CollectionUtils.union(aggregationDiff.getAddedChunks(), Set.of((AggregationChunk) tuple2.value2())), aggregationDiff.getRemovedChunks());
                    });
                }

                public void onDelete(Map<String, AggregationDiff> map2, Tuple2<String, Long> tuple2) {
                    map2.compute((String) tuple2.value1(), (str, aggregationDiff) -> {
                        return aggregationDiff == null ? AggregationDiff.of(Set.of(), Set.of(AggregationChunk.ofId(((Long) tuple2.value2()).longValue()))) : AggregationDiff.of(aggregationDiff.getAddedChunks(), CollectionUtils.union(aggregationDiff.getRemovedChunks(), Set.of(AggregationChunk.ofId(((Long) tuple2.value2()).longValue()))));
                    });
                }
            });
            return CubeEtcdStateManager.this;
        }
    }

    /* loaded from: input_file:io/activej/cube/etcd/CubeEtcdStateManager$StateChangesListener.class */
    private final class StateChangesListener extends AbstractAsyncCloseable implements StateTransitionListener, StateManager.StateChangesSupplier<LogDiff<CubeDiff>> {
        private final Predicate<LogDiff<CubeDiff>> predicate;
        private final ChannelZeroBuffer<LogDiff<CubeDiff>> zeroBuffer = new ChannelZeroBuffer<>();
        private final Queue queue = new Queue();

        /* loaded from: input_file:io/activej/cube/etcd/CubeEtcdStateManager$StateChangesListener$Queue.class */
        private class Queue extends BlockingPutQueue<LogDiff<CubeDiff>> {
            public Queue() {
                super(CubeEtcdStateManager.STATE_SUBSCRIBER_BUFFER_SIZE);
            }

            protected void onMoreData() {
                if (StateChangesListener.this.zeroBuffer.isSaturated()) {
                    return;
                }
                StateChangesListener.this.zeroBuffer.put((LogDiff) take());
            }
        }

        public StateChangesListener(Predicate<LogDiff<CubeDiff>> predicate) {
            this.predicate = predicate;
            CubeEtcdStateManager.this.subscribeToStateTransitions(this);
        }

        @Override // io.activej.cube.etcd.CubeEtcdStateManager.StateTransitionListener
        public void onStateChange(LogDiff<CubeDiff> logDiff) {
            if (this.predicate.test(logDiff)) {
                if (!this.queue.isSaturated()) {
                    try {
                        this.queue.put(logDiff);
                    } catch (InterruptedException e) {
                        throw new AssertionError("Should not happen");
                    }
                } else {
                    CubeEtcdStateManager.this.unsubscribeFromStateTransitions(this);
                    CubeEtcdStateManager.logger.error("Queue is saturated");
                    AsyncCloseException asyncCloseException = new AsyncCloseException("State changes rate exceed supplier rate");
                    this.reactor.execute(() -> {
                        closeEx(asyncCloseException);
                    });
                }
            }
        }

        public Promise<LogDiff<CubeDiff>> get() {
            Reactive.checkInReactorThread(this.reactor);
            return !this.queue.isEmpty() ? Promise.of((LogDiff) this.queue.take()) : this.zeroBuffer.take();
        }

        @Override // io.activej.cube.etcd.CubeEtcdStateManager.StateTransitionListener
        public void onStop() {
            this.reactor.execute(this::close);
        }

        public void onClosed(Exception exc) {
            CubeEtcdStateManager.this.unsubscribeFromStateTransitions(this);
            this.zeroBuffer.closeEx(exc);
            this.queue.close();
        }
    }

    /* loaded from: input_file:io/activej/cube/etcd/CubeEtcdStateManager$StateTransitionListener.class */
    public interface StateTransitionListener {
        void onStateChange(LogDiff<CubeDiff> logDiff);

        void onStop();
    }

    private CubeEtcdStateManager(Client client, ByteSequence byteSequence, EtcdUtils.CheckoutRequest<?, ?>[] checkoutRequestArr, EtcdUtils.WatchRequest<?, ?, ?>[] watchRequestArr, CubeStructure cubeStructure) {
        super(client, byteSequence, checkoutRequestArr, watchRequestArr);
        this.listeners = ConcurrentHashMap.newKeySet();
        this.aggregationIdCodec = EtcdUtils.AGGREGATION_ID_CODEC;
        this.prefixPos = EtcdUtils.POS;
        this.prefixChunk = EtcdUtils.CHUNK;
        this.timestampKey = EtcdUtils.TIMESTAMP;
        this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        this.now = CurrentTimeProvider.ofSystem();
        this.watchEtcdExceptionStats = ExceptionStats.create();
        this.malformedDataExceptionStats = ExceptionStats.create();
        this.watchConnectionLastEstablishedAt = null;
        this.watchStateLastUpdatedAt = null;
        this.watchLastCompletedAt = null;
        this.cubeStructure = cubeStructure;
    }

    public static Builder builder(Client client, ByteSequence byteSequence, CubeStructure cubeStructure) {
        EtcdUtils.CheckoutRequest[] checkoutRequestArr = new EtcdUtils.CheckoutRequest[2];
        EtcdUtils.WatchRequest[] watchRequestArr = new EtcdUtils.WatchRequest[2];
        CubeEtcdStateManager cubeEtcdStateManager = new CubeEtcdStateManager(client, byteSequence, checkoutRequestArr, watchRequestArr, cubeStructure);
        Objects.requireNonNull(cubeEtcdStateManager);
        return new Builder(byteSequence, checkoutRequestArr, watchRequestArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: finishState, reason: merged with bridge method [inline-methods] */
    public LogState<CubeDiff, CubeState> m37finishState(Response.Header header, Object[] objArr) throws MalformedEtcdDataException {
        LogState<CubeDiff, CubeState> create = LogState.create(CubeState.create(this.cubeStructure));
        create.init();
        Map map = (Map) objArr[0];
        Map map2 = (Map) objArr[1];
        for (Map.Entry entry : map2.entrySet()) {
            Iterator it = ((Set) entry.getValue()).iterator();
            while (it.hasNext()) {
                try {
                    this.cubeStructure.validateMeasures((String) entry.getKey(), ((AggregationChunk) it.next()).getMeasures());
                } catch (MalformedDataException e) {
                    throw new MalformedEtcdDataException(e.getMessage());
                }
            }
        }
        create.apply(LogDiff.of((Map) map.entrySet().stream().collect(CollectorUtils.entriesToLinkedHashMap(logPosition -> {
            return new LogPositionDiff((LogPosition) null, logPosition);
        })), CubeDiff.of((Map) map2.entrySet().stream().collect(CollectorUtils.entriesToLinkedHashMap(AggregationDiff::of)))));
        return create;
    }

    public Promise<Void> catchUp() {
        return push(List.of()).whenComplete(LogUtils.toLogger(logger, "catchUp", new Object[]{this}));
    }

    public Promise<Void> push(List<LogDiff<CubeDiff>> list) {
        return Promise.ofCompletionStage(push(LogDiff.reduce(list, CubeDiff::reduce))).toVoid().whenComplete(LogUtils.toLogger(logger, "push", new Object[]{list, this}));
    }

    public StateManager.StateChangesSupplier<LogDiff<CubeDiff>> subscribeToStateChanges(Predicate<LogDiff<CubeDiff>> predicate) {
        return new StateChangesListener(predicate);
    }

    public void subscribeToStateTransitions(StateTransitionListener stateTransitionListener) {
        if (this.stopping) {
            stateTransitionListener.onStop();
        } else {
            this.listeners.add(stateTransitionListener);
        }
    }

    public void unsubscribeFromStateTransitions(StateTransitionListener stateTransitionListener) {
        this.listeners.remove(stateTransitionListener);
    }

    public void stop() {
        this.stopping = true;
        super.stop();
        Iterator<StateTransitionListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onStop();
        }
        this.listeners.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void applyStateTransitions(LogState<CubeDiff, CubeState> logState, Object[] objArr) throws MalformedEtcdDataException {
        Map map = (Map) objArr[0];
        Map map2 = (Map) objArr[1];
        for (Map.Entry entry : map2.entrySet()) {
            Iterator<AggregationChunk> it = ((AggregationDiff) entry.getValue()).getAddedChunks().iterator();
            while (it.hasNext()) {
                try {
                    this.cubeStructure.validateMeasures((String) entry.getKey(), it.next().getMeasures());
                } catch (MalformedDataException e) {
                    throw new MalformedEtcdDataException(e.getMessage());
                }
            }
        }
        LogDiff<CubeDiff> of = LogDiff.of(map, CubeDiff.of(map2));
        logState.apply(of);
        this.watchStateLastUpdatedAt = this.now.currentInstant();
        notifyListeners(of);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void doPush(TxnOps txnOps, LogDiff<CubeDiff> logDiff) {
        if (isCatchUp(logDiff)) {
            return;
        }
        io.activej.etcd.EtcdUtils.touchTimestamp(txnOps, this.timestampKey, this.now);
        EtcdUtils.saveCubeLogDiff(this.prefixPos, this.prefixChunk, this.aggregationIdCodec, this.chunkCodecsFactory, txnOps, logDiff);
    }

    protected void onWatchConnectionEstablished() {
        logger.trace("Watch connection to etcd server established");
        this.watchConnectionLastEstablishedAt = this.now.currentInstant();
    }

    protected void onWatchError(Throwable th) {
        logger.warn("Error while watching keys", th);
        this.watchEtcdExceptionStats.recordException(th, this);
        if (th instanceof MalformedEtcdDataException) {
            this.malformedDataExceptionStats.recordException(th, this);
        }
    }

    protected void onWatchCompleted() {
        logger.warn("Watch has been completed");
        this.watchLastCompletedAt = this.now.currentInstant();
        if (this.stopping) {
            return;
        }
        scheduleNextWatch();
    }

    private void scheduleNextWatch() {
        if (this.nextWatchScheduled) {
            return;
        }
        this.nextWatchScheduled = true;
        this.scheduledExecutor.schedule(() -> {
            this.nextWatchScheduled = false;
            watch();
        }, WATCH_RETRY_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void notifyListeners(LogDiff<CubeDiff> logDiff) {
        Iterator<StateTransitionListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onStateChange(logDiff);
        }
    }

    private static boolean isCatchUp(LogDiff<CubeDiff> logDiff) {
        return logDiff.getPositions().values().stream().allMatch((v0) -> {
            return v0.isEmpty();
        }) && logDiff.getDiffs().stream().allMatch((v0) -> {
            return v0.isEmpty();
        });
    }

    @VisibleForTesting
    public void delete() throws ExecutionException, InterruptedException {
        KV kVClient = this.client.getKVClient();
        kVClient.delete(this.root, DeleteOption.builder().isPrefix(true).build()).get();
        kVClient.put(this.root.concat(this.timestampKey), io.activej.etcd.EtcdUtils.TOUCH_TIMESTAMP_CODEC.encodeValue(Long.valueOf(this.now.currentTimeMillis()))).get();
    }

    public String toString() {
        return "CubeEtcdStateManager{root=" + this.root + ", revision=" + getRevision() + "}";
    }

    @JmxAttribute
    public ExceptionStats getWatchEtcdExceptionStats() {
        return this.watchEtcdExceptionStats;
    }

    @JmxAttribute
    public ExceptionStats getMalformedDataExceptionStats() {
        return this.malformedDataExceptionStats;
    }

    @JmxAttribute
    @Nullable
    public Instant getWatchLastCompletedAt() {
        return this.watchLastCompletedAt;
    }

    @JmxAttribute
    @Nullable
    public Instant getWatchConnectionLastEstablishedAt() {
        return this.watchConnectionLastEstablishedAt;
    }

    @JmxAttribute
    @Nullable
    public Instant getWatchStateLastUpdatedAt() {
        return this.watchStateLastUpdatedAt;
    }

    @JmxAttribute
    public long getRevision() {
        return super.getRevision();
    }

    @JmxAttribute
    public String getEtcdRoot() {
        return this.root.toString();
    }
}
