package io.synadia.flink.v0.sink.writer;

import io.nats.client.Connection;
import io.nats.client.impl.Headers;
import io.synadia.flink.utils.ConnectionFactory;
import io.synadia.flink.utils.MiscUtils;
import io.synadia.flink.v0.payload.PayloadSerializer;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

/* loaded from: input_file:io/synadia/flink/v0/sink/writer/NatsSinkWriter.class */
/**
 * Sink writer that serializes every incoming element with the configured
 * {@link PayloadSerializer} and publishes the resulting bytes to each configured
 * subject over a single NATS {@link Connection}.
 *
 * <p>The instance id and the connection are {@code transient}; both are rebuilt in
 * {@link #readObject(ObjectInputStream)} after Java deserialization.
 *
 * @param <InputT> type of the elements handed to {@link #write(Object, SinkWriter.Context)}
 */
public class NatsSinkWriter<InputT> implements SinkWriter<InputT>, Serializable {
    /** Id of the owning sink; used as the prefix source when generating {@link #id}. */
    private final String sinkId;
    /** Subjects that every written element is published to, in iteration order. */
    private final List<String> subjects;
    /** Factory used to open the connection (at construction and again on deserialization). */
    private final ConnectionFactory connectionFactory;
    /** Converts an element into the byte payload that gets published. */
    private final PayloadSerializer<InputT> payloadSerializer;
    /**
     * Init context supplied at construction. It is stored but never read within this class.
     * NOTE(review): this field is not transient; whether Sink.InitContext is serializable
     * is not visible from here - confirm before relying on serializing this writer.
     */
    private final Sink.InitContext sinkInitContext;
    /** Generated id of this writer instance; only used by {@link #toString()} in this class. */
    private transient String id;
    /** Connection used for publishing, flushing and closing. */
    private transient Connection connection;

    /**
     * Creates the writer and opens its connection right away.
     *
     * @param str               id of the owning sink
     * @param list              subjects to publish each element to
     * @param payloadSerializer serializer turning an element into bytes
     * @param connectionFactory factory used to obtain the connection
     * @param initContext       init context of the sink; stored, not otherwise used here
     * @throws IOException propagated from the calls made here
     *                     (presumably {@code connectionFactory.connect()})
     */
    public NatsSinkWriter(String str, List<String> list, PayloadSerializer<InputT> payloadSerializer, ConnectionFactory connectionFactory, Sink.InitContext initContext) throws IOException {
        sinkId = str;
        id = MiscUtils.generatePrefixedId(str);
        subjects = list;
        this.payloadSerializer = payloadSerializer;
        this.connectionFactory = connectionFactory;
        sinkInitContext = initContext;
        // Connect last, after all fields have been assigned.
        connection = connectionFactory.connect();
    }

    /**
     * Serializes the element once and publishes the same payload to every subject.
     *
     * @param inputt  element to publish
     * @param context writer context; not used by this implementation
     * @throws IOException          declared by the signature
     * @throws InterruptedException declared by the signature
     */
    public void write(InputT inputt, SinkWriter.Context context) throws IOException, InterruptedException {
        final byte[] payload = payloadSerializer.getBytes(inputt);
        for (String subject : subjects) {
            // The null String/Headers arguments supply nothing for those slots
            // (reply-to subject and headers in the NATS client API).
            connection.publish(subject, (String) null, (Headers) null, payload);
        }
    }

    /**
     * Flushes the connection's outgoing buffer, but only while the connection
     * reports {@code CONNECTED}; in any other state this method does nothing.
     *
     * @param z flag passed by the caller; not used by this implementation
     * @throws IOException          propagated from the flush
     * @throws InterruptedException declared by the signature
     */
    public void flush(boolean z) throws IOException, InterruptedException {
        if (connection.getStatus() != Connection.Status.CONNECTED) {
            return;
        }
        connection.flushBuffer();
    }

    /**
     * Closes the underlying connection.
     *
     * @throws Exception propagated from closing the connection
     */
    public void close() throws Exception {
        connection.close();
    }

    /**
     * Java serialization hook: restores the serialized fields, then regenerates
     * the transient id and opens a fresh connection.
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        id = MiscUtils.generatePrefixedId(sinkId);
        connection = connectionFactory.connect();
    }

    /** Returns a short description containing the writer id and its subjects. */
    @Override
    public String toString() {
        return new StringBuilder("NatsSinkWriter{id='")
                .append(id)
                .append("', subjects=")
                .append(subjects)
                .append("}")
                .toString();
    }
}
