package com.arangodb.internal.velocystream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.net.ssl.SSLContext;

/* loaded from: input_file:com/arangodb/internal/velocystream/ConnectionAsync.class */
public class ConnectionAsync extends Connection {
    private ExecutorService executor;
    private final MessageStore messageStore;

    /* loaded from: input_file:com/arangodb/internal/velocystream/ConnectionAsync$Builder.class */
    public static class Builder {
        private final MessageStore messageStore;
        private String host;
        private Integer port;
        private Integer timeout;
        private Boolean useSsl;
        private SSLContext sslContext;

        public Builder(MessageStore messageStore) {
            this.messageStore = messageStore;
        }

        public Builder host(String str) {
            this.host = str;
            return this;
        }

        public Builder port(Integer num) {
            this.port = num;
            return this;
        }

        public Builder timeout(Integer num) {
            this.timeout = num;
            return this;
        }

        public Builder useSsl(Boolean bool) {
            this.useSsl = bool;
            return this;
        }

        public Builder sslContext(SSLContext sSLContext) {
            this.sslContext = sSLContext;
            return this;
        }

        public ConnectionAsync build() {
            return new ConnectionAsync(this.host, this.port, this.timeout, this.useSsl, this.sslContext, this.messageStore);
        }
    }

    private ConnectionAsync(String str, Integer num, Integer num2, Boolean bool, SSLContext sSLContext, MessageStore messageStore) {
        super(str, num, num2, bool, sSLContext);
        this.messageStore = messageStore;
    }

    public synchronized void open() throws IOException {
        if (isOpen()) {
            return;
        }
        super.open();
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(() -> {
            ChunkStore chunkStore = new ChunkStore(this.messageStore);
            while (isOpen()) {
                try {
                    Chunk readChunk = readChunk();
                    ByteBuffer storeChunk = chunkStore.storeChunk(readChunk);
                    if (storeChunk != null) {
                        byte[] bArr = new byte[readChunk.getContentLength()];
                        readBytesIntoBuffer(bArr, 0, bArr.length);
                        storeChunk.put(bArr);
                        chunkStore.checkCompleteness(readChunk.getMessageId());
                    }
                } catch (Exception e) {
                    this.messageStore.clear(e);
                    close();
                    return;
                }
            }
            this.messageStore.clear(new IOException("The socket is closed."));
            close();
        });
    }

    public synchronized void close() {
        this.messageStore.clear();
        if (this.executor != null && !this.executor.isShutdown()) {
            this.executor.shutdown();
        }
        super.close();
    }

    public synchronized CompletableFuture<Message> write(Message message, Collection<Chunk> collection) {
        CompletableFuture<Message> completableFuture = new CompletableFuture<>();
        this.messageStore.storeMessage(message.getId(), completableFuture);
        super.writeIntern(message, collection);
        return completableFuture;
    }
}
