package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.tests.SmokeTestUtil;
import org.apache.kafka.test.TestUtils;

/* loaded from: input_file:org/apache/kafka/streams/tests/SmokeTestClient.class */
/**
 * Kafka Streams smoke-test client.
 *
 * <p>Builds the smoke-test topology (see {@link #getTopology()}), runs it in a
 * {@link KafkaStreams} instance that is started from a helper thread, and prints
 * {@code SMOKE-TEST-CLIENT-*} marker lines to standard output on close and on
 * uncaught exceptions.
 */
public class SmokeTestClient extends SmokeTestUtil {
    /** Record key that every output branch of the topology filters out. */
    private static final String FLUSH_KEY = "flush";

    private final String name;
    private Thread thread;
    private KafkaStreams streams;

    // These flags are written inside KafkaStreams callbacks (state listener /
    // uncaught-exception handler) and read by callers of started(), closed()
    // and close(). volatile keeps the writes visible when those run on
    // different threads.
    private volatile boolean uncaughtException = false;
    private volatile boolean started;
    private volatile boolean closed;

    /**
     * @param str client name; used as a prefix in console output and as the
     *            suffix of the {@code client.id} config
     */
    public SmokeTestClient(String str) {
        this.name = str;
    }

    /**
     * @return {@code true} once the state listener has observed a
     *         REBALANCING -> RUNNING transition
     */
    public boolean started() {
        return this.started;
    }

    /**
     * @return {@code true} once the state listener has observed the
     *         NOT_RUNNING state
     */
    public boolean closed() {
        return this.closed;
    }

    /**
     * Creates the {@link KafkaStreams} instance, registers a JVM shutdown hook
     * that calls {@link #close()}, and starts the instance from a new thread.
     *
     * @param properties caller-supplied configuration; entries override the
     *                   built-in smoke-test defaults
     */
    public void start(Properties properties) {
        this.streams = createKafkaStreams(properties);
        // NOTE(review): createKafkaStreams() has already installed an
        // uncaught-exception handler on this instance; this second call
        // presumably replaces it -- confirm against
        // KafkaStreams#setUncaughtExceptionHandler.
        this.streams.setUncaughtExceptionHandler((t, e) -> {
            System.out.println(this.name + ": SMOKE-TEST-CLIENT-EXCEPTION");
            this.uncaughtException = true;
            e.printStackTrace();
        });
        Exit.addShutdownHook("streams-shutdown-hook", this::close);
        this.thread = new Thread(() -> this.streams.start());
        this.thread.start();
    }

    /** Requests shutdown of the streams instance with a zero timeout. */
    public void closeAsync() {
        this.streams.close(Duration.ZERO);
    }

    /**
     * Closes the streams instance (waiting up to five seconds), prints the
     * {@code SMOKE-TEST-CLIENT-CLOSED} marker unless an uncaught exception was
     * recorded, and then joins the thread that started the instance.
     */
    public void close() {
        this.streams.close(Duration.ofSeconds(5L));
        if (!this.uncaughtException) {
            System.out.println(this.name + ": SMOKE-TEST-CLIENT-CLOSED");
        }
        // start() registers the shutdown hook before it creates the starter
        // thread, so a hook-triggered close() may find no thread to join yet.
        final Thread starter = this.thread;
        if (starter == null) {
            return;
        }
        try {
            starter.join();
        } catch (InterruptedException e) {
            System.out.println(this.name + ": SMOKE-TEST-CLIENT-EXCEPTION");
            // Restore the interrupt status so the caller can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the effective configuration: smoke-test defaults first, then every
     * entry of {@code properties} copied on top so that callers can override
     * any default.
     */
    private Properties getStreamsConfig(Properties properties) {
        final Properties config = new Properties(properties);
        config.put("application.id", "SmokeTest");
        config.put("client.id", "SmokeTest-" + this.name);
        config.put("num.stream.threads", 3);
        config.put("num.standby.replicas", 2);
        config.put("buffered.records.per.partition", 100);
        config.put("commit.interval.ms", 1000);
        config.put("replication.factor", 3);
        config.put("auto.offset.reset", "earliest");
        config.put("acks", "all");
        config.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        // Caller-supplied entries win over the defaults above.
        config.putAll(properties);
        return config;
    }

    /**
     * Creates the {@link KafkaStreams} instance for the smoke-test topology and
     * wires a state listener that logs every transition and maintains the
     * {@code started} / {@code closed} flags.
     */
    private KafkaStreams createKafkaStreams(Properties properties) {
        final KafkaStreams kafkaStreams = new KafkaStreams(getTopology(), getStreamsConfig(properties));
        kafkaStreams.setStateListener((newState, oldState) -> {
            System.out.printf("%s %s: %s -> %s%n", this.name, Instant.now(), oldState, newState);
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                this.started = true;
            }
            if (newState == KafkaStreams.State.NOT_RUNNING) {
                this.closed = true;
            }
        });
        kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
            System.out.println(this.name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
            kafkaStreams.close(Duration.ofSeconds(30L));
        });
        return kafkaStreams;
    }

    /**
     * Builds the smoke-test topology.
     *
     * <p>Reads {@code String -> Integer} records from topic {@code data} and
     * writes to the topics {@code echo}, {@code min-raw}, {@code min-suppressed},
     * {@code min}, {@code sws-raw}, {@code sws-suppressed}, {@code max},
     * {@code sum}, {@code cnt}, {@code dif}, {@code avg} and {@code tagg}.
     * Records keyed {@code "flush"} are dropped before every output topic
     * except {@code tagg}.
     *
     * @return the built topology
     */
    public Topology getTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Integer> source = builder.stream("data", Consumed.with(stringSerde, intSerde));

        // echo: pass-through of the input
        source.filterNot((k, v) -> k.equals(FLUSH_KEY))
            .to("echo", Produced.with(stringSerde, intSerde));

        // Records whose value is Integer.MAX_VALUE are excluded from all
        // aggregations (presumably an end-of-input sentinel -- confirm against
        // the producer side).
        final KStream<String, Integer> data =
            source.filter((k, v) -> v == null || v != Integer.MAX_VALUE);
        data.process(SmokeTestUtil.printProcessorSupplier("data", this.name));

        final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));

        // min: one-day windows, one-minute grace
        final KTable<Windowed<String>, Integer> minAggregation = groupedData
            .windowedBy(TimeWindows.of(Duration.ofDays(1L)).grace(Duration.ofMinutes(1L)))
            .aggregate(
                () -> Integer.MAX_VALUE,
                (k, value, aggregate) -> value < aggregate ? value : aggregate,
                Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min")
                    .withValueSerde(intSerde)
                    .withRetention(Duration.ofHours(25L)));
        streamify(minAggregation, "min-raw");
        streamify(
            minAggregation.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())),
            "min-suppressed");
        minAggregation
            .toStream(new SmokeTestUtil.Unwindow<>())
            .filterNot((k, v) -> k.equals(FLUSH_KEY))
            .to("min", Produced.with(stringSerde, intSerde));

        // sws: sum over two-second windows advancing by one second
        final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
            .windowedBy(TimeWindows.of(Duration.ofSeconds(2L))
                .advanceBy(Duration.ofSeconds(1L))
                .grace(Duration.ofSeconds(30L)))
            .reduce(Integer::sum);
        streamify(smallWindowSum, "sws-raw");
        streamify(
            smallWindowSum.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())),
            "sws-suppressed");

        final KTable<String, Integer> minTable =
            builder.table("min", Consumed.with(stringSerde, intSerde), Materialized.as("minStoreName"));
        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", this.name));

        // max
        groupedData
            .windowedBy(TimeWindows.of(Duration.ofDays(2L)))
            .aggregate(
                () -> Integer.MIN_VALUE,
                (k, value, aggregate) -> value > aggregate ? value : aggregate,
                Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max")
                    .withValueSerde(intSerde))
            .toStream(new SmokeTestUtil.Unwindow<>())
            .filterNot((k, v) -> k.equals(FLUSH_KEY))
            .to("max", Produced.with(stringSerde, intSerde));

        final KTable<String, Integer> maxTable =
            builder.table("max", Consumed.with(stringSerde, intSerde), Materialized.as("maxStoreName"));
        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", this.name));

        // sum
        groupedData
            .windowedBy(TimeWindows.of(Duration.ofDays(2L)))
            .aggregate(
                () -> 0L,
                (k, value, aggregate) -> value.longValue() + aggregate,
                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum")
                    .withValueSerde(longSerde))
            .toStream(new SmokeTestUtil.Unwindow<>())
            .filterNot((k, v) -> k.equals(FLUSH_KEY))
            .to("sum", Produced.with(stringSerde, longSerde));

        final KTable<String, Long> sumTable = builder.table("sum", Consumed.with(stringSerde, longSerde));
        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", this.name));

        // cnt
        groupedData
            .windowedBy(TimeWindows.of(Duration.ofDays(2L)))
            .count(Materialized.as("uwin-cnt"))
            .toStream(new SmokeTestUtil.Unwindow<>())
            .filterNot((k, v) -> k.equals(FLUSH_KEY))
            .to("cnt", Produced.with(stringSerde, longSerde));

        final KTable<String, Long> cntTable =
            builder.table("cnt", Consumed.with(stringSerde, longSerde), Materialized.as("cntStoreName"));
        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", this.name));

        // dif = max - min
        maxTable
            .join(minTable, (max, min) -> max - min)
            .toStream()
            .filterNot((k, v) -> k.equals(FLUSH_KEY))
            .to("dif", Produced.with(stringSerde, intSerde));

        // avg = sum / cnt, divided as doubles so the fractional part is kept
        // (long / long would truncate before the result is widened).
        sumTable
            .join(cntTable, (sum, cnt) -> sum.doubleValue() / cnt.doubleValue())
            .toStream()
            .filterNot((k, v) -> k.equals(FLUSH_KEY))
            .to("avg", Produced.with(stringSerde, doubleSerde));

        // tagg: re-group the count table and aggregate into an in-memory store
        final SmokeTestUtil.Agg agg = new SmokeTestUtil.Agg();
        cntTable
            .groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
            .aggregate(
                agg.init(),
                agg.adder(),
                agg.remover(),
                Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()))
            .toStream()
            .to("tagg", Produced.with(stringSerde, longSerde));

        return builder.build();
    }

    /**
     * Writes a windowed table to a topic: drops records whose window key is
     * {@code "flush"} and uses the windowed key's {@code toString()} as the
     * output record key.
     *
     * @param kTable windowed table to publish
     * @param str    name of the destination topic
     */
    private static void streamify(KTable<Windowed<String>, Integer> kTable, String str) {
        kTable.toStream()
            .filterNot((windowedKey, value) -> windowedKey.key().equals(FLUSH_KEY))
            .map((windowedKey, value) -> new KeyValue<>(windowedKey.toString(), value))
            .to(str, Produced.with(stringSerde, intSerde));
    }
}
