package com.pushtechnology.diffusion.examples.runnable;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.ClusterRoutingException;
import com.pushtechnology.diffusion.client.features.IncompatibleTopicException;
import com.pushtechnology.diffusion.client.features.IncompatibleTopicStateException;
import com.pushtechnology.diffusion.client.features.NoSuchTopicException;
import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.UnsatisfiedConstraintException;
import com.pushtechnology.diffusion.client.features.UpdateConstraint;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.session.PermissionsException;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionClosedException;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/pushtechnology/diffusion/examples/runnable/CompetitiveIncrement.class */
/**
 * Example client that competes with other sessions to increment a shared
 * INT64 topic.
 *
 * <p>On start the client subscribes to {@code long/increment}, attempts to
 * create the topic with an initial value of {@code 0} (only if no topic is
 * bound to the path yet) and then, every five seconds, tries to set the topic
 * to "last observed value + 1". Each update is guarded by a value constraint
 * so that it only succeeds if the topic still holds the value this client last
 * observed; when another session wins the race the update is retried.
 */
public final class CompetitiveIncrement extends AbstractClient {
    private static final Logger LOG = LoggerFactory.getLogger(CompetitiveIncrement.class);
    private static final UpdateConstraint.Factory CONSTRAINTS = Diffusion.updateConstraints();

    /** Path of the topic that is subscribed to, created and incremented. */
    private static final String TOPIC_PATH = "long/increment";

    /** Delay, in seconds, between a successful step and the next increment attempt. */
    private static final long INCREMENT_DELAY_SECONDS = 5L;

    private final ScheduledExecutorService executor;

    /** The most recently scheduled increment task, or {@code null} if none has been scheduled. */
    private volatile Future<?> updateTask;

    /** Last value received from the value stream, or {@code null} if none has been received. */
    private volatile Long value;

    /**
     * Creates the client.
     *
     * @param str first argument forwarded unchanged to the {@code AbstractClient} constructor
     * @param str2 second argument forwarded unchanged to the {@code AbstractClient} constructor
     */
    public CompetitiveIncrement(String str, String str2) {
        super(str, str2);
        // The executor is never shut down by this class, so its worker must be
        // a daemon thread; a non-daemon core thread would keep the JVM alive
        // after main() returns.
        this.executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            final Thread thread = new Thread(runnable, "competitive-increment");
            thread.setDaemon(true);
            return thread;
        });
    }

    /**
     * Begins the subscribe / create / increment sequence for the given session.
     *
     * @param session the session to use for all topic operations
     */
    @Override
    public void onStarted(Session session) {
        subscribeToPath(session);
    }

    /**
     * Registers a value stream that records every new value of the topic,
     * subscribes to the topic path and, once subscribed, initialises the topic.
     * A failed subscription is logged and stops the client.
     */
    private void subscribeToPath(Session session) {
        final Topics feature = session.feature(Topics.class);
        feature.addStream(TOPIC_PATH, Long.class, new Topics.ValueStream.Default<Long>() {
            @Override
            public void onValue(String str, TopicSpecification topicSpecification, Long l, Long l2) {
                // Remember the latest value; it is the basis for the next increment.
                CompetitiveIncrement.this.value = l2;
            }
        });
        feature.subscribe(TOPIC_PATH).whenComplete((obj, th) -> {
            if (th == null) {
                initialiseTopic(feature);
            } else {
                LOG.warn("Subscription failed", th);
                stop();
            }
        });
    }

    /**
     * Attempts to create the topic with an initial value of {@code 0}, using a
     * "no topic" constraint so an existing topic is left untouched. If the
     * topic was created or already exists, the first increment is scheduled;
     * any other failure is logged and stops the client.
     */
    private void initialiseTopic(Topics topics) {
        topics.addAndSet(TOPIC_PATH, Diffusion.newTopicSpecification(TopicType.INT64), Long.class, 0L, CONSTRAINTS.noTopic()).whenComplete((topicCreationResult, th) -> {
            if (th == null) {
                if (topicCreationResult == TopicCreationResult.CREATED) {
                    LOG.info("Topic created");
                } else {
                    // Not expected with the "no topic" constraint, but treat
                    // any other successful result like an existing topic so
                    // the client never silently idles.
                    LOG.info("Topic exists");
                }
                scheduleIncrement(topics);
                return;
            }
            final Throwable cause = failureCause(th);
            if (cause instanceof UnsatisfiedConstraintException) {
                // The "no topic" constraint failed: another session got there first.
                LOG.info("Topic exists");
                scheduleIncrement(topics);
                return;
            }
            if (cause instanceof IncompatibleTopicException) {
                LOG.warn("An existing topic is not compatible");
            } else if (cause instanceof IncompatibleTopicStateException) {
                LOG.warn("An existing topic is managed by a different component");
            } else if (cause instanceof TopicControl.TopicLicenseLimitException) {
                LOG.warn("License limit reached", th);
            } else {
                LOG.warn("Topic creation failed", th);
            }
            stop();
        });
    }

    /** Schedules a single increment attempt after the configured delay and remembers the task. */
    private void scheduleIncrement(Topics topics) {
        this.updateTask = this.executor.schedule(
            () -> performIncrement(topics), INCREMENT_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Tries to set the topic to the last observed value plus one, constrained
     * on the topic still holding that observed value. On success the next
     * increment is scheduled; failures are passed to
     * {@link #handleIncrementFailure(Topics, Throwable)}.
     */
    private void performIncrement(Topics topics) {
        final Long current = this.value;
        if (current == null) {
            // No usable value has been observed yet. Dereferencing it would
            // throw inside the scheduled task, where the exception is swallowed
            // by the Future and the client would stall; try again later instead.
            LOG.warn("No value observed yet for {}, retrying later", TOPIC_PATH);
            scheduleIncrement(topics);
            return;
        }
        final Long next = Long.valueOf(current.longValue() + 1);
        topics.set(TOPIC_PATH, Long.class, next, CONSTRAINTS.value(UpdateConstraint.Operator.IS, current)).whenComplete((obj, th) -> {
            if (th != null) {
                handleIncrementFailure(topics, th);
            } else {
                LOG.info("Topic incremented {} -> {}", current, next);
                scheduleIncrement(topics);
            }
        });
    }

    /**
     * Reacts to a failed increment. Routing failures and lost races (the value
     * constraint was not satisfied) are retried immediately, re-reading the
     * latest observed value; every other failure is logged and stops the client.
     */
    private void handleIncrementFailure(Topics topics, Throwable th) {
        final Throwable cause = failureCause(th);
        if ((cause instanceof ClusterRoutingException) || (cause instanceof UnsatisfiedConstraintException)) {
            performIncrement(topics);
            return;
        }
        if (cause instanceof NoSuchTopicException) {
            LOG.warn("The topic has been deleted");
        } else if (cause instanceof IncompatibleTopicException) {
            LOG.warn("The topic is not compatible");
        } else if (cause instanceof IncompatibleTopicStateException) {
            LOG.warn("The topic is managed by a different component");
        } else if (cause instanceof PermissionsException) {
            LOG.warn("The session does't have permission to update the path");
        } else if (cause instanceof SessionClosedException) {
            LOG.warn("The session has closed");
        } else {
            // Previously unrecognised failures were dropped without a trace,
            // leaving the client idle forever. Report them and stop.
            LOG.warn("Topic update failed", th);
        }
        stop();
    }

    /**
     * Returns the exception to classify for a failed operation: the wrapped
     * cause when there is one, otherwise the throwable itself.
     */
    private static Throwable failureCause(Throwable th) {
        final Throwable cause = th.getCause();
        return cause != null ? cause : th;
    }

    /** Cancels the pending increment task, if any, without interrupting a running one. */
    @Override
    public void onDisconnected() {
        final Future<?> future = this.updateTask;
        if (future != null) {
            future.cancel(false);
        }
    }

    /**
     * Runs the example against a fixed server URL and principal and blocks
     * until the client has stopped.
     *
     * @param strArr command line arguments (unused)
     * @throws InterruptedException if interrupted while waiting for the client to stop
     */
    public static void main(String[] strArr) throws InterruptedException {
        CompetitiveIncrement competitiveIncrement = new CompetitiveIncrement("ws://diffusion.example.com:80", "auth");
        competitiveIncrement.start("auth_secret");
        competitiveIncrement.waitForStopped();
    }
}
