package org.jtrim2.concurrent.query;

import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jtrim2.executor.ContextAwareTaskExecutor;
import org.jtrim2.executor.TaskExecutors;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/jtrim2/concurrent/query/SafeDataListener.class */
/**
 * Wraps an {@link AsyncDataListener} and filters the events forwarded to it.
 * <p>
 * Data is forwarded to the wrapped listener only if nothing has been forwarded
 * yet or its index is greater than the index of the previously forwarded data,
 * and only until the first {@code onDoneReceive} has been forwarded. Only the
 * first {@code onDoneReceive} call is forwarded; later ones are ignored.
 * <p>
 * All calls to the wrapped listener are made from tasks submitted to the
 * executor returned by {@code TaskExecutors.inOrderSyncExecutor()}.
 * NOTE(review): that executor presumably runs tasks one at a time, in
 * submission order; the unsynchronized state below relies on this -- confirm.
 *
 * @param <DataType> the type of the raw data passed to the wrapped listener
 */
public final class SafeDataListener<DataType> implements AsyncDataListener<OrderedData<DataType>>, PossiblySafeListener {
    /** Initial capacity of the buffer used by {@link #toString()}. */
    private static final int EXPECTED_MAX_TO_STRING_LENGTH = 256;

    private final AsyncDataListener<? super DataType> wrappedListener;
    private final ContextAwareTaskExecutor eventScheduler;
    /** Shared, stateless task instance submitted for every received data. */
    private final Runnable dataForwardTask;

    /** Guards {@link #lastUnsent}. */
    private final Lock dataLock;
    /** The received but not yet polled data with the highest index, or {@code null}. */
    private OrderedData<DataType> lastUnsent;

    // The fields below are read and written only by tasks executed through
    // eventScheduler.
    private boolean done;
    private boolean forwardedData;
    /** Meaningful only once {@link #forwardedData} is {@code true}. */
    private long lastSentIndex;

    /**
     * Forwards the most recent pending data (if any) to the wrapped listener,
     * unless it is not newer than the last forwarded data or the done
     * notification has already been forwarded.
     */
    private class DataForwardTask implements Runnable {
        @Override
        public void run() {
            assert eventScheduler.isExecutingInThis();

            // Always consume the pending data, even if it ends up discarded.
            OrderedData<DataType> data = pollData();
            if (data == null) {
                return;
            }

            boolean newer = !forwardedData || data.getIndex() > lastSentIndex;
            if (newer && !done) {
                lastSentIndex = data.getIndex();
                forwardedData = true;
                wrappedListener.onDataArrive(data.getRawData());
            }
        }
    }

    /**
     * Forwards the done notification to the wrapped listener the first time
     * it runs; does nothing if a done notification was already forwarded.
     */
    private class DoneForwardTask implements Runnable {
        private final AsyncReport report;

        public DoneForwardTask(AsyncReport asyncReport) {
            this.report = asyncReport;
        }

        @Override
        public void run() {
            assert eventScheduler.isExecutingInThis();

            if (done) {
                return;
            }

            // Mark as done before the call so that the flag is set even if the
            // wrapped listener throws.
            done = true;
            wrappedListener.onDoneReceive(report);
        }
    }

    /**
     * Creates a listener forwarding its events to the given listener.
     *
     * @param asyncDataListener the listener to which the filtered events are
     *   forwarded; must not be {@code null}
     * @throws NullPointerException if {@code asyncDataListener} is {@code null}
     */
    public SafeDataListener(AsyncDataListener<? super DataType> asyncDataListener) {
        this.wrappedListener = Objects.requireNonNull(asyncDataListener, "wrappedListener");
        this.eventScheduler = TaskExecutors.inOrderSyncExecutor();
        this.dataLock = new ReentrantLock();
        this.lastUnsent = null;
        // Placeholder only: it is not compared against until forwardedData is set.
        this.lastSentIndex = Long.MAX_VALUE;
        this.forwardedData = false;
        this.done = false;
        this.dataForwardTask = new DataForwardTask();
    }

    /**
     * {@inheritDoc}
     *
     * @return always {@code true}
     */
    @Override
    public boolean isSafeListener() {
        return true;
    }

    /**
     * Remembers the given data as pending, unless already pending data has an
     * index which is greater than or equal to the index of the given data.
     *
     * @param orderedData the received data; must not be {@code null}
     */
    private void storeData(OrderedData<DataType> orderedData) {
        Objects.requireNonNull(orderedData, "data");

        dataLock.lock();
        try {
            if (lastUnsent == null || lastUnsent.getIndex() < orderedData.getIndex()) {
                lastUnsent = orderedData;
            }
        } finally {
            dataLock.unlock();
        }
    }

    /**
     * Removes and returns the pending data.
     *
     * @return the pending data, or {@code null} if there is none
     */
    private OrderedData<DataType> pollData() {
        dataLock.lock();
        try {
            OrderedData<DataType> result = lastUnsent;
            lastUnsent = null;
            return result;
        } finally {
            dataLock.unlock();
        }
    }

    /** Submits the given task to the event scheduler. */
    private void submitEventTask(Runnable runnable) {
        eventScheduler.execute(runnable);
    }

    /**
     * Stores the given data as pending and schedules forwarding the pending
     * data to the wrapped listener.
     *
     * @param orderedData the received data; must not be {@code null}
     * @throws NullPointerException if {@code orderedData} is {@code null}
     */
    @Override
    public void onDataArrive(OrderedData<DataType> orderedData) {
        storeData(orderedData);
        submitEventTask(dataForwardTask);
    }

    /**
     * Schedules forwarding the done notification to the wrapped listener.
     *
     * @param asyncReport the report passed on to the wrapped listener
     */
    @Override
    public void onDoneReceive(AsyncReport asyncReport) {
        submitEventTask(new DoneForwardTask(asyncReport));
    }

    /**
     * Returns a string of the form {@code "MakeSafe (...)"}, where the part in
     * parentheses is the wrapped listener as appended by
     * {@code AsyncFormatHelper.appendIndented}.
     *
     * @return the string representation of this listener
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(EXPECTED_MAX_TO_STRING_LENGTH);
        sb.append("MakeSafe (");
        AsyncFormatHelper.appendIndented(this.wrappedListener, sb);
        sb.append(")");
        return sb.toString();
    }
}
