package org.apache.pulsar.transaction.coordinator.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Generated;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.class */
public class TxnLogBufferedWriter<T> {
    /** Magic number (0x0E01) that {@code doFlush()} writes as the first short of every batched entry. */
    public static final short BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER = 3585;
    /** Size in bytes of the magic-number prefix (one short). */
    public static final int BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN = 2;
    /** Version of the batched entry prefix; {@code doFlush()} writes this value as the second short. */
    public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION = 1;
    /** Size in bytes of the version prefix (one short). */
    public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN = 2;
    /** True only when batching was requested AND the max-records limit is greater than 1 (see constructor). */
    private final boolean batchEnabled;
    /** Ledger that receives both single and batched entries. */
    private final ManagedLedger managedLedger;
    /** Timer used to schedule the periodic flush task. */
    private final Timer timer;
    /**
     * Executor on which buffered adds, flushes and the close logic are run.
     * NOTE(review): the name implies a single thread; the buffer fields below are not synchronized and rely on it.
     */
    private final Executor singleThreadExecutorForWrite;
    /** Serializer for single records and for whole batches. */
    private final DataSerializer<T> dataSerializer;
    /** Handle of the most recently scheduled timing flush; assigned in {@code nextTimingTrigger()}. */
    private Timeout timeout;
    /** A flush is triggered once this many records are buffered. */
    private final int batchedWriteMaxRecords;
    /** A flush is triggered once the buffered serialized size reaches this many bytes. */
    private final int batchedWriteMaxSize;
    /** Delay, in milliseconds, between timing flush attempts. */
    private final int batchedWriteMaxDelayInMillis;
    /** Records buffered since the last flush. */
    private final ArrayList<T> dataArray;
    /** Callbacks (one per buffered record) for the batch currently being accumulated. */
    private FlushContext flushContext;
    /** Sum of the serialized sizes of the records currently in {@link #dataArray}. */
    private long bytesSize;
    /** Lifecycle state; updated through {@link #STATE_UPDATER}. */
    private volatile State state;
    /** Metrics sink notified each time a flush is triggered; never null after construction. */
    private final TxnLogBufferedWriterMetricsStats metrics;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TxnLogBufferedWriter.class);
    /** Shared exception handed to callbacks when data is added or flushed after the writer started closing. */
    private static final ManagedLedgerException BUFFERED_WRITER_CLOSED_EXCEPTION = new ManagedLedgerException.ManagedLedgerFencedException(new Exception("Transaction log buffered write has closed"));
    /** Atomic updater for the {@code state} field. */
    private static final AtomicReferenceFieldUpdater<TxnLogBufferedWriter, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(TxnLogBufferedWriter.class, State.class, "state");
    /** Callback instance passed to the managed ledger for every batched write of this writer. */
    private final TxnLogBufferedWriter<T>.BookKeeperBatchedWriteCallback bookKeeperBatchedWriteCallback = new BookKeeperBatchedWriteCallback();
    /** Timer task that triggers a time-based flush unless its timeout has been cancelled. */
    private final TimerTask timingFlushTask = timeout -> {
        if (timeout.isCancelled()) {
            return;
        }
        trigFlushByTimingTask();
    };

    /* loaded from: input_file:org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter$AddDataCallback.class */
    /**
     * Callback through which the writer reports the outcome of a single {@code asyncAddData} request.
     */
    public interface AddDataCallback {
        /**
         * Invoked when the record has been written.
         *
         * @param position position reported by the managed ledger; for batched writes the writer passes a
         *                 {@code TxnBatchedPositionImpl} carrying the batch size and the record's index
         * @param obj      the context object supplied to {@code asyncAddData}
         */
        void addComplete(Position position, Object obj);

        /**
         * Invoked when the record could not be written.
         *
         * @param managedLedgerException the failure reported to the caller
         * @param obj                    the context object supplied to {@code asyncAddData}
         */
        void addFailed(ManagedLedgerException managedLedgerException, Object obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter$AsyncAddArgs.class */
    public static class AsyncAddArgs {
        private static final Recycler<AsyncAddArgs> ASYNC_ADD_ARGS_RECYCLER = new Recycler<AsyncAddArgs>() { // from class: org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter.AsyncAddArgs.1
            protected AsyncAddArgs newObject(Recycler.Handle<AsyncAddArgs> handle) {
                return new AsyncAddArgs(handle);
            }

            /* renamed from: newObject, reason: collision with other method in class */
            protected /* bridge */ /* synthetic */ Object m16newObject(Recycler.Handle handle) {
                return newObject((Recycler.Handle<AsyncAddArgs>) handle);
            }
        };
        private final Recycler.Handle<AsyncAddArgs> handle;
        private AddDataCallback callback;
        private Object ctx;
        private long addedTime;
        private ByteBuf byteBuf;

        private static AsyncAddArgs newInstance(AddDataCallback addDataCallback, Object obj, long j) {
            AsyncAddArgs asyncAddArgs = (AsyncAddArgs) ASYNC_ADD_ARGS_RECYCLER.get();
            asyncAddArgs.callback = addDataCallback;
            asyncAddArgs.ctx = obj;
            asyncAddArgs.addedTime = j;
            return asyncAddArgs;
        }

        private static AsyncAddArgs newInstance(AddDataCallback addDataCallback, Object obj, long j, ByteBuf byteBuf) {
            AsyncAddArgs newInstance = newInstance(addDataCallback, obj, j);
            newInstance.byteBuf = byteBuf;
            return newInstance;
        }

        private AsyncAddArgs(Recycler.Handle<AsyncAddArgs> handle) {
            this.handle = handle;
        }

        public void recycle() {
            this.callback = null;
            this.ctx = null;
            this.addedTime = 0L;
            if (this.byteBuf != null) {
                this.byteBuf.release();
                this.byteBuf = null;
            }
            this.handle.recycle(this);
        }

        @Generated
        public String toString() {
            Recycler.Handle<AsyncAddArgs> handle = this.handle;
            AddDataCallback callback = getCallback();
            Object ctx = getCtx();
            long addedTime = getAddedTime();
            ByteBuf byteBuf = this.byteBuf;
            return "TxnLogBufferedWriter.AsyncAddArgs(handle=" + handle + ", callback=" + callback + ", ctx=" + ctx + ", addedTime=" + addedTime + ", byteBuf=" + handle + ")";
        }

        @Generated
        public AddDataCallback getCallback() {
            return this.callback;
        }

        @Generated
        public Object getCtx() {
            return this.ctx;
        }

        @Generated
        public long getAddedTime() {
            return this.addedTime;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter$BookKeeperBatchedWriteCallback.class */
    public class BookKeeperBatchedWriteCallback implements AsyncCallbacks.AddEntryCallback {
        private BookKeeperBatchedWriteCallback() {
        }

        public void addComplete(Position position, ByteBuf byteBuf, Object obj) {
            FlushContext flushContext = (FlushContext) obj;
            try {
                try {
                    int size = flushContext.asyncAddArgsList.size();
                    for (int i = 0; i < size; i++) {
                        AsyncAddArgs asyncAddArgs = flushContext.asyncAddArgsList.get(i);
                        try {
                            asyncAddArgs.callback.addComplete(new TxnBatchedPositionImpl(position, size, i), asyncAddArgs.ctx);
                        } catch (Exception e) {
                            TxnLogBufferedWriter.log.error("After writing to the transaction batched log complete, the callback failed. managedLedger: " + TxnLogBufferedWriter.this.managedLedger.getName(), e);
                        }
                    }
                } catch (Exception e2) {
                    TxnLogBufferedWriter.log.error("Handle callback fail after ML write complete", e2);
                    flushContext.recycle();
                }
            } finally {
                flushContext.recycle();
            }
        }

        public void addFailed(ManagedLedgerException managedLedgerException, Object obj) {
            try {
                TxnLogBufferedWriter.this.failureCallbackByContextAndRecycle((FlushContext) obj, managedLedgerException);
            } catch (Exception e) {
                TxnLogBufferedWriter.log.error("Handle callback fail after ML write fail", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter$DataSerializer.class */
    /**
     * Serialization strategy supplied by the user of the writer.
     *
     * @param <T> type of the records written through the buffered writer
     */
    public interface DataSerializer<T> {
        /**
         * Returns the serialized size of a single record. The writer compares this value with the
         * configured max batch size and adds it to its running byte count.
         */
        int getSerializedSize(T t);

        /** Serializes a single record; used when a record is written on its own (not batched). */
        ByteBuf serialize(T t);

        /** Serializes all buffered records into one buffer; used when a batch is flushed. */
        ByteBuf serialize(ArrayList<T> arrayList);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter$DisabledBatchCallback.class */
    public static class DisabledBatchCallback implements AsyncCallbacks.AddEntryCallback {
        private static final DisabledBatchCallback INSTANCE = new DisabledBatchCallback();

        private DisabledBatchCallback() {
        }

        public void addComplete(Position position, ByteBuf byteBuf, Object obj) {
            AsyncAddArgs asyncAddArgs = (AsyncAddArgs) obj;
            try {
                asyncAddArgs.callback.addComplete(position, asyncAddArgs.ctx);
                asyncAddArgs.recycle();
            } catch (Throwable th) {
                asyncAddArgs.recycle();
                throw th;
            }
        }

        public void addFailed(ManagedLedgerException managedLedgerException, Object obj) {
            AsyncAddArgs asyncAddArgs = (AsyncAddArgs) obj;
            try {
                asyncAddArgs.callback.addFailed(managedLedgerException, asyncAddArgs.ctx);
                asyncAddArgs.recycle();
            } catch (Throwable th) {
                asyncAddArgs.recycle();
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter$FlushContext.class */
    public static class FlushContext {
        private static final Recycler<FlushContext> FLUSH_CONTEXT_RECYCLER = new Recycler<FlushContext>() { // from class: org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter.FlushContext.1
            protected FlushContext newObject(Recycler.Handle<FlushContext> handle) {
                return new FlushContext(handle);
            }

            /* renamed from: newObject, reason: collision with other method in class */
            protected /* bridge */ /* synthetic */ Object m19newObject(Recycler.Handle handle) {
                return newObject((Recycler.Handle<FlushContext>) handle);
            }
        };
        private final Recycler.Handle<FlushContext> handle;
        private final ArrayList<AsyncAddArgs> asyncAddArgsList = new ArrayList<>(8);
        private ByteBuf byteBuf;

        private FlushContext(Recycler.Handle<FlushContext> handle) {
            this.handle = handle;
        }

        private static FlushContext newInstance() {
            return (FlushContext) FLUSH_CONTEXT_RECYCLER.get();
        }

        public void recycle() {
            Iterator<AsyncAddArgs> it = this.asyncAddArgsList.iterator();
            while (it.hasNext()) {
                it.next().recycle();
            }
            if (this.byteBuf != null) {
                this.byteBuf.release();
                this.byteBuf = null;
            }
            this.asyncAddArgsList.clear();
            this.handle.recycle(this);
        }

        public void addCallback(AddDataCallback addDataCallback, Object obj) {
            this.asyncAddArgsList.add(AsyncAddArgs.newInstance(addDataCallback, obj, System.currentTimeMillis()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter$State.class */
    /** Lifecycle of the writer. New data is rejected as soon as the state is no longer {@code OPEN}. */
    public enum State {
        /** Initial state set by the constructor; data is accepted. */
        OPEN,
        /** Set by {@code close()} when batching is enabled, while the close task is pending or running. */
        CLOSING,
        /** Terminal state: set directly by {@code close()} when batching is disabled, else by the close task. */
        CLOSED
    }

    public TxnLogBufferedWriter(ManagedLedger managedLedger, Executor executor, Timer timer, DataSerializer<T> dataSerializer, int i, int i2, int i3, boolean z, TxnLogBufferedWriterMetricsStats txnLogBufferedWriterMetricsStats) {
        if (i <= 1 && z) {
            if (txnLogBufferedWriterMetricsStats != null) {
                log.warn("Transaction Log Buffered Writer with the metrics name beginning with {} has batching enabled yet the maximum batch size was configured to less than or equal to 1 record, hence due to performance reasons batching is disabled", txnLogBufferedWriterMetricsStats.getMetricsPrefix());
            } else {
                log.warn("Transaction Log Buffered Writer has batching enabled yet the maximum batch size was configured to less than or equal to 1 record, hence due to performance reasons batching is disabled");
            }
        }
        this.batchEnabled = z && i > 1;
        this.managedLedger = managedLedger;
        this.singleThreadExecutorForWrite = executor;
        this.dataSerializer = dataSerializer;
        this.batchedWriteMaxRecords = i;
        this.batchedWriteMaxSize = i2;
        this.batchedWriteMaxDelayInMillis = i3;
        this.flushContext = FlushContext.newInstance();
        this.dataArray = new ArrayList<>();
        STATE_UPDATER.set(this, State.OPEN);
        if (txnLogBufferedWriterMetricsStats == null) {
            throw new IllegalArgumentException("Build TxnLogBufferedWriter error: param metrics can not be null");
        }
        this.metrics = txnLogBufferedWriterMetricsStats;
        this.timer = timer;
        if (this.batchEnabled) {
            nextTimingTrigger();
        }
    }

    /**
     * Schedules the next timing flush after {@code batchedWriteMaxDelayInMillis}, unless the writer
     * is closing or closed. A scheduling failure is logged and not propagated.
     */
    private void nextTimingTrigger() {
        try {
            if (this.state != State.CLOSED && this.state != State.CLOSING) {
                this.timeout = this.timer.newTimeout(this.timingFlushTask, this.batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
            }
        } catch (Exception schedulingFailure) {
            log.error("Start timing flush trigger failed. managedLedger: " + this.managedLedger.getName(), schedulingFailure);
        }
    }

    /**
     * Adds one record to the transaction log.
     *
     * <p>With batching enabled the record is handed to the write executor, where it is buffered or
     * written according to the batch limits. With batching disabled the record is serialized and
     * written to the managed ledger immediately on the calling thread.
     *
     * @param t               record to write
     * @param addDataCallback callback notified of success or failure
     * @param obj             caller context passed back to the callback
     */
    public void asyncAddData(T t, AddDataCallback addDataCallback, Object obj) {
        if (this.batchEnabled) {
            CompletableFuture.runAsync(() -> internalAsyncAddData(t, addDataCallback, obj), this.singleThreadExecutorForWrite)
                    .exceptionally(th -> {
                        log.warn("Execute 'internalAsyncAddData' fail", th);
                        return null;
                    });
            return;
        }
        if (this.state == State.CLOSING || this.state == State.CLOSED) {
            addDataCallback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, obj);
            return;
        }
        // Pass the record itself: a cast to DataSerializer<T> matches neither serialize overload.
        ByteBuf serialized = this.dataSerializer.serialize(t);
        // The buffer is attached to the args so it is released when they are recycled by the callback.
        this.managedLedger.asyncAddEntry(serialized, DisabledBatchCallback.INSTANCE, AsyncAddArgs.newInstance(addDataCallback, obj, System.currentTimeMillis(), serialized));
    }

    /**
     * Buffers or writes one record; runs on the write executor when batching is enabled.
     *
     * <ul>
     *   <li>If the writer is closing or closed, the callback is failed immediately.</li>
     *   <li>If the record's serialized size reaches the max batch size, pending records are flushed
     *       first and the record is then written on its own.</li>
     *   <li>Otherwise the record is buffered and a flush is triggered when a batch limit is reached.</li>
     * </ul>
     * Any exception raised along the way is reported once through {@code addFailed}, wrapped in a
     * {@code ManagedLedgerInterceptException}.
     *
     * @param t               record to write
     * @param addDataCallback callback notified of success or failure
     * @param obj             caller context passed back to the callback
     */
    private void internalAsyncAddData(T t, AddDataCallback addDataCallback, Object obj) {
        if (this.state == State.CLOSING || this.state == State.CLOSED) {
            addDataCallback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, obj);
            return;
        }
        // A single try/catch: with nested handlers, an exception thrown by addFailed itself inside an
        // inner catch would be caught by the outer one and the callback would be failed a second time.
        try {
            int serializedSize = this.dataSerializer.getSerializedSize(t);
            if (serializedSize >= this.batchedWriteMaxSize) {
                // Too large to batch: flush what is pending, then write this record as its own entry.
                trigFlushByLargeSingleData();
                // Pass the record itself: a cast to DataSerializer<T> matches neither serialize overload.
                ByteBuf serialized = this.dataSerializer.serialize(t);
                this.managedLedger.asyncAddEntry(serialized, DisabledBatchCallback.INSTANCE, AsyncAddArgs.newInstance(addDataCallback, obj, System.currentTimeMillis(), serialized));
                return;
            }
            this.flushContext.addCallback(addDataCallback, obj);
            this.dataArray.add(t);
            this.bytesSize += serializedSize;
            trigFlushIfReachMaxRecordsOrMaxSize();
        } catch (Exception e) {
            addDataCallback.addFailed(new ManagedLedgerException.ManagedLedgerInterceptException(e), obj);
        }
    }

    /**
     * Time-based flush: on the write executor, flushes the pending batch if it is non-empty
     * (recording the max-delay metric first), then schedules the next timing trigger whether or not
     * the flush succeeded.
     */
    private void trigFlushByTimingTask() {
        final Runnable flushPendingBatch = () -> {
            final ArrayList<AsyncAddArgs> pending = this.flushContext.asyncAddArgsList;
            if (!pending.isEmpty()) {
                // Age of the oldest buffered record, i.e. how long the batch has been waiting.
                final long oldestAgeMillis = System.currentTimeMillis() - pending.get(0).addedTime;
                this.metrics.triggerFlushByByMaxDelay(pending.size(), this.bytesSize, oldestAgeMillis);
                doFlush();
            }
        };
        CompletableFuture.runAsync(flushPendingBatch, this.singleThreadExecutorForWrite)
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        log.warn("Execute 'trigFlushByTimingTask' fail", failure);
                    }
                    nextTimingTrigger();
                });
    }

    /**
     * Flushes the pending batch when either the record-count limit or the byte-size limit has been
     * reached. The record-count limit takes precedence when deciding which metric to record.
     */
    private void trigFlushIfReachMaxRecordsOrMaxSize() {
        final ArrayList<AsyncAddArgs> pending = this.flushContext.asyncAddArgsList;
        final boolean reachedMaxRecords = pending.size() >= this.batchedWriteMaxRecords;
        if (!reachedMaxRecords && this.bytesSize < this.batchedWriteMaxSize) {
            return;
        }
        // Age of the oldest buffered record.
        final long oldestAgeMillis = System.currentTimeMillis() - pending.get(0).addedTime;
        if (reachedMaxRecords) {
            this.metrics.triggerFlushByRecordsCount(pending.size(), this.bytesSize, oldestAgeMillis);
        } else {
            this.metrics.triggerFlushByBytesSize(pending.size(), this.bytesSize, oldestAgeMillis);
        }
        doFlush();
    }

    /**
     * Flushes whatever is pending before an oversized record is written on its own; does nothing
     * when no records are buffered.
     */
    private void trigFlushByLargeSingleData() {
        final ArrayList<AsyncAddArgs> pending = this.flushContext.asyncAddArgsList;
        if (!pending.isEmpty()) {
            final long oldestAgeMillis = System.currentTimeMillis() - pending.get(0).addedTime;
            this.metrics.triggerFlushByLargeSingleData(pending.size(), this.bytesSize, oldestAgeMillis);
            doFlush();
        }
    }

    /**
     * Writes all buffered records to the managed ledger as one batched entry and starts a new, empty
     * batch.
     *
     * <p>The entry consists of a 4-byte prefix (magic number, then version, each one short) followed
     * by the serialized batch. If the writer is closing or closed, the pending callbacks are failed
     * instead of writing. In both cases the flush context (and with it the entry buffer) is recycled
     * by whoever completes the callbacks.
     */
    private void doFlush() {
        // Serialize first so that no pooled prefix buffer is left unreleased if serialization throws.
        final ByteBuf content = this.dataSerializer.serialize(this.dataArray);
        final ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(
                BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN + BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN);
        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
        final ByteBuf wholeEntry = Unpooled.wrappedUnmodifiableBuffer(prefix, content);
        // Ownership of the buffer moves to the context; it is released in FlushContext.recycle().
        this.flushContext.byteBuf = wholeEntry;
        if (State.CLOSING == this.state || State.CLOSED == this.state) {
            failureCallbackByContextAndRecycle(this.flushContext, BUFFERED_WRITER_CLOSED_EXCEPTION);
        } else {
            this.managedLedger.asyncAddEntry(wholeEntry, this.bookKeeperBatchedWriteCallback, this.flushContext);
        }
        // Start a fresh batch; the previous context now belongs to the in-flight write (or was recycled).
        this.dataArray.clear();
        this.flushContext = FlushContext.newInstance();
        this.bytesSize = 0L;
    }

    /**
     * Closes the writer.
     *
     * <p>With batching disabled only the state is switched to {@code CLOSED}. With batching enabled
     * the state is switched to {@code CLOSING} and a task on the write executor fails all still
     * buffered records, cancels the pending timing flush and finally marks the writer {@code CLOSED}.
     * Calling this method again after the first call returns an already completed future.
     *
     * @return a future completed once the close logic has run (or immediately when there is nothing
     *         to do)
     */
    public CompletableFuture<Void> close() {
        if (!this.batchEnabled) {
            STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
            return CompletableFuture.completedFuture(null);
        }
        // Only the caller that wins the OPEN -> CLOSING transition performs the close.
        if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        FutureUtil.safeRunAsync(() -> {
            failureCallbackByContextAndRecycle(this.flushContext, new ManagedLedgerException.ManagedLedgerFencedException(new Exception("Transaction log buffered write has closed")));
            // timeout stays null when scheduling failed in nextTimingTrigger(); without this guard the
            // close task would fail with a NullPointerException and the state would remain CLOSING.
            final Timeout pendingTimeout = this.timeout;
            if (pendingTimeout != null && !pendingTimeout.isCancelled()) {
                pendingTimeout.cancel();
            }
            STATE_UPDATER.set(this, State.CLOSED);
            completableFuture.complete(null);
        }, this.singleThreadExecutorForWrite, completableFuture);
        return completableFuture;
    }

    /**
     * Fails every record callback held by the given context with the given exception, then recycles
     * the context (which also recycles the individual args). A null context is ignored.
     *
     * @param flushContext           batch context whose callbacks should be failed; may be null
     * @param managedLedgerException exception passed to each callback
     */
    private void failureCallbackByContextAndRecycle(FlushContext flushContext, ManagedLedgerException managedLedgerException) {
        if (flushContext != null) {
            try {
                if (flushContext.asyncAddArgsList != null) {
                    for (AsyncAddArgs pendingArgs : flushContext.asyncAddArgsList) {
                        // Args are not recycled individually here; the context recycles them below.
                        failureCallbackByArgs(pendingArgs, managedLedgerException, false);
                    }
                }
            } finally {
                flushContext.recycle();
            }
        }
    }

    /**
     * Notifies a single record callback of a failure. An exception thrown by the callback is logged
     * and swallowed so that the remaining records of a batch can still be notified.
     *
     * @param asyncAddArgs           args holding the callback and its context; null is ignored
     * @param managedLedgerException exception passed to the callback
     * @param z                      whether to recycle {@code asyncAddArgs} afterwards; callers that
     *                               recycle the owning context themselves pass {@code false}
     */
    private void failureCallbackByArgs(AsyncAddArgs asyncAddArgs, ManagedLedgerException managedLedgerException, boolean z) {
        if (asyncAddArgs == null) {
            return;
        }
        try {
            asyncAddArgs.callback.addFailed(managedLedgerException, asyncAddArgs.ctx);
        } catch (Exception e) {
            log.error("After writing to the transaction batched log failure, the callback executed also failed. managedLedger: " + this.managedLedger.getName(), e);
        } finally {
            // Single recycle point: keeping recycle() out of the try body means a failure while
            // recycling can never lead to a second recycle attempt from the catch handler.
            if (z) {
                asyncAddArgs.recycle();
            }
        }
    }

    /**
     * Returns the metrics sink this writer reports flush events to.
     *
     * @return the metrics object supplied at construction time
     */
    public TxnLogBufferedWriterMetricsStats getMetrics() {
        return metrics;
    }
}
