package org.apache.kafka.connect.file.integration;

import java.io.File;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.class */
public class FileStreamSourceConnectorIntegrationTest {
    private static final String CONNECTOR_NAME = "test-connector";
    private static final String TOPIC = "test-topic";
    private static final String LINE_FORMAT = "Line %d";
    private static final int NUM_LINES = 5;
    private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
    private File sourceFile;

    @BeforeEach
    public void setup() throws Exception {
        this.connect.start();
        this.sourceFile = createTempFile(NUM_LINES);
        this.connect.kafka().createTopic(TOPIC);
    }

    @AfterEach
    public void tearDown() {
        this.connect.stop();
    }

    @Test
    public void testSimpleSource() throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, baseConnectorConfigs(this.sourceFile.getAbsolutePath()));
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not start in time");
        int i = 0;
        Iterator it = this.connect.kafka().consume(NUM_LINES, TIMEOUT_MS, new String[]{TOPIC}).iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            Assertions.assertEquals(String.format(LINE_FORMAT, Integer.valueOf(i2)), new String((byte[]) ((ConsumerRecord) it.next()).value()));
        }
    }

    @Test
    public void testStopResumeSavedOffset() throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, baseConnectorConfigs(this.sourceFile.getAbsolutePath()));
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not start in time");
        this.connect.kafka().consume(NUM_LINES, TIMEOUT_MS, new String[]{TOPIC});
        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
        PrintStream printStream = new PrintStream(Files.newOutputStream(this.sourceFile.toPath(), StandardOpenOption.APPEND));
        Throwable th = null;
        try {
            for (int i = NUM_LINES; i < 10; i++) {
                printStream.println(String.format(LINE_FORMAT, Integer.valueOf(i)));
            }
            this.connect.resumeConnector(CONNECTOR_NAME);
            this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not resume in time");
            int i2 = 0;
            Iterator it = this.connect.kafka().consume(10, TIMEOUT_MS, new String[]{TOPIC}).iterator();
            while (it.hasNext()) {
                int i3 = i2;
                i2++;
                Assertions.assertEquals(String.format(LINE_FORMAT, Integer.valueOf(i3)), new String((byte[]) ((ConsumerRecord) it.next()).value()));
            }
            Assertions.assertEquals(10, this.connect.kafka().consumeAll(TIMEOUT_MS, new String[]{TOPIC}).count());
        } finally {
            if (printStream != null) {
                if (0 != 0) {
                    try {
                        printStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    printStream.close();
                }
            }
        }
    }

    @Test
    public void testAlterOffsets() throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, baseConnectorConfigs(this.sourceFile.getAbsolutePath()));
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not start in time");
        this.connect.kafka().consume(NUM_LINES, TIMEOUT_MS, new String[]{TOPIC});
        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
        this.connect.alterSourceConnectorOffset(CONNECTOR_NAME, Collections.singletonMap("filename", this.sourceFile.getAbsolutePath()), Collections.singletonMap("position", 28L));
        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not resume in time");
        Iterator it = this.connect.kafka().consume(6, TIMEOUT_MS, new String[]{TOPIC}).iterator();
        for (int i = 0; i < NUM_LINES; i++) {
            Assertions.assertEquals(String.format(LINE_FORMAT, Integer.valueOf(i)), new String((byte[]) ((ConsumerRecord) it.next()).value()));
        }
        Assertions.assertEquals(String.format(LINE_FORMAT, 4), new String((byte[]) ((ConsumerRecord) it.next()).value()));
    }

    @Test
    public void testResetOffsets() throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, baseConnectorConfigs(this.sourceFile.getAbsolutePath()));
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not start in time");
        this.connect.kafka().consume(NUM_LINES, TIMEOUT_MS, new String[]{TOPIC});
        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
        this.connect.resetConnectorOffsets(CONNECTOR_NAME);
        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not resume in time");
        Iterator it = this.connect.kafka().consume(10, TIMEOUT_MS, new String[]{TOPIC}).iterator();
        int i = 0;
        while (i < NUM_LINES) {
            int i2 = i;
            i++;
            Assertions.assertEquals(String.format(LINE_FORMAT, Integer.valueOf(i2)), new String((byte[]) ((ConsumerRecord) it.next()).value()));
        }
        while (i < 10) {
            Assertions.assertEquals(String.format(LINE_FORMAT, Integer.valueOf(i - NUM_LINES)), new String((byte[]) ((ConsumerRecord) it.next()).value()));
            i++;
        }
        Assertions.assertEquals(10, this.connect.kafka().consumeAll(TIMEOUT_MS, new String[]{TOPIC}).count());
    }

    private File createTempFile(int i) throws Exception {
        File tempFile = TestUtils.tempFile();
        PrintStream printStream = new PrintStream(Files.newOutputStream(tempFile.toPath(), new OpenOption[0]));
        Throwable th = null;
        for (int i2 = 0; i2 < i; i2++) {
            try {
                try {
                    printStream.println(String.format(LINE_FORMAT, Integer.valueOf(i2)));
                } finally {
                }
            } catch (Throwable th2) {
                if (printStream != null) {
                    if (th != null) {
                        try {
                            printStream.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        printStream.close();
                    }
                }
                throw th2;
            }
        }
        if (printStream != null) {
            if (0 != 0) {
                try {
                    printStream.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                printStream.close();
            }
        }
        return tempFile;
    }

    private Map<String, String> baseConnectorConfigs(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", FileStreamSourceConnector.class.getName());
        hashMap.put("topic", TOPIC);
        hashMap.put("file", str);
        return hashMap;
    }
}
