package io.activej.cube.etcd;

import io.activej.async.service.ReactiveService;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.collection.CollectionUtils;
import io.activej.cube.aggregation.ChunksAlreadyLockedException;
import io.activej.cube.aggregation.IChunkLocker;
import io.activej.etcd.codec.key.EtcdKeyCodec;
import io.activej.etcd.codec.key.EtcdKeyCodecs;
import io.activej.etcd.exception.MalformedEtcdDataException;
import io.activej.etcd.exception.TransactionNotSucceededException;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.jetbrains.annotations.VisibleForTesting;

/* loaded from: input_file:io/activej/cube/etcd/EtcdChunkLocker.class */
public final class EtcdChunkLocker extends AbstractReactive implements IChunkLocker, ReactiveService {
    private static final EtcdKeyCodec<Long> CHUNK_ID_CODEC = EtcdKeyCodecs.ofLong();
    public static final Duration DEFAULT_TTL = ApplicationSettings.getDuration(EtcdChunkLocker.class, "ttl", Duration.ofMinutes(10));
    private final Client client;
    private final ByteSequence root;
    private Duration ttl;
    private final Map<Set<Long>, Long> leaseIds;

    /* loaded from: input_file:io/activej/cube/etcd/EtcdChunkLocker$Builder.class */
    public final class Builder extends AbstractBuilder<Builder, EtcdChunkLocker> {
        private Builder() {
        }

        public Builder withTTL(Duration duration) {
            checkNotBuilt(this);
            EtcdChunkLocker.this.ttl = duration;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public EtcdChunkLocker m42doBuild() {
            return EtcdChunkLocker.this;
        }
    }

    private EtcdChunkLocker(Reactor reactor, Client client, ByteSequence byteSequence) {
        super(reactor);
        this.ttl = DEFAULT_TTL;
        this.leaseIds = new HashMap();
        this.client = client;
        this.root = byteSequence;
    }

    public static EtcdChunkLocker create(Reactor reactor, Client client, ByteSequence byteSequence) {
        return (EtcdChunkLocker) builder(reactor, client, byteSequence).build();
    }

    public static Builder builder(Reactor reactor, Client client, ByteSequence byteSequence) {
        return new Builder();
    }

    @Override // io.activej.cube.aggregation.IChunkLocker
    public Promise<Void> lockChunks(Set<Long> set) {
        Reactive.checkInReactorThread(this);
        Checks.checkArgument(!set.isEmpty(), "Nothing to lock");
        return Promise.ofCompletionStage(this.client.getLeaseClient().grant(this.ttl.toSeconds()).exceptionallyCompose(io.activej.etcd.EtcdUtils::convertStatusExceptionStage)).map((v0) -> {
            return v0.getID();
        }).then(l -> {
            return Promise.ofCompletionStage(io.activej.etcd.EtcdUtils.executeTxnOps(this.client.getKVClient(), this.root, txnOps -> {
                Iterator it = set.iterator();
                while (it.hasNext()) {
                    ByteSequence encodeKey = CHUNK_ID_CODEC.encodeKey(Long.valueOf(((Long) it.next()).longValue()));
                    txnOps.cmp(encodeKey, Cmp.Op.EQUAL, CmpTarget.createRevision(0L));
                    txnOps.put(encodeKey, ByteSequence.EMPTY, PutOption.builder().withLeaseId(l.longValue()).build());
                }
            })).mapException(TransactionNotSucceededException.class, (v1) -> {
                return new ChunksAlreadyLockedException(v1);
            }).whenResult(txnResponse -> {
                this.leaseIds.put(set, l);
            });
        }).toVoid();
    }

    @Override // io.activej.cube.aggregation.IChunkLocker
    public Promise<Void> releaseChunks(Set<Long> set) {
        Reactive.checkInReactorThread(this);
        Checks.checkArgument(!set.isEmpty(), "Nothing to release");
        Long l = this.leaseIds.get(set);
        Lease leaseClient = this.client.getLeaseClient();
        if (l != null) {
            return Promise.ofCompletionStage(leaseClient.revoke(l.longValue()).exceptionallyCompose(io.activej.etcd.EtcdUtils::convertStatusExceptionStage)).toVoid();
        }
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (Map.Entry<Set<Long>, Long> entry : this.leaseIds.entrySet()) {
            Set<Long> key = entry.getKey();
            Set intersection = CollectionUtils.intersection(key, set);
            hashSet.addAll(intersection);
            if (intersection.size() == key.size()) {
                hashSet2.add(entry.getValue());
            }
        }
        return Promise.ofCompletionStage(io.activej.etcd.EtcdUtils.executeTxnOps(this.client.getKVClient(), this.root, txnOps -> {
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                txnOps.delete(CHUNK_ID_CODEC.encodeKey(Long.valueOf(((Long) it.next()).longValue())), DeleteOption.DEFAULT);
            }
        })).whenResult(() -> {
            Iterator it = hashSet2.iterator();
            while (it.hasNext()) {
                leaseClient.revoke(((Long) it.next()).longValue());
            }
        }).toVoid();
    }

    @Override // io.activej.cube.aggregation.IChunkLocker
    public Promise<Set<Long>> getLockedChunks() {
        return Promise.ofCompletionStage(this.client.getKVClient().get(this.root, GetOption.builder().isPrefix(true).build()).exceptionallyCompose(io.activej.etcd.EtcdUtils::convertStatusExceptionStage)).map(getResponse -> {
            HashSet hashSet = new HashSet();
            Iterator it = getResponse.getKvs().iterator();
            while (it.hasNext()) {
                ByteSequence substring = ((KeyValue) it.next()).getKey().substring(this.root.size());
                try {
                    hashSet.add(Long.valueOf(((Long) CHUNK_ID_CODEC.decodeKey(substring)).longValue()));
                } catch (MalformedEtcdDataException e) {
                    throw new MalformedEtcdDataException("Failed to decode key '" + substring + "'", e);
                }
            }
            return hashSet;
        });
    }

    public Promise<?> start() {
        return Promise.complete();
    }

    public Promise<?> stop() {
        Map copyOf = Map.copyOf(this.leaseIds);
        this.leaseIds.clear();
        Lease leaseClient = this.client.getLeaseClient();
        return Promises.all(copyOf.values().stream().map(l -> {
            return Promise.ofCompletionStage(leaseClient.revoke(l.longValue()).exceptionallyCompose(io.activej.etcd.EtcdUtils::convertStatusExceptionStage));
        }));
    }

    @VisibleForTesting
    public void delete() throws ExecutionException, InterruptedException {
        this.client.getKVClient().delete(this.root, DeleteOption.builder().isPrefix(true).build()).get();
    }
}
