package com.pushtechnology.diffusion.client.features.impl.update.stream;

import com.pushtechnology.diffusion.client.features.ClusterRoutingException;
import com.pushtechnology.diffusion.client.features.RecoverableUpdateStream;
import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.UpdateStream;
import com.pushtechnology.diffusion.client.features.UpdateStreamRetryLimitException;
import com.pushtechnology.diffusion.client.session.retry.RetryStrategy;
import com.pushtechnology.diffusion.exceptions.DiffusionInterruptedException;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:com/pushtechnology/diffusion/client/features/impl/update/stream/RecoverableUpdateStreamImpl.class */
public final class RecoverableUpdateStreamImpl<T> implements RecoverableUpdateStream<T> {
    private static final Logger LOG = LoggerFactory.getLogger(RecoverableUpdateStreamImpl.class);
    private final Supplier<UpdateStream<T>> streamSupplier;
    private final RetryStrategy strategy;
    private final AtomicReference<UpdateStream<T>> delegate;
    private final AtomicLong counter = new AtomicLong(Long.MIN_VALUE);
    private final AtomicReference<Throwable> priorException = new AtomicReference<>(null);

    @GuardedBy("queue")
    private final ConcurrentLinkedQueue<PendingTopicUpdate<T>> queue = new ConcurrentLinkedQueue<>();

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:com/pushtechnology/diffusion/client/features/impl/update/stream/RecoverableUpdateStreamImpl$PendingTopicUpdate.class */
    public static final class PendingTopicUpdate<T> {
        private final long counter;
        private final T updateValue;

        @GuardedBy("this")
        private CompletableFuture<TopicCreationResult> future;

        private PendingTopicUpdate(long j, T t) {
            this.counter = j;
            this.updateValue = t;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public T getUpdateValue() {
            return this.updateValue;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized CompletableFuture<TopicCreationResult> getFuture() {
            return this.future;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void initFuture(CompletableFuture<TopicCreationResult> completableFuture) {
            if (this.future != null) {
                throw new IllegalStateException("future is already set");
            }
            this.future = completableFuture;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            PendingTopicUpdate pendingTopicUpdate = (PendingTopicUpdate) obj;
            return Objects.equals(Long.valueOf(this.counter), Long.valueOf(pendingTopicUpdate.counter)) && Objects.equals(this.updateValue, pendingTopicUpdate.updateValue);
        }

        public int hashCode() {
            return Objects.hash(Long.valueOf(this.counter), this.updateValue);
        }
    }

    public RecoverableUpdateStreamImpl(Supplier<UpdateStream<T>> supplier, RetryStrategy retryStrategy) {
        this.streamSupplier = (Supplier) Objects.requireNonNull(supplier, "supplier is null");
        this.delegate = new AtomicReference<>(Objects.requireNonNull(supplier.get(), "supplier returns null"));
        this.strategy = (RetryStrategy) Objects.requireNonNull(retryStrategy, "strategy is null");
    }

    @Override // com.pushtechnology.diffusion.client.features.UpdateStream
    public CompletableFuture<TopicCreationResult> set(T t) {
        PendingTopicUpdate<T> pendingTopicUpdate = new PendingTopicUpdate<>(this.counter.getAndIncrement(), t);
        this.queue.add(pendingTopicUpdate);
        CompletableFuture<TopicCreationResult> whenComplete = this.delegate.get().set(t).whenComplete((topicCreationResult, th) -> {
            processUpdateResponse(pendingTopicUpdate, topicCreationResult, th);
        });
        pendingTopicUpdate.initFuture(whenComplete);
        return whenComplete;
    }

    @Override // com.pushtechnology.diffusion.client.features.UpdateStream
    public T get() {
        return this.delegate.get().get();
    }

    @Override // com.pushtechnology.diffusion.client.features.UpdateStream
    public CompletableFuture<TopicCreationResult> validate() {
        return this.delegate.get().validate().whenComplete((topicCreationResult, th) -> {
            if (th != null) {
                processException(th);
            }
        });
    }

    @Override // com.pushtechnology.diffusion.client.features.RecoverableUpdateStream
    public boolean isRecoverable() {
        return isRecoverable(this.priorException.get());
    }

    boolean isRecoverable(Throwable th) {
        if (null == th) {
            return false;
        }
        return (th instanceof ClusterRoutingException) || isRecoverable(th.getCause());
    }

    @Override // com.pushtechnology.diffusion.client.features.RecoverableUpdateStream
    public void recover() throws UpdateStreamRetryLimitException {
        if (!isRecoverable(this.priorException.get())) {
            throw new IllegalStateException("Can only recover following a recoverable exception", this.priorException.get());
        }
        this.priorException.set(null);
        synchronisePendingWork();
        int attempts = this.strategy.getAttempts();
        while (!this.queue.isEmpty()) {
            try {
                uncheckedSleep(this.strategy.getInterval());
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Attempting recovery of {} topic updates", Integer.valueOf(queueSize()));
                }
                this.delegate.set(this.streamSupplier.get());
                this.delegate.get().validate().get();
                CompletableFuture.allOf((CompletableFuture[]) this.queue.stream().map(pendingTopicUpdate -> {
                    CompletableFuture<TopicCreationResult> whenComplete;
                    synchronized (this) {
                        whenComplete = this.delegate.get().set(pendingTopicUpdate.getUpdateValue()).whenComplete((topicCreationResult, th) -> {
                            processUpdateResponse(pendingTopicUpdate, topicCreationResult, th);
                        });
                    }
                    return whenComplete;
                }).toArray(i -> {
                    return new CompletableFuture[i];
                })).get(this.strategy.getInterval(), TimeUnit.MILLISECONDS);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Completed recovery during attempt {}", Integer.valueOf(this.strategy.getAttempts() - attempts));
                    return;
                }
                return;
            } catch (InterruptedException | CancellationException | ExecutionException | TimeoutException e) {
                if (attempts != Integer.MAX_VALUE) {
                    attempts--;
                }
                if (!isRecoverable(e) || attempts <= 0) {
                    throw ((UpdateStreamRetryLimitException) new UpdateStreamRetryLimitException("Cannot recover within strategy: " + this.strategy).initCause(e));
                }
            }
        }
    }

    private void synchronisePendingWork() throws UpdateStreamRetryLimitException {
        try {
            CompletableFuture.allOf((CompletableFuture[]) this.queue.stream().map(obj -> {
                return ((PendingTopicUpdate) obj).getFuture();
            }).toArray(i -> {
                return new CompletableFuture[i];
            })).get(this.strategy.getInterval(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException | CancellationException | ExecutionException e) {
            LOG.debug("Synchronizing CompletableFutures", e);
        } catch (TimeoutException e2) {
            throw ((UpdateStreamRetryLimitException) new UpdateStreamRetryLimitException("Cannot recover within strategy: " + this.strategy).initCause(e2));
        }
    }

    private void processUpdateResponse(PendingTopicUpdate<T> pendingTopicUpdate, TopicCreationResult topicCreationResult, Throwable th) {
        if (th == null) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received set response: {}, result: {}", pendingTopicUpdate.getUpdateValue(), topicCreationResult);
            }
            if (!this.queue.remove(pendingTopicUpdate)) {
                throw new AssertionError("Cannot find topic update in queue " + pendingTopicUpdate);
            }
            return;
        }
        processException(th);
        if (isRecoverable(th) && LOG.isTraceEnabled()) {
            LOG.trace("Recoverable error detected. {} updates can be recovered.", Integer.valueOf(this.queue.size()), th);
        }
    }

    private void processException(Throwable th) {
        this.priorException.compareAndSet(null, th);
    }

    private static void uncheckedSleep(long j) {
        try {
            LOG.trace("Pausing for {}ms", Long.valueOf(j));
            Thread.sleep(j);
        } catch (InterruptedException e) {
            throw new DiffusionInterruptedException(e);
        }
    }

    int queueSize() {
        return this.queue.size();
    }

    public String toString() {
        return "RecoverableUpdateStream{strategy=" + this.strategy + ", delegate=" + this.delegate + ", priorException=" + this.priorException + '}';
    }
}
