package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputReader;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/source/DefaultFileRecordsPollingConsumer.class */
public class DefaultFileRecordsPollingConsumer implements FileRecordsPollingConsumer<FileRecord<TypedStruct>> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileRecordsPollingConsumer.class);
    private final boolean ignoreCommittedOffsets;
    private final FileInputReader reader;
    private final RecordFilterPipeline<FileRecord<TypedStruct>> pipeline;
    private final SourceOffsetPolicy offsetPolicy;
    private StateListener listener;
    private final SourceTaskContext taskContext;
    private FileRecord<TypedStruct> latestPolledRecord;
    private FileInputIterator<FileRecord<TypedStruct>> currentIterator;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Queue<DelegateFileInputIterator> queue = new LinkedBlockingQueue();

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultFileRecordsPollingConsumer(SourceTaskContext sourceTaskContext, FileInputReader fileInputReader, RecordFilterPipeline<FileRecord<TypedStruct>> recordFilterPipeline, SourceOffsetPolicy sourceOffsetPolicy, boolean z) {
        this.ignoreCommittedOffsets = z;
        this.reader = fileInputReader;
        this.pipeline = recordFilterPipeline;
        this.offsetPolicy = sourceOffsetPolicy;
        this.taskContext = sourceTaskContext;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addAll(List<URI> list) {
        if (isClosed()) {
            throw new IllegalStateException("Can't add new input files, consumer is closed");
        }
        ArrayList arrayList = new ArrayList(list.size());
        for (URI uri : list) {
            if (this.reader.canBeRead(uri)) {
                try {
                    FileObjectMeta objectMetadata = this.reader.getObjectMetadata(uri);
                    FileObjectKey of = FileObjectKey.of(this.offsetPolicy.toPartitionJson(objectMetadata));
                    arrayList.add(new DelegateFileInputIterator(of, uri, this.reader));
                    if (hasListener()) {
                        this.listener.onScheduled(new FileObjectContext(of, objectMetadata));
                    }
                } catch (Exception e) {
                    throw new ConnectFilePulseException("Failed to compute object-file key while initializing processing for '" + uri + "'.  Connector must be restated.", e);
                }
            } else {
                try {
                    GenericFileObjectMeta genericFileObjectMeta = new GenericFileObjectMeta(uri);
                    FileObjectKey of2 = FileObjectKey.of(this.offsetPolicy.toPartitionJson(genericFileObjectMeta));
                    if (hasListener()) {
                        LOG.warn("Object-file does not exist or is not readable. Skip and continue '{}'", uri);
                        this.listener.onInvalid(new FileObjectContext(of2, genericFileObjectMeta));
                    }
                } catch (Exception e2) {
                    throw new ConnectFilePulseException("Failed to compute object-file key while initializing processing for '" + uri + "'. Connector must be restated.", e2);
                }
            }
        }
        this.queue.addAll(arrayList);
    }

    public FileObjectContext context() {
        if (this.currentIterator == null) {
            return null;
        }
        FileObjectContext context = this.currentIterator.context();
        if (this.latestPolledRecord != null) {
            context = new FileObjectContext(context.key(), context.metadata(), this.latestPolledRecord.offset().toSourceOffset());
        }
        return context;
    }

    public void seekTo(FileObjectOffset fileObjectOffset) {
        throw new UnsupportedOperationException();
    }

    /* renamed from: next, reason: merged with bridge method [inline-methods] */
    public RecordsIterable<FileRecord<TypedStruct>> m14next() {
        if (isClosed()) {
            throw new IllegalStateException("FileRecordsPollingConsumer is closed, no more element can be returned");
        }
        if (!hasNext()) {
            return RecordsIterable.empty();
        }
        this.currentIterator = findNextFileObjectIterator();
        if (this.currentIterator == null) {
            return RecordsIterable.empty();
        }
        try {
            try {
                RecordsIterable<FileRecord<TypedStruct>> apply = this.pipeline.apply(this.currentIterator.next(), this.currentIterator.hasNext());
                if (!apply.isEmpty()) {
                    this.latestPolledRecord = (FileRecord) apply.last();
                }
                return apply;
            } catch (Exception e) {
                throw new ConnectFilePulseException(e);
            } catch (ConnectFilePulseException e2) {
                throw e2;
            }
        } finally {
            if (0 != 0) {
                LOG.error("Stopped processing due to error during filter-chain execution for object-file: '{}'", this.currentIterator.context().metadata());
                closeIterator(this.currentIterator, null);
            }
        }
    }

    private FileInputIterator<FileRecord<TypedStruct>> findNextFileObjectIterator() {
        if (this.queue.isEmpty()) {
            return null;
        }
        FileInputIterator<FileRecord<TypedStruct>> fileInputIterator = null;
        do {
            DelegateFileInputIterator peek = this.queue.peek();
            GenericFileObjectMeta genericFileObjectMeta = new GenericFileObjectMeta(peek.getObjectURI());
            if (peek.isOpen()) {
                fileInputIterator = getOrCloseIteratorIfNoMoreRecord(peek);
            } else {
                try {
                    if (peek.isValid()) {
                        fileInputIterator = openAndGetIteratorOrNullIfCompleted(peek);
                        if (fileInputIterator == null) {
                            deleteFileQueueAndInvokeListener(new FileObjectContext(peek.key(), genericFileObjectMeta), null);
                        }
                    } else {
                        LOG.warn("Object-file does not exist or is not readable. Skip and continue '{}'", peek.getObjectURI());
                        this.queue.remove();
                        this.listener.onInvalid(new FileObjectContext(peek.key(), genericFileObjectMeta));
                    }
                } catch (Exception e) {
                    LOG.error("Failed to open and initialize new iterator for object-file: {}.", peek.getObjectURI());
                    deleteFileQueueAndInvokeListener(new FileObjectContext(peek.key(), genericFileObjectMeta), e);
                    throw e;
                }
            }
            if (!hasNext()) {
                break;
            }
        } while (fileInputIterator == null);
        return fileInputIterator;
    }

    public boolean hasNext() {
        return !this.queue.isEmpty();
    }

    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        while (true) {
            DelegateFileInputIterator poll = this.queue.poll();
            if (poll == null) {
                this.reader.close();
                return;
            }
            try {
                poll.close();
            } catch (Exception e) {
            }
        }
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    public void setStateListener(StateListener stateListener) {
        this.listener = stateListener;
    }

    private FileInputIterator<FileRecord<TypedStruct>> openAndGetIteratorOrNullIfCompleted(DelegateFileInputIterator delegateFileInputIterator) {
        FileObjectOffset empty;
        FileObjectMeta fileObjectMeta = null;
        try {
            fileObjectMeta = delegateFileInputIterator.getMetadata();
            empty = !this.ignoreCommittedOffsets ? (FileObjectOffset) this.offsetPolicy.getOffsetFor(this.taskContext, fileObjectMeta).orElse(FileObjectOffset.empty()) : FileObjectOffset.empty();
        } catch (Exception e) {
            if (0 == 0) {
                throw e;
            }
            LOG.warn("Failed to load committed offset for object file {}. Previous offset will be ignored. Error: {}", delegateFileInputIterator.getObjectURI(), e.getMessage());
            empty = FileObjectOffset.empty();
        }
        boolean z = empty.position() >= fileObjectMeta.contentLength().longValue();
        if (!this.ignoreCommittedOffsets && z) {
            LOG.warn("Detected object-file already completed. Skip entry and continue '{}'", delegateFileInputIterator.getObjectURI());
            return null;
        }
        delegateFileInputIterator.open();
        delegateFileInputIterator.seekTo(empty);
        this.pipeline.init(delegateFileInputIterator.context());
        if (hasListener()) {
            this.listener.onStart(delegateFileInputIterator.context());
        }
        return delegateFileInputIterator;
    }

    private FileInputIterator<FileRecord<TypedStruct>> getOrCloseIteratorIfNoMoreRecord(DelegateFileInputIterator delegateFileInputIterator) {
        if (delegateFileInputIterator.hasNext()) {
            return delegateFileInputIterator;
        }
        closeIterator(delegateFileInputIterator, null);
        return null;
    }

    public void closeCurrentIterator(Exception exc) {
        closeIterator(this.currentIterator, exc);
    }

    private void closeIterator(FileInputIterator<FileRecord<TypedStruct>> fileInputIterator, Exception exc) {
        FileObjectContext context = fileInputIterator.context();
        try {
            try {
                fileInputIterator.close();
                deleteFileQueueAndInvokeListener(context, exc);
            } catch (Exception e) {
                LOG.debug("Error while closing iterator for: '{}'", context.metadata(), e);
                deleteFileQueueAndInvokeListener(context, exc);
            }
        } catch (Throwable th) {
            deleteFileQueueAndInvokeListener(context, exc);
            throw th;
        }
    }

    private void deleteFileQueueAndInvokeListener(FileObjectContext fileObjectContext, Throwable th) {
        this.queue.remove();
        if (hasListener()) {
            if (th != null) {
                this.listener.onFailure(fileObjectContext, th);
            } else {
                this.listener.onCompleted(fileObjectContext);
            }
        }
    }

    private boolean hasListener() {
        return this.listener != null;
    }
}
