package io.synadia.flink.examples.v0;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamManagement;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StreamConfiguration;
import io.synadia.flink.utils.PropertiesUtils;
import io.synadia.flink.v0.payload.StringPayloadDeserializer;
import io.synadia.flink.v0.payload.StringPayloadSerializer;
import io.synadia.flink.v0.sink.NatsSinkBuilder;
import io.synadia.flink.v0.source.NatsJetStreamSource;
import io.synadia.flink.v0.source.NatsJetStreamSourceBuilder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/* loaded from: input_file:io/synadia/flink/examples/v0/SourceToSinkJsExample.class */
/**
 * End-to-end example: publishes a few messages to a JetStream subject, runs a bounded Flink job
 * that reads them through a {@link NatsJetStreamSource} and forwards them to a NATS sink subject,
 * then prints whatever a plain NATS subscription observed on the sink subject.
 *
 * <p>Configuration is read from {@code src/examples/resources/application.properties}. The keys
 * {@code source.JsSubject}, {@code sink.JsSubject}, {@code source.stream} and
 * {@code source.consumer} are required; {@code io.nats.client.url} is passed to
 * {@link Nats#connect(String)}.
 */
public class SourceToSinkJsExample {
    /** Location of the example configuration, relative to the working directory. */
    private static final String PROPERTIES_PATH = "src/examples/resources/application.properties";

    /** Number of test messages published to the source subject before the job starts. */
    private static final int MESSAGE_COUNT = 10;

    /** How long the example lets the asynchronously started job run before collecting results. */
    private static final long JOB_RUN_MILLIS = 12000L;

    /**
     * Runs the example.
     *
     * @param strArr command-line arguments (unused)
     * @throws Exception if configuration is incomplete, NATS cannot be reached, or the job fails
     *     to start
     */
    public static void main(String[] strArr) throws Exception {
        Properties props = PropertiesUtils.loadPropertiesFromFile(PROPERTIES_PATH);
        String sourceSubject = requireProperty(props, "source.JsSubject");
        String sinkSubject = requireProperty(props, "sink.JsSubject");
        String streamName = requireProperty(props, "source.stream");
        String consumerName = requireProperty(props, "source.consumer");

        // try-with-resources: the connection is closed even when a later step throws.
        try (Connection nc = connect(props)) {
            JetStreamManagement jsm = nc.jetStreamManagement();
            JetStream js = nc.jetStream();

            createStream(jsm, streamName, sourceSubject);
            try {
                publish(js, sourceSubject, MESSAGE_COUNT);
                createConsumer(jsm, streamName, sourceSubject, consumerName);

                // Filled by the dispatcher callback, read by this thread afterwards.
                List<Message> received = Collections.synchronizedList(new ArrayList<>());

                NatsJetStreamSource<String> source = new NatsJetStreamSourceBuilder<String>()
                        .subjects(sourceSubject)
                        .payloadDeserializer(new StringPayloadDeserializer())
                        .connectionProperties(props)
                        .consumerName(consumerName)
                        .maxFetchRecords(100)
                        .maxFetchTime(Duration.ofSeconds(5L))
                        .boundness(Boundedness.BOUNDED)
                        .build();

                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.getCheckpointConfig().setCheckpointInterval(10000L);
                DataStreamSource<String> input =
                        env.fromSource(source, WatermarkStrategy.noWatermarks(), "nats-source-input");

                // Subscribe to the sink subject before the job starts so no forwarded message is missed.
                Dispatcher dispatcher = nc.createDispatcher();
                dispatcher.subscribe(sinkSubject, received::add);

                input.sinkTo(new NatsSinkBuilder<String>()
                        .subjects(sinkSubject)
                        .connectionProperties(props)
                        .payloadSerializer(new StringPayloadSerializer())
                        .build());

                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.seconds(5L)));
                env.executeAsync("JetStream Source-to-Sink Example");

                // The job runs in the background; give it time to move the messages.
                Thread.sleep(JOB_RUN_MILLIS);

                dispatcher.unsubscribe(sinkSubject);
                env.close();

                // Iterating a synchronizedList requires holding its monitor manually.
                synchronized (received) {
                    for (Message message : received) {
                        System.out.println("Received message at sink: "
                                + new String(message.getData(), StandardCharsets.UTF_8));
                    }
                }
            } finally {
                // Remove the stream even on failure so a rerun starts from a clean state.
                jsm.deleteStream(streamName);
                System.out.println("Stream deleted: " + streamName);
            }
        }
        // Force JVM exit after cleanup, as the original example does.
        System.exit(0);
    }

    /**
     * Returns the value of a mandatory configuration key.
     *
     * @param properties loaded configuration
     * @param key property name to look up
     * @return the configured value, never {@code null}
     * @throws IllegalStateException if the key is absent
     */
    private static String requireProperty(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null) {
            throw new IllegalStateException("Missing required property '" + key + "' in " + PROPERTIES_PATH);
        }
        return value;
    }

    /**
     * Opens a NATS connection to the URL configured under {@code io.nats.client.url}.
     *
     * @param properties loaded configuration
     * @return an open connection; the caller is responsible for closing it
     * @throws Exception if the connection cannot be established
     */
    private static Connection connect(Properties properties) throws Exception {
        return Nats.connect(properties.getProperty("io.nats.client.url"));
    }

    /**
     * Adds a JetStream stream bound to a single subject.
     *
     * @param jetStreamManagement management handle used to add the stream
     * @param streamName name of the stream
     * @param subject subject the stream captures
     * @throws Exception if the server rejects the request
     */
    private static void createStream(JetStreamManagement jetStreamManagement, String streamName, String subject)
            throws Exception {
        StreamConfiguration config = StreamConfiguration.builder()
                .name(streamName)
                .subjects(new String[] {subject})
                .build();
        jetStreamManagement.addStream(config);
        System.out.println("Stream created: " + streamName);
    }

    /**
     * Adds or updates a durable consumer on the stream, filtered to one subject and using
     * {@link AckPolicy#All}.
     *
     * @param jetStreamManagement management handle used to add the consumer
     * @param streamName stream the consumer is attached to
     * @param subject filter subject for the consumer
     * @param consumerName durable name of the consumer
     * @throws Exception if the server rejects the request
     */
    private static void createConsumer(
            JetStreamManagement jetStreamManagement, String streamName, String subject, String consumerName)
            throws Exception {
        ConsumerConfiguration config = ConsumerConfiguration.builder()
                .durable(consumerName)
                .ackPolicy(AckPolicy.All)
                .filterSubject(subject)
                .build();
        jetStreamManagement.addOrUpdateConsumer(streamName, config);
        System.out.println("Consumer created: " + consumerName);
    }

    /**
     * Publishes {@code count} messages with bodies {@code "Message 0"} .. {@code "Message count-1"}.
     *
     * @param jetStream JetStream context used to publish
     * @param subject subject to publish to
     * @param count number of messages; nothing is published when {@code count <= 0}
     * @throws Exception if a publish fails
     */
    private static void publish(JetStream jetStream, String subject, int count) throws Exception {
        for (int index = 0; index < count; index++) {
            // Explicit charset so the payload bytes do not depend on the platform default.
            jetStream.publish(subject, ("Message " + index).getBytes(StandardCharsets.UTF_8));
        }
    }
}
