package org.apache.pulsar.io.file;

import java.io.File;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;

/* loaded from: input_file:org/apache/pulsar/io/file/FileSource.class */
public class FileSource extends PushSource<byte[]> {
    private ExecutorService executor;
    private final BlockingQueue<File> workQueue = new LinkedBlockingQueue();
    private final BlockingQueue<File> inProcess = new LinkedBlockingQueue();
    private final BlockingQueue<File> recentlyProcessed = new LinkedBlockingQueue();

    @Override // org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        FileSourceConfig load = FileSourceConfig.load(map);
        load.validate();
        this.executor = Executors.newFixedThreadPool(load.getNumWorkers().intValue() + 2);
        this.executor.execute(new FileListingThread(load, this.workQueue, this.inProcess, this.recentlyProcessed));
        this.executor.execute(new ProcessedFileThread(load, this.recentlyProcessed));
        for (int i = 0; i < load.getNumWorkers().intValue(); i++) {
            this.executor.execute(new FileConsumerThread(this, this.workQueue, this.inProcess, this.recentlyProcessed));
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(800L, TimeUnit.MILLISECONDS)) {
                    this.executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                this.executor.shutdownNow();
            }
        }
    }
}
