package io.deephaven.client.impl;

import com.google.flatbuffers.FlatBufferBuilder;
import com.google.protobuf.ByteStringAccess;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.barrage.flatbuf.BarrageMessageWrapper;
import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.base.log.LogOutput;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.table.BarrageTable;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.BarrageStreamReader;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.extensions.barrage.util.StreamReader;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Condition;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/deephaven/client/impl/BarrageSubscriptionImpl.class */
public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implements BarrageSubscription {
    /** Class-wide logger used for subscription error reporting. */
    private static final Logger log = LoggerFactory.getLogger(BarrageSubscriptionImpl.class);
    /** String form of the table handle's export id; emitted by {@link #append(LogOutput)}. */
    private final String logName;
    /** Handle of the table being subscribed to; closed by {@code cleanup()}. */
    private final TableHandle tableHandle;
    /** Subscription options used for the response marshaller and serialized into the request. */
    private final BarrageSubscriptionOptions options;
    /** Request-side stream observer of the DoExchange call created in the constructor. */
    private final ClientCallStreamObserver<Flight.FlightData> observer;
    /** Local table fed by incoming messages; set to {@code null} by {@code cleanup()}. */
    private BarrageTable resultTable;
    /**
     * Condition on the UpdateGraphProcessor exclusive lock; only created when the subscribing thread holds that lock.
     * When {@code null}, waiters use this object's monitor instead.
     */
    private volatile Condition completedCondition;
    /** Set to {@code true} by {@code signalCompletion()}. */
    private volatile boolean completed;
    /** Running total of {@code rowsIncluded.size()} over the messages applied to the result table. */
    private volatile long rowsReceived;
    /** Failure recorded while waiting for completion; rethrown (wrapped) by {@code partialTable}. */
    private volatile Throwable exceptionWhileCompleting;
    /** Update listener installed by {@code partialTable}; reset to {@code null} once it has removed itself. */
    private InstrumentedTableUpdateListener listener;
    /** Whether a request has already been sent; a second request is rejected. */
    private boolean subscribed;
    /** {@code true} from construction until {@code cleanup()} runs. */
    private volatile boolean connected;
    /** Set by the snapshot entry points; makes the listener seal the table instead of only signalling completion. */
    private boolean isSnapshot;

    /* loaded from: input_file:io/deephaven/client/impl/BarrageSubscriptionImpl$BarrageDataMarshaller.class */
    public static class BarrageDataMarshaller implements MethodDescriptor.Marshaller<BarrageMessage> {
        private final BarrageSubscriptionOptions options;
        private final ChunkType[] columnChunkTypes;
        private final Class<?>[] columnTypes;
        private final Class<?>[] componentTypes;
        private final StreamReader streamReader;

        public BarrageDataMarshaller(BarrageSubscriptionOptions barrageSubscriptionOptions, ChunkType[] chunkTypeArr, Class<?>[] clsArr, Class<?>[] clsArr2, StreamReader streamReader) {
            this.options = barrageSubscriptionOptions;
            this.columnChunkTypes = chunkTypeArr;
            this.columnTypes = clsArr;
            this.componentTypes = clsArr2;
            this.streamReader = streamReader;
        }

        public InputStream stream(BarrageMessage barrageMessage) {
            throw new UnsupportedOperationException("BarrageDataMarshaller unexpectedly used to directly convert BarrageMessage to InputStream");
        }

        /* renamed from: parse, reason: merged with bridge method [inline-methods] */
        public BarrageMessage m3parse(InputStream inputStream) {
            return this.streamReader.safelyParseFrom(this.options, (BitSet) null, this.columnChunkTypes, this.columnTypes, this.componentTypes, inputStream);
        }
    }

    /* loaded from: input_file:io/deephaven/client/impl/BarrageSubscriptionImpl$DoExchangeObserver.class */
    private class DoExchangeObserver implements ClientResponseObserver<Flight.FlightData, BarrageMessage> {
        private DoExchangeObserver() {
        }

        public void beforeStart(ClientCallStreamObserver<Flight.FlightData> clientCallStreamObserver) {
            clientCallStreamObserver.disableAutoInboundFlowControl();
        }

        public void onNext(BarrageMessage barrageMessage) {
            if (barrageMessage == null) {
                return;
            }
            try {
                BarrageTable barrageTable = BarrageSubscriptionImpl.this.resultTable;
                if (!BarrageSubscriptionImpl.this.connected || barrageTable == null) {
                    if (barrageMessage != null) {
                        barrageMessage.close();
                    }
                } else {
                    BarrageSubscriptionImpl.this.rowsReceived += barrageMessage.rowsIncluded.size();
                    barrageTable.handleBarrageMessage(barrageMessage);
                    if (barrageMessage != null) {
                        barrageMessage.close();
                    }
                }
            } catch (Throwable th) {
                if (barrageMessage != null) {
                    try {
                        barrageMessage.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }

        public void onError(Throwable th) {
            BarrageSubscriptionImpl.log.error().append(BarrageSubscriptionImpl.this).append(": Error detected in subscription: ").append(th).endl();
            BarrageTable barrageTable = BarrageSubscriptionImpl.this.resultTable;
            if (!BarrageSubscriptionImpl.this.connected || barrageTable == null) {
                return;
            }
            barrageTable.handleBarrageError(th);
            BarrageSubscriptionImpl.this.handleDisconnect();
        }

        public void onCompleted() {
            BarrageSubscriptionImpl.this.handleDisconnect();
        }
    }

    /**
     * Builds the local result table from the handle's schema and opens the DoExchange call that will feed it.
     *
     * @param barrageSession session whose channel is used to open the call
     * @param scheduledExecutorService executor handed to {@link BarrageTable#make}
     * @param tableHandle handle of the table to subscribe to; its export id is used as the log name
     * @param barrageSubscriptionOptions options used for the response marshaller and later serialized into the request
     */
    public BarrageSubscriptionImpl(BarrageSession barrageSession, ScheduledExecutorService scheduledExecutorService, TableHandle tableHandle, BarrageSubscriptionOptions barrageSubscriptionOptions) {
        super(false);
        // All other mutable fields keep their Java defaults (false / 0 / null).
        this.connected = true;
        this.logName = tableHandle.exportId().toString();
        this.options = barrageSubscriptionOptions;
        this.tableHandle = tableHandle;
        BarrageUtil.ConvertedArrowSchema convertArrowSchema = BarrageUtil.convertArrowSchema(tableHandle.response());
        this.resultTable = BarrageTable.make(scheduledExecutorService, convertArrowSchema.tableDef, convertArrowSchema.attributes, -1L);
        this.resultTable.addParentReference(this);
        MethodDescriptor<Flight.FlightData, BarrageMessage> clientDoExchangeDescriptor = getClientDoExchangeDescriptor(barrageSubscriptionOptions, this.resultTable.getWireChunkTypes(), this.resultTable.getWireTypes(), this.resultTable.getWireComponentTypes(), new BarrageStreamReader(this.resultTable.getDeserializationTmConsumer()));

        // Only the newCall invocation runs under the ROOT context. The finally block restores the previous context
        // exactly once, including when newCall throws; failures in the later statements must not detach again.
        final ClientCall<Flight.FlightData, BarrageMessage> call;
        final Context previous = Context.ROOT.attach();
        try {
            call = barrageSession.channel().newCall(clientDoExchangeDescriptor, CallOptions.DEFAULT);
        } finally {
            Context.ROOT.detach(previous);
        }

        // asyncBidiStreamingCall is declared to return a plain StreamObserver; the flow-control API needs the subtype.
        this.observer = (ClientCallStreamObserver<Flight.FlightData>) ClientCalls.asyncBidiStreamingCall(call, new DoExchangeObserver());
        // Auto inbound flow control is disabled in DoExchangeObserver.beforeStart, so messages are requested up front.
        this.observer.request(Integer.MAX_VALUE);
    }

    /**
     * @return {@code true} once completion has been signalled for this subscription
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public boolean isCompleted() {
        return completed;
    }

    /**
     * @return the running total of included rows over all messages applied to the result table so far
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public long getRowsReceived() {
        return rowsReceived;
    }

    /**
     * Subscribes without a viewport or column set and blocks until the subscription completes.
     *
     * @return the result table
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public BarrageTable entireTable() throws InterruptedException {
        final boolean blockUntilComplete = true;
        return entireTable(blockUntilComplete);
    }

    /**
     * Subscribes without a viewport or column set.
     *
     * @param z whether to block until the subscription completes
     * @return the result table
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public BarrageTable entireTable(boolean z) throws InterruptedException {
        final RowSet noViewport = null;
        final BitSet noColumns = null;
        final boolean reverseViewport = false;
        return partialTable(noViewport, noColumns, reverseViewport, z);
    }

    /**
     * Subscribes to the given viewport and columns with a forward viewport, blocking until completion.
     *
     * @param rowSet requested viewport, or {@code null} for none
     * @param bitSet requested columns, or {@code null} for none
     * @return the result table
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public BarrageTable partialTable(RowSet rowSet, BitSet bitSet) throws InterruptedException {
        final boolean reverseViewport = false;
        final boolean blockUntilComplete = true;
        return partialTable(rowSet, bitSet, reverseViewport, blockUntilComplete);
    }

    /**
     * Subscribes to the given viewport and columns, blocking until completion.
     *
     * @param rowSet requested viewport, or {@code null} for none
     * @param bitSet requested columns, or {@code null} for none
     * @param z reverse-viewport flag written into the request
     * @return the result table
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public BarrageTable partialTable(RowSet rowSet, BitSet bitSet, boolean z) throws InterruptedException {
        final boolean blockUntilComplete = true;
        return partialTable(rowSet, bitSet, z, blockUntilComplete);
    }

    /**
     * Sends the subscription request, installs a listener that detects when the server viewport satisfies the request,
     * and optionally blocks until that happens or a failure is recorded.
     *
     * <p>
     * If the calling thread holds the UpdateGraphProcessor exclusive lock, waiting uses a condition on that lock;
     * otherwise it uses this object's monitor.
     *
     * @param rowSet requested viewport, or {@code null} for none
     * @param bitSet requested columns, or {@code null} for none
     * @param z reverse-viewport flag written into the request and compared against the server's flag
     * @param z2 whether to block until completion or failure
     * @return the result table
     * @throws UncheckedDeephavenException if the subscription is no longer connected, was already used, or a failure
     *         was recorded while completing
     * @throws UnsupportedOperationException if the calling thread holds the UpdateGraphProcessor shared lock
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public synchronized BarrageTable partialTable(final RowSet rowSet, BitSet bitSet, final boolean z, boolean z2) throws InterruptedException {
        // Capture the table once: cleanup() can null the field from another thread at any moment, and both the
        // listener below and the return value must keep referring to the same, non-null table.
        final BarrageTable table = this.resultTable;
        if (!this.connected || table == null) {
            throw new UncheckedDeephavenException(this + " is no longer an active subscription and cannot be retained further");
        }
        if (this.subscribed) {
            throw new UncheckedDeephavenException("BarrageSubscription objects cannot be reused.");
        }
        if (UpdateGraphProcessor.DEFAULT.sharedLock().isHeldByCurrentThread()) {
            throw new UnsupportedOperationException("Cannot create subscription while holding the UpdateGraphProcessor shared lock");
        }
        if (UpdateGraphProcessor.DEFAULT.exclusiveLock().isHeldByCurrentThread()) {
            this.completedCondition = UpdateGraphProcessor.DEFAULT.exclusiveLock().newCondition();
        }
        table.setInitialSnapshotViewportRowCount(rowSet == null ? -1L : rowSet.size());
        this.observer.onNext(Flight.FlightData.newBuilder().setAppMetadata(ByteStringAccess.wrap(makeRequestInternal(rowSet, bitSet, z, this.options))).build());
        this.subscribed = true;
        this.listener = new InstrumentedTableUpdateListener("example-listener") { // from class: io.deephaven.client.impl.BarrageSubscriptionImpl.1
            @Override
            protected void onFailureInternal(Throwable th, TableListener.Entry entry) {
                BarrageSubscriptionImpl.this.failCompletion(th);
            }

            @Override
            public void onUpdate(TableUpdate tableUpdate) {
                boolean viewportSatisfied = false;
                if (rowSet == null && table.getServerViewport() == null) {
                    // No viewport requested and none in effect on the server.
                    viewportSatisfied = true;
                } else if (rowSet != null && table.getServerViewport() != null && z == table.getServerReverseViewport()) {
                    viewportSatisfied = rowSet.subsetOf(table.getServerViewport());
                }
                if (!viewportSatisfied) {
                    return;
                }
                if (BarrageSubscriptionImpl.this.isSnapshot) {
                    table.sealTable(() -> {
                        BarrageSubscriptionImpl.this.observer.onCompleted();
                        BarrageSubscriptionImpl.this.signalCompletion();
                    }, () -> {
                        // Record the failure AND wake the waiter; recording alone would leave a blocking caller
                        // parked forever because this listener is removed right below.
                        BarrageSubscriptionImpl.this.failCompletion(new Exception("sealTable reported failure while completing snapshot"));
                    });
                } else {
                    BarrageSubscriptionImpl.this.signalCompletion();
                }
                table.removeUpdateListener(this);
                BarrageSubscriptionImpl.this.listener = null;
            }
        };
        table.listenForUpdates(this.listener);
        if (z2) {
            while (!this.completed && this.exceptionWhileCompleting == null) {
                if (this.completedCondition != null) {
                    this.completedCondition.await();
                } else {
                    wait();
                }
            }
        }
        final Throwable failure = this.exceptionWhileCompleting;
        if (failure != null) {
            throw new UncheckedDeephavenException("Error while handling subscription:", failure);
        }
        return table;
    }

    /** Records a completion failure and wakes any thread blocked in {@code partialTable}. */
    private void failCompletion(Throwable failure) {
        this.exceptionWhileCompleting = failure;
        if (this.completedCondition != null) {
            UpdateGraphProcessor.DEFAULT.requestSignal(this.completedCondition);
            return;
        }
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Marks the subscription completed and wakes the waiter: through this object's monitor when no lock condition was
     * created, otherwise by requesting a signal on that condition from the UpdateGraphProcessor.
     */
    private void signalCompletion() {
        completed = true;
        if (completedCondition == null) {
            synchronized (this) {
                notifyAll();
            }
            return;
        }
        UpdateGraphProcessor.DEFAULT.requestSignal(completedCondition);
    }

    /**
     * Snapshots without a viewport or column set and blocks until the snapshot completes.
     *
     * @return the result table
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public BarrageTable snapshotEntireTable() throws InterruptedException {
        final boolean blockUntilComplete = true;
        return snapshotEntireTable(blockUntilComplete);
    }

    /**
     * Snapshots without a viewport or column set.
     *
     * @param z whether to block until the snapshot completes
     * @return the result table
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public BarrageTable snapshotEntireTable(boolean z) throws InterruptedException {
        final RowSet noViewport = null;
        final BitSet noColumns = null;
        final boolean reverseViewport = false;
        return snapshotPartialTable(noViewport, noColumns, reverseViewport, z);
    }

    /**
     * Snapshots the given viewport and columns with a forward viewport, blocking until completion.
     *
     * @param rowSet requested viewport, or {@code null} for none
     * @param bitSet requested columns, or {@code null} for none
     * @return the result table
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public BarrageTable snapshotPartialTable(RowSet rowSet, BitSet bitSet) throws InterruptedException {
        final boolean reverseViewport = false;
        final boolean blockUntilComplete = true;
        return snapshotPartialTable(rowSet, bitSet, reverseViewport, blockUntilComplete);
    }

    /**
     * Snapshots the given viewport and columns, blocking until completion.
     *
     * @param rowSet requested viewport, or {@code null} for none
     * @param bitSet requested columns, or {@code null} for none
     * @param z reverse-viewport flag written into the request
     * @return the result table
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public BarrageTable snapshotPartialTable(RowSet rowSet, BitSet bitSet, boolean z) throws InterruptedException {
        final boolean blockUntilComplete = true;
        return snapshotPartialTable(rowSet, bitSet, z, blockUntilComplete);
    }

    /**
     * Flags this subscription as a snapshot and then runs the regular subscription path with the same arguments.
     *
     * @param rowSet requested viewport, or {@code null} for none
     * @param bitSet requested columns, or {@code null} for none
     * @param z reverse-viewport flag written into the request
     * @param z2 whether to block until the snapshot completes
     * @return the result table
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // io.deephaven.client.impl.BarrageSubscription
    public synchronized BarrageTable snapshotPartialTable(RowSet rowSet, BitSet bitSet, boolean z, boolean z2) throws InterruptedException {
        // The flag is raised before delegating so the listener installed by partialTable observes it.
        isSnapshot = true;
        final BarrageTable snapshot = partialTable(rowSet, bitSet, z, z2);
        return snapshot;
    }

    /** Runs the superclass teardown first, then closes this subscription. */
    @Override
    protected void destroy() {
        super.destroy();
        this.close();
    }

    /**
     * Reacts to the response stream ending. Does nothing if already disconnected; otherwise logs an error unless this
     * is a snapshot, and then releases resources.
     */
    private void handleDisconnect() {
        if (!connected) {
            return;
        }
        if (!isSnapshot) {
            log.error().append(this).append(": unexpectedly closed by other host").endl();
        }
        cleanup();
    }

    /**
     * Completes the request stream via {@code GrpcUtil.safelyExecute} and releases resources. Calling it when the
     * subscription is already disconnected is a no-op.
     */
    @Override // io.deephaven.client.impl.BarrageSubscription, java.lang.AutoCloseable
    public synchronized void close() {
        if (!connected) {
            return;
        }
        GrpcUtil.safelyExecute(observer::onCompleted);
        cleanup();
    }

    /** Marks the subscription disconnected, closes the table handle, and drops the reference to the result table. */
    private void cleanup() {
        // Order matters: the flag is cleared before the table reference so readers that check the flag see it first.
        connected = false;
        tableHandle.close();
        resultTable = null;
    }

    /**
     * Writes this subscription's log prefix: {@code Barrage/ClientSubscription/<logName>/<identityHashCode>/}.
     *
     * @param logOutput the output to append to
     * @return the result of the final {@code append} call in the chain
     */
    public LogOutput append(LogOutput logOutput) {
        final int identity = System.identityHashCode(this);
        return logOutput
                .append("Barrage/ClientSubscription/")
                .append(logName)
                .append("/")
                .append(identity)
                .append("/");
    }
    /**
     * Serializes a subscription request for this table handle's ticket and wraps it in a Barrage message wrapper.
     *
     * @param rowSet requested viewport; when {@code null} the viewport offset stays 0
     * @param bitSet requested columns; when {@code null} the columns offset stays 0
     * @param z reverse-viewport flag written into the request
     * @param barrageSubscriptionOptions options to embed; when {@code null} the options offset stays 0
     * @return the data buffer of the finished wrapper message
     */
    private ByteBuffer makeRequestInternal(@Nullable RowSet rowSet, @Nullable BitSet bitSet, boolean z, @Nullable BarrageSubscriptionOptions barrageSubscriptionOptions) {
        final FlatBufferBuilder requestBuilder = new FlatBufferBuilder();

        // Nested objects are created before the request table is started, in this fixed order.
        final int columnsOffset = bitSet == null
                ? 0
                : BarrageSubscriptionRequest.createColumnsVector(requestBuilder, bitSet.toByteArray());
        final int viewportOffset = rowSet == null
                ? 0
                : BarrageSubscriptionRequest.createViewportVector(requestBuilder, BarrageProtoUtil.toByteBuffer(rowSet));
        final int optionsOffset = barrageSubscriptionOptions == null
                ? 0
                : barrageSubscriptionOptions.appendTo(requestBuilder);
        final int ticketOffset = BarrageSubscriptionRequest.createTicketVector(requestBuilder, tableHandle.ticketId().bytes());

        BarrageSubscriptionRequest.startBarrageSubscriptionRequest(requestBuilder);
        BarrageSubscriptionRequest.addColumns(requestBuilder, columnsOffset);
        BarrageSubscriptionRequest.addViewport(requestBuilder, viewportOffset);
        BarrageSubscriptionRequest.addSubscriptionOptions(requestBuilder, optionsOffset);
        BarrageSubscriptionRequest.addTicket(requestBuilder, ticketOffset);
        BarrageSubscriptionRequest.addReverseViewport(requestBuilder, z);
        final int requestRoot = BarrageSubscriptionRequest.endBarrageSubscriptionRequest(requestBuilder);
        requestBuilder.finish(requestRoot);

        // The finished request becomes the byte-vector payload of a separate wrapper message.
        final FlatBufferBuilder wrapperBuilder = new FlatBufferBuilder();
        final int payloadOffset = wrapperBuilder.createByteVector(requestBuilder.dataBuffer());
        // NOTE(review): 1852338276L == 0x6E687064, presumably the Barrage magic number, and (byte) 5 presumably the
        // subscription-request message type; confirm against the Barrage flatbuffer definitions.
        final int wrapperRoot = BarrageMessageWrapper.createBarrageMessageWrapper(wrapperBuilder, 1852338276L, (byte) 5, payloadOffset);
        wrapperBuilder.finish(wrapperRoot);
        return wrapperBuilder.dataBuffer();
    }

    /**
     * Builds a method descriptor with custom marshallers, copying the schema descriptor from an existing descriptor.
     *
     * @param methodType the gRPC method type
     * @param str the service name
     * @param str2 the method name
     * @param marshaller request marshaller
     * @param marshaller2 response marshaller
     * @param methodDescriptor descriptor whose schema descriptor is reused
     * @param <ReqT> request message type
     * @param <RespT> response message type
     * @return the new descriptor, with local-tracing sampling disabled
     */
    public static <ReqT, RespT> MethodDescriptor<ReqT, RespT> descriptorFor(MethodDescriptor.MethodType methodType, String str, String str2, MethodDescriptor.Marshaller<ReqT> marshaller, MethodDescriptor.Marshaller<RespT> marshaller2, MethodDescriptor<?, ?> methodDescriptor) {
        // The explicit type witness is required: as the receiver of a call chain, newBuilder() would otherwise be
        // inferred as Builder<Object, Object> and reject the typed marshallers.
        return MethodDescriptor.<ReqT, RespT>newBuilder()
                .setType(methodType)
                .setFullMethodName(MethodDescriptor.generateFullMethodName(str, str2))
                .setSampledToLocalTracing(false)
                .setRequestMarshaller(marshaller)
                .setResponseMarshaller(marshaller2)
                .setSchemaDescriptor(methodDescriptor.getSchemaDescriptor())
                .build();
    }

    /**
     * Creates the client-side descriptor for {@code arrow.flight.protocol.FlightService/DoExchange} as a
     * bidirectional streaming method whose responses are decoded by a {@link BarrageDataMarshaller}.
     *
     * @param barrageSubscriptionOptions options passed to the response marshaller
     * @param chunkTypeArr chunk types passed to the response marshaller
     * @param clsArr column types passed to the response marshaller
     * @param clsArr2 component types passed to the response marshaller
     * @param streamReader reader passed to the response marshaller
     * @return the descriptor, reusing the schema descriptor of the generated DoExchange method
     */
    public static MethodDescriptor<Flight.FlightData, BarrageMessage> getClientDoExchangeDescriptor(BarrageSubscriptionOptions barrageSubscriptionOptions, ChunkType[] chunkTypeArr, Class<?>[] clsArr, Class<?>[] clsArr2, StreamReader streamReader) {
        final MethodDescriptor.Marshaller<Flight.FlightData> requestMarshaller =
                ProtoUtils.marshaller(Flight.FlightData.getDefaultInstance());
        final MethodDescriptor.Marshaller<BarrageMessage> responseMarshaller =
                new BarrageDataMarshaller(barrageSubscriptionOptions, chunkTypeArr, clsArr, clsArr2, streamReader);
        return descriptorFor(
                MethodDescriptor.MethodType.BIDI_STREAMING,
                "arrow.flight.protocol.FlightService",
                "DoExchange",
                requestMarshaller,
                responseMarshaller,
                FlightServiceGrpc.getDoExchangeMethod());
    }
}
