package org.apache.kafka.test;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StateManagerStub;
import org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.PositionSerde;
import org.apache.kafka.streams.state.internals.ThreadCache;

/* loaded from: input_file:org/apache/kafka/test/InternalMockProcessorContext.class */
public class InternalMockProcessorContext<KOut, VOut> extends AbstractProcessorContext<KOut, VOut> implements RecordCollector.Supplier {
    private StateManager stateManager;
    private final File stateDir;
    private final RecordCollector.Supplier recordCollectorSupplier;
    private final Map<String, StateStore> storeMap;
    private final Map<String, StateRestoreCallback> restoreFuncs;
    private final ToInternal toInternal;
    private Task.TaskType taskType;
    private Serde<?> keySerde;
    private Serde<?> valueSerde;
    private long timestamp;
    private final Time time;
    private final Map<String, String> storeToChangelogTopic;
    private final boolean consistencyEnabled;

    public InternalMockProcessorContext() {
        this(null, null, null, new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime()), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), null, null, Time.SYSTEM);
    }

    public InternalMockProcessorContext(File file, StreamsConfig streamsConfig) {
        this(file, null, null, new StreamsMetricsImpl(new Metrics(), "mock", streamsConfig.getString("built.in.metrics.version"), new MockTime()), streamsConfig, null, null, Time.SYSTEM);
    }

    public InternalMockProcessorContext(StreamsMetricsImpl streamsMetricsImpl) {
        this(null, null, null, streamsMetricsImpl, new StreamsConfig(StreamsTestUtils.getStreamsConfig()), null, null, Time.SYSTEM);
    }

    public InternalMockProcessorContext(File file, StreamsConfig streamsConfig, RecordCollector recordCollector) {
        this(file, null, null, new StreamsMetricsImpl(new Metrics(), "mock", streamsConfig.getString("built.in.metrics.version"), new MockTime()), streamsConfig, () -> {
            return recordCollector;
        }, null, Time.SYSTEM);
    }

    public InternalMockProcessorContext(File file, Serde<?> serde, Serde<?> serde2, StreamsConfig streamsConfig) {
        this(file, serde, serde2, new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime()), streamsConfig, null, null, Time.SYSTEM);
    }

    public InternalMockProcessorContext(StateSerdes<?, ?> stateSerdes, RecordCollector recordCollector) {
        this(null, stateSerdes.keySerde(), stateSerdes.valueSerde(), recordCollector, null);
    }

    public InternalMockProcessorContext(StateSerdes<?, ?> stateSerdes, RecordCollector recordCollector, Metrics metrics) {
        this(null, stateSerdes.keySerde(), stateSerdes.valueSerde(), new StreamsMetricsImpl(metrics, "mock", "latest", new MockTime()), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> {
            return recordCollector;
        }, null, Time.SYSTEM);
    }

    public InternalMockProcessorContext(File file, Serde<?> serde, Serde<?> serde2, RecordCollector recordCollector, ThreadCache threadCache) {
        this(file, serde, serde2, new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime()), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> {
            return recordCollector;
        }, threadCache, Time.SYSTEM);
    }

    public InternalMockProcessorContext(File file, Serde<?> serde, Serde<?> serde2, StreamsMetricsImpl streamsMetricsImpl, StreamsConfig streamsConfig, RecordCollector.Supplier supplier, ThreadCache threadCache, Time time) {
        this(file, serde, serde2, streamsMetricsImpl, streamsConfig, supplier, threadCache, time, new TaskId(0, 0));
    }

    public InternalMockProcessorContext(File file, Serde<?> serde, Serde<?> serde2, StreamsMetricsImpl streamsMetricsImpl, StreamsConfig streamsConfig, RecordCollector.Supplier supplier, ThreadCache threadCache, Time time, TaskId taskId) {
        super(taskId, streamsConfig, streamsMetricsImpl, threadCache);
        this.stateManager = new StateManagerStub();
        this.storeMap = new LinkedHashMap();
        this.restoreFuncs = new HashMap();
        this.toInternal = new ToInternal();
        this.taskType = Task.TaskType.ACTIVE;
        this.timestamp = -1L;
        this.storeToChangelogTopic = new HashMap();
        super.setCurrentNode(new ProcessorNode("TESTING_NODE"));
        this.stateDir = file;
        this.keySerde = serde;
        this.valueSerde = serde2;
        this.recordCollectorSupplier = supplier;
        this.time = time;
        this.consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(appConfigs(), "__iq.consistency.offset.vector.enabled__", false);
        this.recordContext = new ProcessorRecordContext(0L, 0L, 0, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders());
    }

    protected StateManager stateManager() {
        return this.stateManager;
    }

    public void setStateManger(StateManager stateManager) {
        this.stateManager = stateManager;
    }

    public RecordCollector recordCollector() {
        RecordCollector recordCollector = this.recordCollectorSupplier.recordCollector();
        if (recordCollector == null) {
            throw new UnsupportedOperationException("No RecordCollector specified");
        }
        return recordCollector;
    }

    public void setKeySerde(Serde<?> serde) {
        this.keySerde = serde;
    }

    public void setValueSerde(Serde<?> serde) {
        this.valueSerde = serde;
    }

    public Serde<?> keySerde() {
        return this.keySerde;
    }

    public Serde<?> valueSerde() {
        return this.valueSerde;
    }

    public void initialize() {
    }

    public File stateDir() {
        if (this.stateDir == null) {
            throw new UnsupportedOperationException("State directory not specified");
        }
        return this.stateDir;
    }

    public void register(StateStore stateStore, StateRestoreCallback stateRestoreCallback, CommitCallback commitCallback) {
        this.storeMap.put(stateStore.name(), stateStore);
        this.restoreFuncs.put(stateStore.name(), stateRestoreCallback);
        stateManager().registerStore(stateStore, stateRestoreCallback, commitCallback);
    }

    public <S extends StateStore> S getStateStore(String str) {
        return (S) this.storeMap.get(str);
    }

    public Cancellable schedule(Duration duration, PunctuationType punctuationType, Punctuator punctuator) throws IllegalArgumentException {
        throw new UnsupportedOperationException("schedule() not supported.");
    }

    public void commit() {
    }

    public <K extends KOut, V extends VOut> void forward(Record<K, V> record) {
        forward((Record) record, (String) null);
    }

    public <K extends KOut, V extends VOut> void forward(Record<K, V> record, String str) {
        if (this.recordContext != null && record.timestamp() != this.recordContext.timestamp()) {
            setTime(record.timestamp());
        }
        ProcessorNode processorNode = this.currentNode;
        try {
            for (ProcessorNode processorNode2 : processorNode.children()) {
                this.currentNode = processorNode2;
                processorNode2.process(record);
            }
        } finally {
            this.currentNode = processorNode;
        }
    }

    public void forward(Object obj, Object obj2) {
        forward(obj, obj2, To.all());
    }

    public void forward(Object obj, Object obj2, To to) {
        this.toInternal.update(to);
        if (this.toInternal.hasTimestamp()) {
            setTime(this.toInternal.timestamp());
        }
        ProcessorNode processorNode = this.currentNode;
        try {
            for (ProcessorNode processorNode2 : processorNode.children()) {
                if (this.toInternal.child() == null || this.toInternal.child().equals(processorNode2.name())) {
                    this.currentNode = processorNode2;
                    processorNode2.process(new Record(obj, obj2, this.toInternal.timestamp(), headers()));
                    this.toInternal.update(to);
                }
            }
        } finally {
            this.currentNode = processorNode;
        }
    }

    public void setTime(long j) {
        if (this.recordContext != null) {
            this.recordContext = new ProcessorRecordContext(j, this.recordContext.offset(), this.recordContext.partition(), this.recordContext.topic(), this.recordContext.headers());
        }
        this.timestamp = j;
    }

    public long timestamp() {
        return this.recordContext == null ? this.timestamp : this.recordContext.timestamp();
    }

    public long currentSystemTimeMs() {
        return this.time.milliseconds();
    }

    public long currentStreamTimeMs() {
        throw new UnsupportedOperationException("this method is not supported in InternalMockProcessorContext");
    }

    public String topic() {
        if (this.recordContext == null) {
            return null;
        }
        return this.recordContext.topic();
    }

    public int partition() {
        if (this.recordContext == null) {
            return -1;
        }
        return this.recordContext.partition();
    }

    public long offset() {
        if (this.recordContext == null) {
            return -1L;
        }
        return this.recordContext.offset();
    }

    public Headers headers() {
        return this.recordContext == null ? new RecordHeaders() : this.recordContext.headers();
    }

    public Task.TaskType taskType() {
        return this.taskType;
    }

    public void logChange(String str, Bytes bytes, byte[] bArr, long j, Position position) {
        Headers recordHeaders = new RecordHeaders();
        if (this.consistencyEnabled) {
            recordHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
            recordHeaders.add(new RecordHeader("c", PositionSerde.serialize(position).array()));
        } else {
            recordHeaders = null;
        }
        recordCollector().send(str + "-changelog", bytes, bArr, recordHeaders, Integer.valueOf(taskId().partition()), Long.valueOf(j), BYTES_KEY_SERIALIZER, BYTEARRAY_VALUE_SERIALIZER, (String) null, (InternalProcessorContext) null);
    }

    public void transitionToActive(StreamTask streamTask, RecordCollector recordCollector, ThreadCache threadCache) {
        this.taskType = Task.TaskType.ACTIVE;
    }

    public void transitionToStandby(ThreadCache threadCache) {
        this.taskType = Task.TaskType.STANDBY;
    }

    public void registerCacheFlushListener(String str, ThreadCache.DirtyEntryFlushListener dirtyEntryFlushListener) {
        cache().addDirtyEntryFlushListener(str, dirtyEntryFlushListener);
    }

    public void restore(String str, Iterable<KeyValue<byte[], byte[]>> iterable) {
        RecordBatchingStateRestoreCallback adapt = StateRestoreCallbackAdapter.adapt(this.restoreFuncs.get(str));
        ArrayList arrayList = new ArrayList();
        for (KeyValue<byte[], byte[]> keyValue : iterable) {
            arrayList.add(new ConsumerRecord("", 0, 0L, keyValue.key, keyValue.value));
        }
        adapt.restoreBatch(arrayList);
    }

    public void restoreWithHeaders(String str, List<ConsumerRecord<byte[], byte[]>> list) {
        StateRestoreCallbackAdapter.adapt(this.restoreFuncs.get(str)).restoreBatch(list);
    }

    public void addChangelogForStore(String str, String str2) {
        this.storeToChangelogTopic.put(str, str2);
    }

    public String changelogFor(String str) {
        return this.storeToChangelogTopic.get(str);
    }

    public <K extends KOut, V extends VOut> void forward(FixedKeyRecord<K, V> fixedKeyRecord) {
        forward(new Record<>(fixedKeyRecord.key(), fixedKeyRecord.value(), fixedKeyRecord.timestamp(), fixedKeyRecord.headers()));
    }

    public <K extends KOut, V extends VOut> void forward(FixedKeyRecord<K, V> fixedKeyRecord, String str) {
        forward((Record) new Record<>(fixedKeyRecord.key(), fixedKeyRecord.value(), fixedKeyRecord.timestamp(), fixedKeyRecord.headers()), str);
    }
}
