package io.synadia.flink.examples;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.synadia.flink.examples.support.ExampleConnectionListener;
import io.synadia.flink.examples.support.ExampleErrorListener;
import io.synadia.flink.examples.support.Publisher;
import io.synadia.flink.utils.PropertiesUtils;
import io.synadia.flink.v0.sink.NatsSink;
import io.synadia.flink.v0.sink.NatsSinkBuilder;
import io.synadia.flink.v0.source.NatsSource;
import io.synadia.flink.v0.source.NatsSourceBuilder;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Properties;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/* loaded from: input_file:io/synadia/flink/examples/SourceToSinkExample.class */
/**
 * Example that builds a {@link NatsSource} and a {@link NatsSink} from one properties file,
 * connects them in a Flink job, lets the job run for a fixed time and then shuts everything down.
 *
 * <p>Alongside the Flink job, the example opens its own NATS connection. That connection is
 * handed to a {@link Publisher} (constructed with the {@code source.subjects} list) and is also
 * used to subscribe to every subject in {@code sink.subjects}, printing each message received.
 */
public class SourceToSinkExample {
    /** Properties file read at start-up; the path is relative to the working directory. */
    private static final String PROPERTIES_FILE = "src/examples/resources/application.properties";

    /** How long the main thread waits after submitting the job before shutting down. */
    private static final long RUN_MILLIS = 10000L;

    /**
     * Runs the example end to end.
     *
     * @param strArr command-line arguments; not used
     * @throws Exception if loading the properties, connecting to NATS, building or running the
     *     Flink job, or releasing any resource fails
     */
    public static void main(String[] strArr) throws Exception {
        Properties props = PropertiesUtils.loadPropertiesFromFile(PROPERTIES_FILE);

        // try-with-resources guarantees the NATS connection is closed on every exit path.
        try (Connection connection = connect(props)) {
            Publisher publisher = new Publisher(connection, PropertiesUtils.getPropertyAsList(props, "source.subjects", new String[0]));
            StreamExecutionEnvironment env = null;
            new Thread(publisher).start();
            try {
                // Print every message that arrives on the sink subjects. The payload is decoded
                // explicitly as UTF-8 so the output does not depend on the platform charset.
                Dispatcher dispatcher = connection.createDispatcher(message ->
                    System.out.printf("Listening. Subject: %s Payload: %s\n", message.getSubject(), new String(message.getData(), StandardCharsets.UTF_8)));
                for (String subject : PropertiesUtils.getPropertyAsList(props, "sink.subjects", new String[0])) {
                    dispatcher.subscribe(subject);
                }

                // The same properties object configures the source, the sink and their connections.
                NatsSource source = new NatsSourceBuilder().sourceProperties(props).connectionProperties(props).build();
                System.out.println(source);
                NatsSink sink = new NatsSinkBuilder().sinkProperties(props).connectionProperties(props).build();
                System.out.println(sink);

                env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "NatsSource").sinkTo(sink);
                // Up to 5 restart attempts, 5 seconds apart.
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.seconds(5L)));

                // Submit without blocking, then give the job a fixed window to process messages.
                env.executeAsync("Example");
                Thread.sleep(RUN_MILLIS);
            } finally {
                // Stop the publisher first, then release the Flink environment if it was created.
                publisher.stop();
                if (env != null) {
                    env.close();
                }
            }
        }

        // NOTE(review): presumably needed because the asynchronously submitted job keeps
        // non-daemon threads alive; confirm before removing.
        System.exit(0);
    }

    /**
     * Opens a NATS connection configured from the given properties, with the example connection
     * and error listeners attached.
     *
     * @param properties settings passed to the NATS {@link Options.Builder}
     * @return the connection returned by {@link Nats#connect(Options)}
     * @throws Exception if the connection cannot be established
     */
    private static Connection connect(Properties properties) throws Exception {
        Options options = new Options.Builder()
            .properties(properties)
            .connectionListener(new ExampleConnectionListener())
            .errorListener(new ExampleErrorListener())
            .build();
        return Nats.connect(options);
    }
}
