package org.apache.pulsar.io.file;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.file.utils.GZipFiles;
import org.apache.pulsar.io.file.utils.ZipFiles;

/* loaded from: input_file:org/apache/pulsar/io/file/FileConsumerThread.class */
public class FileConsumerThread extends Thread {
    private final PushSource<byte[]> source;
    private final BlockingQueue<File> workQueue;
    private final BlockingQueue<File> inProcess;
    private final BlockingQueue<File> recentlyProcessed;

    public FileConsumerThread(PushSource<byte[]> pushSource, BlockingQueue<File> blockingQueue, BlockingQueue<File> blockingQueue2, BlockingQueue<File> blockingQueue3) {
        this.source = pushSource;
        this.workQueue = blockingQueue;
        this.inProcess = blockingQueue2;
        this.recentlyProcessed = blockingQueue3;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (true) {
            try {
                File take = this.workQueue.take();
                do {
                } while (!this.inProcess.add(take));
                consumeFile(take);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    private void consumeFile(File file) {
        AtomicInteger atomicInteger = new AtomicInteger(1);
        try {
            try {
                Stream<String> lines = getLines(file);
                Throwable th = null;
                try {
                    lines.forEachOrdered(str -> {
                        process(file, atomicInteger.getAndIncrement(), str);
                    });
                    if (lines != null) {
                        if (0 != 0) {
                            try {
                                lines.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            lines.close();
                        }
                    }
                    do {
                    } while (!this.inProcess.remove(file));
                    do {
                    } while (!this.recentlyProcessed.add(file));
                } catch (Throwable th3) {
                    if (lines != null) {
                        if (0 != 0) {
                            try {
                                lines.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            lines.close();
                        }
                    }
                    throw th3;
                }
            } catch (IOException e) {
                e.printStackTrace();
                do {
                } while (!this.inProcess.remove(file));
                do {
                } while (!this.recentlyProcessed.add(file));
            }
        } catch (Throwable th5) {
            do {
            } while (!this.inProcess.remove(file));
            do {
            } while (!this.recentlyProcessed.add(file));
            throw th5;
        }
    }

    private Stream<String> getLines(File file) throws IOException {
        if (file == null) {
            return null;
        }
        return GZipFiles.isGzip(file) ? GZipFiles.lines(Paths.get(file.getAbsolutePath(), new String[0])) : ZipFiles.isZip(file) ? ZipFiles.lines(Paths.get(file.getAbsolutePath(), new String[0])) : Files.lines(Paths.get(file.getAbsolutePath(), new String[0]), Charset.defaultCharset());
    }

    private void process(File file, int i, String str) {
        this.source.consume(new FileRecord(file, i, str.getBytes()));
    }
}
