package com.pushtechnology.diffusion.client.features.impl.update.stream;

import com.pushtechnology.diffusion.client.features.ClusterRoutingException;
import com.pushtechnology.diffusion.client.features.InvalidUpdateStreamException;
import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.UpdateStreamRetryLimitException;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.primitive.impl.DoubleDataTypeImpl;
import com.pushtechnology.diffusion.datatype.primitive.impl.Int64DataTypeImpl;
import com.pushtechnology.diffusion.datatype.primitive.impl.StringDataTypeImpl;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.topics.update.conditional.AddAndSetTopicRequest;
import com.pushtechnology.diffusion.topics.update.update.stream.CreateUpdateStreamAndSetRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java8.util.concurrent.CompletableFuture;
import net.jcip.annotations.ThreadSafe;

/**
 * Pending update stream delegate used while an update stream is trying to
 * re-establish itself after a failure.
 *
 * <p>{@link #recover()} re-sends the first buffered value. Each attempt is
 * scheduled on the recovery executor after the configured retry interval.
 * When a request fails with a {@link ClusterRoutingException} another attempt
 * is scheduled, until the configured number of retries is exceeded; any other
 * failure, or exhausting the retries, is reported to the owning stream via
 * {@code onSetFailed}.
 *
 * <p>Locking: {@link #retry(IBytes)} and {@link #getRetryAttempts()} are
 * synchronized on this instance. The list of scheduled futures is guarded by
 * its own monitor so that {@link #onSetFailed(Throwable)} never needs the
 * instance lock.
 *
 * @param <T> the value type of the stream
 */
@ThreadSafe
final class RecoveryStream<T> extends AbstractPendingStream<T> {
    /** Attempt counter; starts at 1 and is incremented each time a retry is scheduled. */
    private final AtomicInteger numberOfRetryAttempts;
    private final UpdateStreamImpl<T> stream;
    /** Scheduled retry tasks. Every access must hold the monitor of this list. */
    private final List<ScheduledFuture<?>> futures;
    private final UpdateStreamRecovery<T> recovery;

    /**
     * Creates a recovery delegate for the given stream.
     *
     * @param updateStreamImpl the stream being recovered
     * @param updateStreamRecovery the recovery configuration and services; the
     *        first element of its update buffer is passed to the superclass
     */
    public RecoveryStream(UpdateStreamImpl<T> updateStreamImpl, UpdateStreamRecovery<T> updateStreamRecovery) {
        super(updateStreamImpl, updateStreamRecovery.getUpdateBuffer().get(0), updateStreamRecovery.getDataType(), updateStreamRecovery.getToValueBytes(), updateStreamRecovery.getToValueOrDeltaBytes(), updateStreamRecovery.getValidateService(), updateStreamRecovery.getUpdateService(), updateStreamRecovery.getSetService(), updateStreamRecovery.getUpdateBufferSize());
        this.stream = getStream();
        this.recovery = updateStreamRecovery;
        this.numberOfRetryAttempts = new AtomicInteger(1);
        this.futures = new ArrayList<>();
    }

    /**
     * Validation is not available while recovery is in progress.
     *
     * @throws IllegalStateException always
     */
    @Override // com.pushtechnology.diffusion.client.features.UpdateStream
    public CompletableFuture<TopicCreationResult> validate() {
        throw new IllegalStateException("stream is attempting recovery");
    }

    /**
     * Replaces the stream's delegate with an {@code InvalidSetStream}, fails
     * every deferred update with an {@link InvalidUpdateStreamException}
     * wrapping {@code th}, and cancels all retry tasks scheduled so far.
     *
     * @param th the cause of the failure
     * @return always {@code true}
     */
    @Override // com.pushtechnology.diffusion.client.features.impl.update.stream.InternalUpdateStream
    public boolean onSetFailed(Throwable th) {
        synchronized (this.stream) {
            this.stream.setDelegate(new InvalidSetStream(th, getNextValue()));
            getDeferredUpdates().forEach(pair -> {
                ((CompletableFuture<?>) pair.getFirst()).completeExceptionally(new InvalidUpdateStreamException(th));
            });
        }
        // retry() may add to the list from another thread, so take a snapshot
        // under the list's monitor and cancel outside of it.
        final List<ScheduledFuture<?>> scheduled;
        synchronized (this.futures) {
            scheduled = new ArrayList<>(this.futures);
        }
        for (ScheduledFuture<?> scheduledFuture : scheduled) {
            scheduledFuture.cancel(true);
        }
        return true;
    }

    /**
     * Starts recovery by scheduling a request that re-sends the first value
     * of the stream's update buffer.
     *
     * @throws IllegalArgumentException if that value is {@code null} and the
     *         data type is not int64, double or string
     */
    public void recover() {
        T t = this.stream.getUpdateBuffer().get(0);
        DataType<T> dataType = getDataType();
        if (t == null && !(dataType instanceof Int64DataTypeImpl) && !(dataType instanceof DoubleDataTypeImpl) && !(dataType instanceof StringDataTypeImpl)) {
            throw new IllegalArgumentException("null can only be passed for int64, double or string topics");
        }
        retryOrFail((IBytes) dataType.toBytes(t));
    }

    /**
     * Schedules another attempt to request a new stream after the configured
     * retry interval (in milliseconds).
     *
     * @param iBytes the serialised value to send with the request
     * @throws UpdateStreamRetryLimitException if the attempt counter already
     *         exceeds the configured number of retries
     */
    synchronized void retry(IBytes iBytes) throws UpdateStreamRetryLimitException {
        if (this.numberOfRetryAttempts.get() > this.recovery.getNumberOfRetries()) {
            throw new UpdateStreamRetryLimitException("the number of retry attempts has exceeded the limit");
        }
        this.numberOfRetryAttempts.getAndIncrement();
        final ScheduledFuture<?> scheduled = this.recovery.getExecutor().schedule(() -> {
            requestNewStream(iBytes);
        }, this.recovery.getRetryInterval(), TimeUnit.MILLISECONDS);
        synchronized (this.futures) {
            this.futures.add(scheduled);
        }
    }

    /**
     * Sends the command that re-creates the stream: an add-and-set request
     * when the recovery describes an add-and-set stream, otherwise a
     * create-update-stream-and-set request. Success is forwarded to the
     * stream's {@code onSetComplete}; failures go through
     * {@link #handleRequestFailure(Throwable, IBytes)}.
     *
     * @param iBytes the serialised value to send with the request
     */
    void requestNewStream(IBytes iBytes) {
        if (this.recovery.isAddAndSetStream()) {
            this.recovery.getStreamAddAndSetTopicService().sendCommand(new AddAndSetTopicRequest(this.recovery.getPath(), this.recovery.getTopicSpecification(), iBytes, this.recovery.getConstraint())).whenComplete((updateStreamAddTopicResponse, th) -> {
                if (th == null) {
                    this.stream.onSetComplete(updateStreamAddTopicResponse);
                    return;
                }
                handleRequestFailure(th, iBytes);
            });
        } else {
            this.recovery.getCreateStreamAndSetService().sendCommand(new CreateUpdateStreamAndSetRequest(this.recovery.getPath(), this.recovery.getTopicType(), iBytes, this.recovery.getConstraint())).whenComplete((createUpdateStreamResponse, th2) -> {
                if (th2 == null) {
                    this.stream.onSetComplete(createUpdateStreamResponse);
                    return;
                }
                handleRequestFailure(th2, iBytes);
            });
        }
    }

    /**
     * Returns the current value of the attempt counter, which starts at 1 and
     * is incremented each time a retry is scheduled.
     */
    synchronized int getRetryAttempts() {
        return this.numberOfRetryAttempts.get();
    }

    /**
     * Handles a failed stream request: a {@link ClusterRoutingException}
     * triggers another retry, anything else fails the stream immediately.
     */
    private void handleRequestFailure(Throwable failure, IBytes iBytes) {
        if (!(failure instanceof ClusterRoutingException)) {
            this.stream.onSetFailed(failure);
            return;
        }
        retryOrFail(iBytes);
    }

    /** Schedules a retry, failing the stream if the retry limit has been exceeded. */
    private void retryOrFail(IBytes iBytes) {
        try {
            retry(iBytes);
        } catch (UpdateStreamRetryLimitException e) {
            this.stream.onSetFailed(e);
        }
    }
}
