package org.jtrim2.concurrent.query.io;

import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.InterruptibleChannel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jtrim2.cancel.CancellationToken;
import org.jtrim2.concurrent.query.AsyncDataController;
import org.jtrim2.concurrent.query.AsyncDataLink;
import org.jtrim2.concurrent.query.AsyncDataListener;
import org.jtrim2.concurrent.query.AsyncDataState;
import org.jtrim2.concurrent.query.AsyncHelper;
import org.jtrim2.concurrent.query.AsyncReport;
import org.jtrim2.concurrent.query.DoNothingDataController;
import org.jtrim2.concurrent.query.SimpleDataState;
import org.jtrim2.concurrent.query.io.ChannelProcessor;
import org.jtrim2.event.ListenerRef;
import org.jtrim2.executor.CancelableTask;
import org.jtrim2.executor.TaskExecutor;

/* loaded from: input_file:org/jtrim2/concurrent/query/io/AsyncChannelLink.class */
public final class AsyncChannelLink<DataType> implements AsyncDataLink<DataType> {
    private static final Logger LOGGER = Logger.getLogger(AsyncChannelLink.class.getName());
    private final CheckedAsyncChannelLink<? extends DataType, ?> impl;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jtrim2/concurrent/query/io/AsyncChannelLink$ChannelProcessorTask.class */
    public static class ChannelProcessorTask<DataType, ChannelType extends Channel> implements CancelableTask {
        private final ChannelOpener<? extends ChannelType> channelOpener;
        private final ChannelProcessor<DataType, ChannelType> channelProcessor;
        private final AsyncDataListener<DataType> safeListener;
        private final SimpleStateListener stateListener;

        public ChannelProcessorTask(ChannelOpener<? extends ChannelType> channelOpener, ChannelProcessor<DataType, ChannelType> channelProcessor, AsyncDataListener<DataType> asyncDataListener, SimpleStateListener simpleStateListener) {
            this.channelOpener = channelOpener;
            this.channelProcessor = channelProcessor;
            this.safeListener = asyncDataListener;
            this.stateListener = simpleStateListener;
        }

        public void execute(CancellationToken cancellationToken) {
            boolean z = false;
            try {
                ChannelType openChanel = this.channelOpener.openChanel();
                try {
                    this.stateListener.setChannel(openChanel);
                    if (cancellationToken.isCanceled()) {
                        z = true;
                    } else {
                        this.channelProcessor.processChannel(openChanel, this.safeListener, this.stateListener);
                    }
                    if (openChanel != null) {
                        openChanel.close();
                    }
                    this.stateListener.setChannel(null);
                    this.safeListener.onDoneReceive(AsyncReport.getReport(null, z));
                } catch (Throwable th) {
                    if (openChanel != null) {
                        try {
                            openChanel.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (ClosedChannelException e) {
                this.stateListener.setChannel(null);
                this.safeListener.onDoneReceive(AsyncReport.getReport(null, true));
            } catch (Throwable th3) {
                this.stateListener.setChannel(null);
                this.safeListener.onDoneReceive(AsyncReport.getReport(null, false));
                throw th3;
            }
        }
    }

    /* loaded from: input_file:org/jtrim2/concurrent/query/io/AsyncChannelLink$CheckedAsyncChannelLink.class */
    private static class CheckedAsyncChannelLink<DataType, ChannelType extends Channel> {
        private final TaskExecutor processorExecutor;
        private final TaskExecutor cancelExecutor;
        private final ChannelOpener<? extends ChannelType> channelOpener;
        private final ChannelProcessor<DataType, ChannelType> channelProcessor;

        public CheckedAsyncChannelLink(TaskExecutor taskExecutor, TaskExecutor taskExecutor2, ChannelOpener<? extends ChannelType> channelOpener, ChannelProcessor<DataType, ChannelType> channelProcessor) {
            Objects.requireNonNull(taskExecutor, "processorExecutor");
            Objects.requireNonNull(taskExecutor2, "cancelExecutor");
            Objects.requireNonNull(channelOpener, "channelOpener");
            Objects.requireNonNull(channelProcessor, "channelProcessor");
            this.processorExecutor = taskExecutor;
            this.cancelExecutor = taskExecutor2;
            this.channelOpener = channelOpener;
            this.channelProcessor = channelProcessor;
        }

        public AsyncDataController getData(CancellationToken cancellationToken, AsyncDataListener<? super DataType> asyncDataListener) {
            if (cancellationToken.isCanceled()) {
                asyncDataListener.onDoneReceive(AsyncReport.CANCELED);
                return DoNothingDataController.INSTANCE;
            }
            AsyncDataListener makeSafeListener = AsyncHelper.makeSafeListener(asyncDataListener);
            final SimpleStateListener simpleStateListener = new SimpleStateListener(this.cancelExecutor);
            ChannelProcessorTask channelProcessorTask = new ChannelProcessorTask(this.channelOpener, this.channelProcessor, makeSafeListener, simpleStateListener);
            Objects.requireNonNull(simpleStateListener);
            ListenerRef addCancellationListener = cancellationToken.addCancellationListener(simpleStateListener::cancel);
            this.processorExecutor.execute(cancellationToken, channelProcessorTask).whenComplete((r5, th) -> {
                try {
                    makeSafeListener.onDoneReceive(AsyncReport.CANCELED);
                    addCancellationListener.unregister();
                } catch (Throwable th) {
                    addCancellationListener.unregister();
                    throw th;
                }
            });
            return new AsyncDataController() { // from class: org.jtrim2.concurrent.query.io.AsyncChannelLink.CheckedAsyncChannelLink.1
                @Override // org.jtrim2.concurrent.query.AsyncDataController
                public void controlData(Object obj) {
                }

                @Override // org.jtrim2.concurrent.query.AsyncDataController
                public AsyncDataState getDataState() {
                    return simpleStateListener.getState();
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jtrim2/concurrent/query/io/AsyncChannelLink$SimpleStateListener.class */
    public static class SimpleStateListener implements ChannelProcessor.StateListener {
        private final TaskExecutor cancelExecutor;
        private volatile AsyncDataState currentState = new SimpleDataState("Task was submitted to process the channel.", 0.0d);
        private volatile Channel channelToCancel = null;
        private final AtomicBoolean canceledChannel = new AtomicBoolean(false);
        private volatile boolean canceled = false;

        public SimpleStateListener(TaskExecutor taskExecutor) {
            this.cancelExecutor = taskExecutor;
        }

        public void setChannel(Channel channel) {
            this.channelToCancel = channel instanceof InterruptibleChannel ? channel : null;
            if (this.canceled) {
                tryCancelCurrentChannel();
            }
        }

        private void closeCurrentChannel() {
            Channel channel = this.channelToCancel;
            if (channel != null) {
                try {
                    channel.close();
                } catch (Throwable th) {
                    if (AsyncChannelLink.LOGGER.isLoggable(Level.WARNING)) {
                        AsyncChannelLink.LOGGER.log(Level.WARNING, "Closing the channel to cancel has failed: " + channel, th);
                    }
                }
            }
        }

        private void tryCancelCurrentChannel() {
            if (this.channelToCancel == null || this.canceledChannel.getAndSet(true)) {
                return;
            }
            this.cancelExecutor.execute(this::closeCurrentChannel);
        }

        public void cancel() {
            this.canceled = true;
            tryCancelCurrentChannel();
        }

        @Override // org.jtrim2.concurrent.query.io.ChannelProcessor.StateListener
        public void setState(AsyncDataState asyncDataState) {
            this.currentState = asyncDataState;
        }

        public AsyncDataState getState() {
            return this.currentState;
        }
    }

    public <ChannelType extends Channel> AsyncChannelLink(TaskExecutor taskExecutor, TaskExecutor taskExecutor2, ChannelOpener<? extends ChannelType> channelOpener, ChannelProcessor<? extends DataType, ChannelType> channelProcessor) {
        this.impl = new CheckedAsyncChannelLink<>(taskExecutor, taskExecutor2, channelOpener, channelProcessor);
    }

    @Override // org.jtrim2.concurrent.query.AsyncDataLink
    public AsyncDataController getData(CancellationToken cancellationToken, AsyncDataListener<? super DataType> asyncDataListener) {
        return this.impl.getData(cancellationToken, asyncDataListener);
    }
}
