package io.pravega.connectors.flink;

import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Stream;
import io.pravega.connectors.flink.serialization.FlinkSerializer;
import io.pravega.connectors.flink.serialization.PravegaSerializationSchema;
import io.pravega.connectors.flink.serialization.SerializerFromSchemaRegistry;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaOutputFormat.class */
/**
 * A Flink {@link RichOutputFormat} that writes each record it receives as an event to a single
 * Pravega stream.
 *
 * <p>Writes are issued asynchronously. Every in-flight write is tracked in a pending-write counter,
 * and the first asynchronous failure is remembered and rethrown as an {@link IOException} by the
 * next {@link #writeRecord(Object)} or by {@link #close()}. {@link #close()} flushes the writer and
 * blocks until the counter drops back to zero before releasing resources.
 *
 * @param <T> the type of the records written to the stream
 */
public class FlinkPravegaOutputFormat<T> extends RichOutputFormat<T> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkPravegaOutputFormat.class);
    private static final long serialVersionUID = 1L;

    /** Name of the target stream (taken from the {@link Stream} passed to the constructor). */
    private final String stream;

    /** Scope of the target stream (taken from the {@link Stream} passed to the constructor). */
    private final String scope;

    /** Schema handed to {@link FlinkSerializer} when the writer is created in {@code open}. */
    private final SerializationSchema<T> serializationSchema;

    /** Client factory created in {@code open}; {@code null} until then. */
    private transient EventStreamClientFactory clientFactory;

    /** Client configuration used to create the client factory. */
    private final ClientConfig clientConfig;

    /** Optional router supplying a routing key per event; may be {@code null}. */
    private final PravegaEventRouter<T> eventRouter;

    /** Writer created in {@code open}; {@code null} until then. */
    private transient EventStreamWriter<T> pravegaWriter;

    /** First write failure detected asynchronously and not yet reported to the caller. */
    private final AtomicReference<Throwable> writeError;

    /** Number of writes that were issued but whose completion callback has not run yet. */
    private final AtomicInteger pendingWritesCount;

    /** Executor on which write-completion callbacks run; created in {@code open}. */
    private transient ExecutorService executorService;

    /**
     * Builder for {@link FlinkPravegaOutputFormat}.
     *
     * @param <T> the type of the records written by the built output format
     */
    public static class Builder<T> extends AbstractWriterBuilder<Builder<T>> {
        private SerializationSchema<T> serializationSchema;
        private PravegaEventRouter<T> eventRouter;

        /**
         * Sets the serialization schema used to serialize records.
         *
         * @param serializationSchema the schema to use; must be set (by this or the registry
         *     variant) before {@link #build()} is called
         * @return this builder
         */
        public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSchema) {
            this.serializationSchema = serializationSchema;
            return builder();
        }

        /**
         * Sets a serialization schema backed by a {@link SerializerFromSchemaRegistry} that is
         * constructed from this builder's Pravega configuration and the given arguments.
         *
         * @param groupId identifier forwarded to {@link SerializerFromSchemaRegistry}
         *     (presumably the schema-registry group id - confirm against that class)
         * @param tClass class of the records to serialize
         * @return this builder
         */
        public Builder<T> withSerializationSchemaFromRegistry(String groupId, Class<T> tClass) {
            this.serializationSchema = new PravegaSerializationSchema(new SerializerFromSchemaRegistry(getPravegaConfig(), groupId, tClass));
            return builder();
        }

        /**
         * Sets the router used to derive a routing key for every event.
         *
         * @param pravegaEventRouter the router, or {@code null} to write events without a routing key
         * @return this builder
         */
        public Builder<T> withEventRouter(PravegaEventRouter<T> pravegaEventRouter) {
            this.eventRouter = pravegaEventRouter;
            return builder();
        }

        /**
         * Returns this builder, typed as the concrete builder class.
         *
         * @return this builder
         */
        @Override // io.pravega.connectors.flink.AbstractWriterBuilder
        public Builder<T> builder() {
            return this;
        }

        /**
         * Creates the output format from the current builder state.
         *
         * @return a new {@link FlinkPravegaOutputFormat}
         * @throws NullPointerException if no serialization schema has been configured
         */
        public FlinkPravegaOutputFormat<T> build() {
            Preconditions.checkNotNull(this.serializationSchema, "serializationSchema");
            return new FlinkPravegaOutputFormat<>(getPravegaConfig().getClientConfig(), resolveStream(), this.serializationSchema, this.eventRouter);
        }
    }

    /**
     * Creates an output format for the given stream.
     *
     * @param clientConfig the Pravega client configuration; must not be {@code null}
     * @param stream the destination stream; must not be {@code null}
     * @param serializationSchema the schema used to serialize records; must not be {@code null}
     * @param pravegaEventRouter optional router providing routing keys; may be {@code null}
     * @throws NullPointerException if a required argument is {@code null}
     */
    public FlinkPravegaOutputFormat(ClientConfig clientConfig, Stream stream, SerializationSchema<T> serializationSchema, PravegaEventRouter<T> pravegaEventRouter) {
        this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
        Preconditions.checkNotNull(stream, "stream");
        this.stream = stream.getStreamName();
        this.scope = stream.getScope();
        this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
        this.eventRouter = pravegaEventRouter;
        this.writeError = new AtomicReference<>(null);
        this.pendingWritesCount = new AtomicInteger(0);
    }

    /**
     * No-op; this output format takes all of its settings from the constructor.
     *
     * @param configuration ignored
     */
    @Override
    public void configure(Configuration configuration) {
    }

    /**
     * Creates the client factory, the event writer and the callback executor.
     *
     * @param taskNumber unused
     * @param numTasks unused
     * @throws IOException declared by the overridden method; not thrown directly here
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        FlinkSerializer flinkSerializer = new FlinkSerializer(this.serializationSchema);
        EventWriterConfig writerConfig = EventWriterConfig.builder().build();
        this.clientFactory = createClientFactory(this.scope, this.clientConfig);
        this.pravegaWriter = this.clientFactory.createEventWriter(this.stream, flinkSerializer, writerConfig);
        this.executorService = createExecutorService();
    }

    /**
     * Asynchronously writes one event, using the routing key from the event router when one is
     * configured.
     *
     * @param event the record to write
     * @throws IOException if an earlier asynchronous write has failed
     */
    @Override
    public void writeRecord(T event) throws IOException {
        checkWriteError();
        this.pendingWritesCount.incrementAndGet();
        try {
            (this.eventRouter != null
                    ? this.pravegaWriter.writeEvent(this.eventRouter.getRoutingKey(event), event)
                    : this.pravegaWriter.writeEvent(event))
                .whenCompleteAsync((ignored, th) -> {
                    if (th != null) {
                        LOG.warn("Detected a write failure: {}", th);
                        // Keep only the first failure; it is reported by checkWriteError().
                        this.writeError.compareAndSet(null, th);
                    }
                    completePendingWrite();
                }, this.executorService);
        } catch (RuntimeException | Error e) {
            // The write failed before a completion callback could be registered, so nobody else
            // will ever decrement the counter. Undo the increment here; otherwise close() would
            // wait forever for a write that was never issued.
            completePendingWrite();
            throw e;
        }
    }

    /**
     * Flushes outstanding writes, waits for them to complete and then releases the writer, the
     * client factory and the callback executor. Every release step is attempted even when an
     * earlier one fails; the first failure is rethrown with later ones attached as suppressed.
     *
     * @throws IOException if flushing, a pending write, or releasing a resource failed
     */
    @Override
    public void close() throws IOException {
        Exception exc = null;
        try {
            flushAndVerify();
        } catch (Exception e) {
            exc = ExceptionUtils.firstOrSuppressed(e, exc);
        }
        if (this.pravegaWriter != null) {
            try {
                this.pravegaWriter.close();
            } catch (Exception e) {
                exc = ExceptionUtils.firstOrSuppressed(e, exc);
            }
        }
        if (this.clientFactory != null) {
            try {
                this.clientFactory.close();
            } catch (Exception e) {
                exc = ExceptionUtils.firstOrSuppressed(e, exc);
            }
        }
        if (this.executorService != null) {
            try {
                this.executorService.shutdown();
            } catch (Exception e) {
                exc = ExceptionUtils.firstOrSuppressed(e, exc);
            }
        }
        if (exc != null) {
            throw new IOException("exception occurred while trying to close the writer", exc);
        }
    }

    /**
     * Flushes the writer (if it was created), blocks until no writes are pending and then reports
     * any recorded write failure.
     *
     * @throws IOException if the wait is interrupted or a write failure was recorded
     */
    private void flushAndVerify() throws IOException {
        // The writer is null when open() was never called or failed part-way through.
        if (this.pravegaWriter != null) {
            this.pravegaWriter.flush();
        }
        synchronized (this) {
            while (this.pendingWritesCount.get() > 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    throw new IOException("received interrupted exception while waiting for the writes to complete", e);
                }
            }
        }
        checkWriteError();
    }

    /** Marks one pending write as finished and wakes up a thread waiting in {@code flushAndVerify}. */
    private void completePendingWrite() {
        synchronized (this) {
            this.pendingWritesCount.decrementAndGet();
            notifyAll();
        }
    }

    /**
     * Throws the recorded asynchronous write failure, if any, and clears it.
     *
     * @throws IOException wrapping the recorded failure
     */
    @VisibleForTesting
    protected void checkWriteError() throws IOException {
        Throwable failure = this.writeError.getAndSet(null);
        if (failure != null) {
            throw new IOException("Write failure", failure);
        }
    }

    /**
     * Creates the client factory for the given scope; overridable for tests.
     *
     * @param str the scope name
     * @param clientConfig the client configuration
     * @return a new client factory
     */
    @VisibleForTesting
    protected EventStreamClientFactory createClientFactory(String str, ClientConfig clientConfig) {
        return EventStreamClientFactory.withScope(str, clientConfig);
    }

    /**
     * Creates the single-threaded executor that runs write-completion callbacks; overridable for tests.
     *
     * @return a new executor service
     */
    @VisibleForTesting
    protected ExecutorService createExecutorService() {
        return Executors.newSingleThreadExecutor();
    }

    /** @return the serialization schema passed to the constructor */
    @VisibleForTesting
    protected SerializationSchema<T> getSerializationSchema() {
        return this.serializationSchema;
    }

    /** @return the name of the target stream */
    @VisibleForTesting
    protected String getStream() {
        return this.stream;
    }

    /** @return the scope of the target stream */
    @VisibleForTesting
    protected String getScope() {
        return this.scope;
    }

    /** @return {@code true} if a write failure has been recorded and not yet reported */
    @VisibleForTesting
    protected boolean isErrorOccurred() {
        return this.writeError.get() != null;
    }

    /** @return the live counter of writes whose completion callback has not run yet */
    @VisibleForTesting
    protected AtomicInteger getPendingWritesCount() {
        return this.pendingWritesCount;
    }

    /** @return the configured event router, or {@code null} if none was set */
    @VisibleForTesting
    protected PravegaEventRouter<T> getEventRouter() {
        return this.eventRouter;
    }

    /**
     * Creates a new {@link Builder}.
     *
     * @param <T> the type of the records written by the built output format
     * @return a new builder
     */
    public static <T> Builder<T> builder() {
        return new Builder<>();
    }
}
