package com.pushtechnology.diffusion.examples;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.UpdateConstraint;
import com.pushtechnology.diffusion.client.features.UpdateStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/pushtechnology/diffusion/examples/TopicUpdateExample.class */
public final class TopicUpdateExample {
    private static final Logger LOG = LoggerFactory.getLogger(TopicUpdateExample.class);

    private TopicUpdateExample() {
    }

    public static void statelessSet(Session session) {
        session.feature(TopicUpdate.class).set("a/path", Long.class, 6L).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void statelessSetWithConstraint(Session session) {
        TopicUpdate feature = session.feature(TopicUpdate.class);
        UpdateConstraint.Factory updateConstraints = Diffusion.updateConstraints();
        session.lock("a/lock").thenApply(sessionLock -> {
            return updateConstraints.value(UpdateConstraint.Operator.IS, 5L).and(updateConstraints.locked(sessionLock));
        }).thenCompose(updateConstraint -> {
            return feature.set("a/path", Long.class, 6L, updateConstraint);
        }).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void statelessSetRepeatedly(Session session) {
        TopicUpdate feature = session.feature(TopicUpdate.class);
        UpdateConstraint.Factory updateConstraints = Diffusion.updateConstraints();
        feature.set("a/path", String.class, "hello").thenCompose(obj -> {
            return feature.set("a/path", String.class, (Object) null);
        }).thenCompose(obj2 -> {
            return feature.set("a/path", String.class, "who are you?", updateConstraints.noValue());
        }).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void statelessSetPeriodically(Session session) {
        TopicUpdate feature = session.feature(TopicUpdate.class);
        Random random = new Random();
        runPeriodicallyUntilFirstFailure(Executors.newSingleThreadScheduledExecutor(), () -> {
            return feature.set("random/long", Long.class, Long.valueOf(random.nextLong()));
        }, 5L, TimeUnit.SECONDS).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void addAndSetTopicWithStateless(Session session) {
        session.feature(TopicUpdate.class).addAndSet("a/path", Diffusion.newTopicSpecification(TopicType.STRING), String.class, "hello").thenAccept(topicCreationResult -> {
            if (topicCreationResult == TopicCreationResult.CREATED) {
                LOG.info("A new topic was created");
            } else {
                LOG.info("An existing topic was updated");
            }
        });
    }

    public static void streamSet(Session session) {
        session.feature(TopicUpdate.class).newUpdateStreamBuilder().build("a/path", Long.class).set(6L).whenComplete((v0, v1) -> {
            updateHandler(v0, v1);
        });
    }

    public static void streamSetWithConstraint(Session session) {
        TopicUpdate feature = session.feature(TopicUpdate.class);
        UpdateConstraint.Factory updateConstraints = Diffusion.updateConstraints();
        session.lock("a/lock").thenApply(sessionLock -> {
            return updateConstraints.value(UpdateConstraint.Operator.IS, 5L).and(updateConstraints.locked(sessionLock));
        }).thenApply(updateConstraint -> {
            return feature.newUpdateStreamBuilder().constraint(updateConstraint).build("a/path", Long.class);
        }).thenCompose(updateStream -> {
            return updateStream.set(6L);
        }).whenComplete((v0, v1) -> {
            updateHandler(v0, v1);
        });
    }

    public static void streamSetRepeatedly(Session session) {
        UpdateStream build = session.feature(TopicUpdate.class).newUpdateStreamBuilder().constraint(Diffusion.updateConstraints().noValue()).build("a/path", String.class);
        build.set("hello").thenCompose(topicCreationResult -> {
            return build.set((Object) null);
        }).thenCompose(topicCreationResult2 -> {
            return build.set("who are you?");
        }).whenComplete((v0, v1) -> {
            updateHandler(v0, v1);
        });
    }

    public static void streamSetRepeatedlyWithoutWaiting(Session session) {
        UpdateStream build = session.feature(TopicUpdate.class).newUpdateStreamBuilder().constraint(Diffusion.updateConstraints().noValue()).build("a/path", String.class);
        build.set("hello");
        build.set((Object) null);
        build.set("who are you?");
    }

    public static void streamSetPeriodically(Session session) {
        UpdateStream build = session.feature(TopicUpdate.class).newUpdateStreamBuilder().build("random/long", Long.class);
        CompletableFuture validate = build.validate();
        validate.whenComplete((topicCreationResult, th) -> {
            if (th != null) {
                LOG.warn("Failed to validate stream", th);
            }
        });
        Random random = new Random();
        validate.thenCompose(topicCreationResult2 -> {
            return runPeriodicallyUntilFirstFailure(Executors.newSingleThreadScheduledExecutor(), () -> {
                return build.set(Long.valueOf(random.nextLong()));
            }, 5L, TimeUnit.SECONDS);
        }).whenComplete(TopicUpdateExample::updateHandler);
    }

    public static void createTopicWithStream(Session session) {
        session.feature(TopicUpdate.class).newUpdateStreamBuilder().specification(Diffusion.newTopicSpecification(TopicType.STRING)).build("a/path", String.class).validate().thenAccept(topicCreationResult -> {
            if (topicCreationResult == TopicCreationResult.CREATED) {
                LOG.info("The topic was created");
            } else {
                LOG.info("The topic already exist");
            }
        });
    }

    public static void addAndSetTopicWithStream(Session session) {
        session.feature(TopicUpdate.class).newUpdateStreamBuilder().specification(Diffusion.newTopicSpecification(TopicType.STRING)).build("a/path", String.class).set("hello").thenAccept(topicCreationResult -> {
            if (topicCreationResult == TopicCreationResult.CREATED) {
                LOG.info("A new topic was created");
            } else {
                LOG.info("An existing topic was updated");
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static CompletableFuture<?> runPeriodicallyUntilFirstFailure(ScheduledExecutorService scheduledExecutorService, Supplier<CompletableFuture<?>> supplier, long j, TimeUnit timeUnit) {
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        scheduleNextUpdate(scheduledExecutorService, supplier, j, timeUnit, completableFuture);
        return completableFuture;
    }

    private static void scheduleNextUpdate(ScheduledExecutorService scheduledExecutorService, Supplier<CompletableFuture<?>> supplier, long j, TimeUnit timeUnit, CompletableFuture<?> completableFuture) {
        scheduledExecutorService.schedule(() -> {
            if (completableFuture.isDone()) {
                return;
            }
            ((CompletableFuture) supplier.get()).whenComplete((obj, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else {
                    if (completableFuture.isDone()) {
                        return;
                    }
                    scheduleNextUpdate(scheduledExecutorService, supplier, j, timeUnit, completableFuture);
                }
            });
        }, j, timeUnit);
    }

    private static <T> void updateHandler(T t, Throwable th) {
        if (th != null) {
            LOG.error("Update failed", th);
        }
    }
}
