package com.pushtechnology.diffusion.examples.runnable;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.ClusterRoutingException;
import com.pushtechnology.diffusion.client.features.IncompatibleTopicException;
import com.pushtechnology.diffusion.client.features.InvalidUpdateStreamException;
import com.pushtechnology.diffusion.client.features.NoSuchTopicException;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.UnsatisfiedConstraintException;
import com.pushtechnology.diffusion.client.features.UpdateConstraint;
import com.pushtechnology.diffusion.client.features.UpdateStream;
import com.pushtechnology.diffusion.client.session.PermissionsException;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionClosedException;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import java.util.Random;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/pushtechnology/diffusion/examples/runnable/CoordinatingSessionExample.class */
/**
 * Example client that acquires a session lock named after a topic path and,
 * while holding it, periodically publishes a random {@code long} to that path
 * through an update stream constrained by the lock.
 *
 * <p>Once connected, the client requests the lock for the path {@code "topic"}.
 * When the lock is acquired it records its session id at
 * {@code "topic/last_updater"}, validates an update stream for {@code "topic"}
 * and then sets a new random value every {@link #PERIOD} {@link #UNIT}.
 */
public final class CoordinatingSessionExample extends AbstractClient {
    private static final Logger LOG = LoggerFactory.getLogger(CoordinatingSessionExample.class);

    /** Delay between a successful update and the next one, expressed in {@link #UNIT}. */
    private static final long PERIOD = 5000;
    private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;

    /** Single thread on which each delayed update is started. */
    private final ScheduledExecutorService executor;
    /** Source of the values written to the topic. */
    private final Random random;

    /**
     * Repeating update of a single topic path. Each successful update schedules
     * the next one; a failed update is classified by
     * {@link #handleUpdateFailure(long, Throwable)}.
     */
    private final class UpdateTask {
        /** Creates a replacement stream when the current one must be discarded. */
        private final Supplier<UpdateStream<Long>> updateStreamFactory;
        private final Supplier<Long> valueSupplier;
        /** Written from completion callbacks and read from the executor thread, hence volatile. */
        private volatile UpdateStream<Long> updateStream;

        private UpdateTask(
                Supplier<UpdateStream<Long>> supplier,
                UpdateStream<Long> updateStream,
                Supplier<Long> supplier2) {
            this.updateStreamFactory = supplier;
            this.updateStream = updateStream;
            this.valueSupplier = supplier2;
        }

        /** Queues the next update to start after {@code PERIOD} {@code UNIT}. */
        private void scheduleUpdate() {
            executor.schedule(this::performUpdate, PERIOD, UNIT);
        }

        /** Obtains a fresh value from the supplier and publishes it. */
        private void performUpdate() {
            performUpdate(valueSupplier.get().longValue());
        }

        /**
         * Sets {@code value} on the current update stream. On success the next
         * update is scheduled; on failure the error is handed to
         * {@link #handleUpdateFailure(long, Throwable)}.
         */
        private void performUpdate(long value) {
            updateStream.set(Long.valueOf(value)).whenComplete((result, failure) -> {
                if (failure == null) {
                    scheduleUpdate();
                } else {
                    handleUpdateFailure(value, failure);
                }
            });
        }

        /**
         * Reacts to a failed update.
         *
         * <p>A {@link ClusterRoutingException} replaces the update stream and
         * retries the same value immediately. A closed session is only logged.
         * The other recognised failures are logged and stop the client. Any
         * unrecognised failure is logged together with the exception so that
         * the end of the update cycle is never silent.
         *
         * @param value the value whose update failed
         * @param th the failure reported by the update future
         */
        private void handleUpdateFailure(long value, Throwable th) {
            // The callback may see either the raw failure or a CompletionException
            // wrapper; classify on the underlying exception in both cases.
            final Throwable failure = unwrap(th);

            if (failure instanceof ClusterRoutingException) {
                updateStream = updateStreamFactory.get();
                performUpdate(value);
            } else if (failure instanceof UnsatisfiedConstraintException) {
                LOG.warn("Another session has gained the responsibility for updating the topic");
                stop();
            } else if (failure instanceof NoSuchTopicException) {
                LOG.warn("The topic has been deleted");
                stop();
            } else if (failure instanceof InvalidUpdateStreamException) {
                LOG.warn("The update stream is no longer valid");
                stop();
            } else if (failure instanceof IncompatibleTopicException) {
                LOG.warn("The topic is not compatible");
                stop();
            } else if (failure instanceof PermissionsException) {
                LOG.warn("The session doesn't have permission to update the path");
                stop();
            } else if (failure instanceof SessionClosedException) {
                LOG.warn("The session has closed");
            } else {
                // No further update is scheduled on this path, so make that visible.
                LOG.warn("Unexpected failure updating the topic. No further updates will be made.", failure);
            }
        }
    }

    /**
     * Creates the example client.
     *
     * @param str first argument, passed unchanged to the {@code AbstractClient} constructor
     * @param str2 second argument, passed unchanged to the {@code AbstractClient} constructor
     */
    public CoordinatingSessionExample(String str, String str2) {
        super(str, str2);
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.random = new Random();
    }

    /** Starts coordinating updates of the path {@code "topic"} on the given session. */
    @Override // com.pushtechnology.diffusion.examples.runnable.AbstractClient
    public void onConnected(Session session) {
        updatePath(session, "topic");
    }

    /**
     * Returns the exception to classify: the cause of a
     * {@link CompletionException} when it has one, otherwise {@code th} itself.
     */
    private static Throwable unwrap(Throwable th) {
        final Throwable cause = th.getCause();
        return th instanceof CompletionException && cause != null ? cause : th;
    }

    /**
     * Requests the session lock named {@code str} and, once it is acquired,
     * begins updating the topic at the same path. A failure to acquire the
     * lock, or an exception thrown while starting the updates, is logged.
     */
    private void updatePath(Session session, String str) {
        session.lock(str, Session.SessionLockScope.UNLOCK_ON_CONNECTION_LOSS)
            .thenAccept(sessionLock -> onLockAcquired(session, str, sessionLock))
            .exceptionally(th -> {
                LOG.warn("Unable to acquire the session lock or begin updating path '{}'.", str, unwrap(th));
                return null;
            });
    }

    /**
     * Records this session as the last updater, then validates an update
     * stream for {@code str} and starts the periodic update cycle. Both
     * operations are constrained by {@code sessionLock}.
     */
    private void onLockAcquired(Session session, String str, Session.SessionLock sessionLock) {
        final TopicUpdate topicUpdate = session.feature(TopicUpdate.class);
        final UpdateConstraint locked = Diffusion.updateConstraints().locked(sessionLock);

        topicUpdate
            .addAndSet(
                str + "/last_updater",
                Diffusion.newTopicSpecification(TopicType.STRING),
                String.class,
                session.getSessionId().toString(),
                locked)
            .whenComplete((result, th) -> {
                if (th != null) {
                    LOG.warn("Unable to record this session as the last updater.", unwrap(th));
                }
            });

        final TopicSpecification specification = Diffusion.newTopicSpecification(TopicType.INT64);
        final Supplier<UpdateStream<Long>> streamFactory = () ->
            topicUpdate.newUpdateStreamBuilder()
                .specification(specification)
                .constraint(locked)
                .build(str, Long.class);

        final UpdateStream<Long> firstStream = streamFactory.get();
        firstStream.validate().whenComplete((result, th) -> {
            if (th != null) {
                LOG.warn("Unable to initialise first value stream. Unable to begin updating path.", th);
                stop();
            } else {
                new UpdateTask(streamFactory, firstStream, this.random::nextLong).scheduleUpdate();
            }
        });
    }
}
