package org.apache.kafka.streams.state.internals;

import java.util.Objects;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;

/**
 * A {@link SessionStore} decorator that exposes typed keys and values on top of a wrapped
 * {@code SessionStore<Bytes, byte[]>} and times its operations with store-level sensors.
 *
 * <p>Typed keys and values are converted with the {@link StateSerdes} built in
 * {@code init}; put, fetch, flush and remove calls are timed through
 * {@link StreamsMetricsImpl#maybeMeasureLatency}, and the wrapped store's own
 * initialisation is timed on the restore sensor.
 *
 * <p>The serdes, sensors and metrics registry are only assigned in {@code init}, so every
 * other method (including {@link #close()}) expects {@code init} to have been called first.
 *
 * @param <K> type of the session keys
 * @param <V> type of the session values
 */
public class MeteredSessionStore<K, V> extends WrappedStateStore<SessionStore<Bytes, byte[]>, Windowed<K>, V> implements SessionStore<K, V> {
    private final String metricsScope;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final Time time;
    private StateSerdes<K, V> serdes;
    private StreamsMetricsImpl streamsMetrics;
    private Sensor putSensor;
    private Sensor fetchSensor;
    private Sensor flushSensor;
    private Sensor removeSensor;
    private Sensor e2eLatencySensor;
    // Stays null when the context passed to init is not an InternalProcessorContext.
    private InternalProcessorContext context;
    private final String threadId;
    private String taskId;

    /**
     * Creates a metered wrapper around {@code sessionStore}.
     *
     * <p>The name of the thread running this constructor is captured and used as the
     * thread id when the sensors are created.
     *
     * @param sessionStore the byte-oriented store every call is delegated to
     * @param str          the metrics scope passed to the sensor factories
     * @param serde        key serde; handed to {@code WrappingNullableUtils.prepareKeySerde}
     *                     together with the context's default serdes during {@code init}
     * @param serde2       value serde; handed to {@code WrappingNullableUtils.prepareValueSerde}
     *                     together with the context's default serdes during {@code init}
     * @param time         clock used for latency measurements
     */
    public MeteredSessionStore(SessionStore<Bytes, byte[]> sessionStore, String str, Serde<K> serde, Serde<V> serde2, Time time) {
        super(sessionStore);
        this.threadId = Thread.currentThread().getName();
        this.metricsScope = str;
        this.keySerde = serde;
        this.valueSerde = serde2;
        this.time = time;
    }

    /**
     * Builds the serdes, registers the sensors and then initialises the wrapped store,
     * timing that last step on the restore sensor.
     *
     * @param processorContext context supplying task id, metrics and default serdes
     * @param stateStore       the root store forwarded to the wrapped store's {@code init}
     */
    @Override
    @Deprecated
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.context = processorContext instanceof InternalProcessorContext ? (InternalProcessorContext) processorContext : null;
        initStoreSerde(processorContext);
        this.taskId = processorContext.taskId().toString();
        this.streamsMetrics = (StreamsMetricsImpl) processorContext.metrics();
        registerMetrics();
        StreamsMetricsImpl.maybeMeasureLatency(
            () -> super.init(processorContext, stateStore),
            this.time,
            StateStoreMetrics.restoreSensor(this.threadId, this.taskId, this.metricsScope, name(), this.streamsMetrics));
    }

    /**
     * Builds the serdes, registers the sensors and then initialises the wrapped store,
     * timing that last step on the restore sensor.
     *
     * @param stateStoreContext context supplying task id, metrics and default serdes
     * @param stateStore        the root store forwarded to the wrapped store's {@code init}
     */
    @Override
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        this.context = stateStoreContext instanceof InternalProcessorContext ? (InternalProcessorContext) stateStoreContext : null;
        initStoreSerde(stateStoreContext);
        this.taskId = stateStoreContext.taskId().toString();
        this.streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
        registerMetrics();
        StreamsMetricsImpl.maybeMeasureLatency(
            () -> super.init(stateStoreContext, stateStore),
            this.time,
            StateStoreMetrics.restoreSensor(this.threadId, this.taskId, this.metricsScope, name(), this.streamsMetrics));
    }

    /** Creates the put, fetch, flush, remove and end-to-end latency sensors for this store. */
    private void registerMetrics() {
        this.putSensor = StateStoreMetrics.putSensor(this.threadId, this.taskId, this.metricsScope, name(), this.streamsMetrics);
        this.fetchSensor = StateStoreMetrics.fetchSensor(this.threadId, this.taskId, this.metricsScope, name(), this.streamsMetrics);
        this.flushSensor = StateStoreMetrics.flushSensor(this.threadId, this.taskId, this.metricsScope, name(), this.streamsMetrics);
        this.removeSensor = StateStoreMetrics.removeSensor(this.threadId, this.taskId, this.metricsScope, name(), this.streamsMetrics);
        this.e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(this.taskId, this.metricsScope, name(), this.streamsMetrics);
    }

    /**
     * Builds {@link #serdes} from a {@link ProcessorContext}. The topic is the changelog
     * reported by the context for this store, or the name derived from the application id
     * and store name when the context reports none.
     */
    private void initStoreSerde(ProcessorContext processorContext) {
        String storeName = name();
        String changelogTopic = ProcessorContextUtils.changelogFor(processorContext, storeName);
        String topic = changelogTopic != null
            ? changelogTopic
            : ProcessorStateManager.storeChangelogTopic(processorContext.applicationId(), storeName);
        this.serdes = new StateSerdes<>(
            topic,
            WrappingNullableUtils.prepareKeySerde(this.keySerde, processorContext.keySerde(), processorContext.valueSerde()),
            WrappingNullableUtils.prepareValueSerde(this.valueSerde, processorContext.keySerde(), processorContext.valueSerde()));
    }

    /**
     * Builds {@link #serdes} from a {@link StateStoreContext}. The topic is the changelog
     * reported by the context for this store, or the name derived from the application id
     * and store name when the context reports none.
     */
    private void initStoreSerde(StateStoreContext stateStoreContext) {
        String storeName = name();
        String changelogTopic = ProcessorContextUtils.changelogFor(stateStoreContext, storeName);
        String topic = changelogTopic != null
            ? changelogTopic
            : ProcessorStateManager.storeChangelogTopic(stateStoreContext.applicationId(), storeName);
        this.serdes = new StateSerdes<>(
            topic,
            WrappingNullableUtils.prepareKeySerde(this.keySerde, stateStoreContext.keySerde(), stateStoreContext.valueSerde()),
            WrappingNullableUtils.prepareValueSerde(this.valueSerde, stateStoreContext.keySerde(), stateStoreContext.valueSerde()));
    }

    /**
     * Registers {@code cacheFlushListener} on the wrapped store when that store is a
     * {@link CachedStateStore}, deserializing the raw key and values before forwarding them.
     *
     * @param cacheFlushListener listener receiving typed keys and values
     * @param z                  flag forwarded unchanged to the wrapped store's
     *                           {@code setFlushListener}
     * @return the wrapped store's result, or {@code false} when the wrapped store is not a
     *         {@link CachedStateStore}
     */
    @Override
    @SuppressWarnings("unchecked") // safe: the wrapped store is byte-oriented, so a cached variant carries byte[] keys/values
    public boolean setFlushListener(CacheFlushListener<Windowed<K>, V> cacheFlushListener, boolean z) {
        SessionStore<Bytes, byte[]> wrapped = wrapped();
        if (!(wrapped instanceof CachedStateStore)) {
            return false;
        }
        return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
            (rawKey, rawNewValue, rawOldValue, timestamp) -> cacheFlushListener.apply(
                SessionKeySchema.from(rawKey, this.serdes.keyDeserializer(), this.serdes.topic()),
                rawNewValue != null ? this.serdes.valueFrom(rawNewValue) : null,
                rawOldValue != null ? this.serdes.valueFrom(rawOldValue) : null,
                timestamp),
            z);
    }

    /**
     * Serializes the session key and value, writes them to the wrapped store while timing
     * the call on the put sensor, then records end-to-end latency if enabled.
     *
     * @param windowed the session key; must not be {@code null}
     * @param v        the value to store
     * @throws NullPointerException    if {@code windowed} is {@code null}
     * @throws ProcessorStateException if the wrapped store throws one; it is rethrown with
     *                                 the key and value formatted into its message
     */
    @Override
    public void put(Windowed<K> windowed, V v) {
        Objects.requireNonNull(windowed, "sessionKey can't be null");
        try {
            StreamsMetricsImpl.maybeMeasureLatency(
                () -> wrapped().put(new Windowed<>(keyBytes(windowed.key()), windowed.window()), this.serdes.rawValue(v)),
                this.time,
                this.putSensor);
            maybeRecordE2ELatency();
        } catch (ProcessorStateException e) {
            // The caught exception's message is used as a format string for the typed key and value.
            throw new ProcessorStateException(String.format(e.getMessage(), windowed.key(), v), e);
        }
    }

    /**
     * Removes the given session from the wrapped store, timing the call on the remove sensor.
     *
     * @param windowed the session key; must not be {@code null}
     * @throws NullPointerException    if {@code windowed} is {@code null}
     * @throws ProcessorStateException if the wrapped store throws one; it is rethrown with
     *                                 the key formatted into its message
     */
    @Override
    public void remove(Windowed<K> windowed) {
        Objects.requireNonNull(windowed, "sessionKey can't be null");
        try {
            StreamsMetricsImpl.maybeMeasureLatency(
                () -> wrapped().remove(new Windowed<>(keyBytes(windowed.key()), windowed.window())),
                this.time,
                this.removeSensor);
        } catch (ProcessorStateException e) {
            // The caught exception's message is used as a format string for the typed key.
            throw new ProcessorStateException(String.format(e.getMessage(), windowed.key()), e);
        }
    }

    /**
     * Looks up a single session in the wrapped store, timing the call on the fetch sensor.
     *
     * @param k  the key; must not be {@code null}
     * @param j  first bound forwarded to the wrapped store's {@code fetchSession}
     * @param j2 second bound forwarded to the wrapped store's {@code fetchSession}
     * @return the deserialized value, or {@code null} when the wrapped store returns {@code null}
     * @throws NullPointerException if {@code k} is {@code null}
     */
    @Override
    public V fetchSession(K k, long j, long j2) {
        Objects.requireNonNull(k, "key cannot be null");
        return StreamsMetricsImpl.maybeMeasureLatency(() -> {
            byte[] rawValue = wrapped().fetchSession(keyBytes(k), j, j2);
            if (rawValue == null) {
                return null;
            }
            return this.serdes.valueFrom(rawValue);
        }, this.time, this.fetchSensor);
    }

    /**
     * Returns a metered iterator over the wrapped store's {@code fetch(key)} result.
     *
     * @throws NullPointerException if {@code k} is {@code null}
     */
    @Override
    public KeyValueIterator<Windowed<K>, V> fetch(K k) {
        Objects.requireNonNull(k, "key cannot be null");
        return metered(wrapped().fetch(keyBytes(k)));
    }

    /**
     * Returns a metered iterator over the wrapped store's {@code backwardFetch(key)} result.
     *
     * @throws NullPointerException if {@code k} is {@code null}
     */
    @Override
    public KeyValueIterator<Windowed<K>, V> backwardFetch(K k) {
        Objects.requireNonNull(k, "key cannot be null");
        return metered(wrapped().backwardFetch(keyBytes(k)));
    }

    /**
     * Returns a metered iterator over the wrapped store's {@code fetch(from, to)} result.
     *
     * @throws NullPointerException if either key is {@code null}
     */
    @Override
    public KeyValueIterator<Windowed<K>, V> fetch(K k, K k2) {
        Objects.requireNonNull(k, "from cannot be null");
        Objects.requireNonNull(k2, "to cannot be null");
        return metered(wrapped().fetch(keyBytes(k), keyBytes(k2)));
    }

    /**
     * Returns a metered iterator over the wrapped store's {@code backwardFetch(from, to)} result.
     *
     * @throws NullPointerException if either key is {@code null}
     */
    @Override
    public KeyValueIterator<Windowed<K>, V> backwardFetch(K k, K k2) {
        Objects.requireNonNull(k, "from cannot be null");
        Objects.requireNonNull(k2, "to cannot be null");
        return metered(wrapped().backwardFetch(keyBytes(k), keyBytes(k2)));
    }

    /**
     * Returns a metered iterator over the wrapped store's {@code findSessions(key, j, j2)}
     * result; both bounds are forwarded unchanged.
     *
     * @throws NullPointerException if {@code k} is {@code null}
     */
    @Override
    public KeyValueIterator<Windowed<K>, V> findSessions(K k, long j, long j2) {
        Objects.requireNonNull(k, "key cannot be null");
        return metered(wrapped().findSessions(keyBytes(k), j, j2));
    }

    /**
     * Returns a metered iterator over the wrapped store's
     * {@code backwardFindSessions(key, j, j2)} result; both bounds are forwarded unchanged.
     *
     * @throws NullPointerException if {@code k} is {@code null}
     */
    @Override
    public KeyValueIterator<Windowed<K>, V> backwardFindSessions(K k, long j, long j2) {
        Objects.requireNonNull(k, "key cannot be null");
        return metered(wrapped().backwardFindSessions(keyBytes(k), j, j2));
    }

    /**
     * Returns a metered iterator over the wrapped store's
     * {@code findSessions(keyFrom, keyTo, j, j2)} result; both bounds are forwarded unchanged.
     *
     * @throws NullPointerException if either key is {@code null}
     */
    @Override
    public KeyValueIterator<Windowed<K>, V> findSessions(K k, K k2, long j, long j2) {
        Objects.requireNonNull(k, "keyFrom cannot be null");
        Objects.requireNonNull(k2, "keyTo cannot be null");
        return metered(wrapped().findSessions(keyBytes(k), keyBytes(k2), j, j2));
    }

    /**
     * Returns a metered iterator over the wrapped store's
     * {@code backwardFindSessions(keyFrom, keyTo, j, j2)} result; both bounds are forwarded
     * unchanged.
     *
     * @throws NullPointerException if either key is {@code null}
     */
    @Override
    public KeyValueIterator<Windowed<K>, V> backwardFindSessions(K k, K k2, long j, long j2) {
        Objects.requireNonNull(k, "keyFrom cannot be null");
        Objects.requireNonNull(k2, "keyTo cannot be null");
        return metered(wrapped().backwardFindSessions(keyBytes(k), keyBytes(k2), j, j2));
    }

    /** Flushes via the superclass, timing the call on the flush sensor. */
    @Override
    public void flush() {
        StreamsMetricsImpl.maybeMeasureLatency(() -> super.flush(), this.time, this.flushSensor);
    }

    /**
     * Closes the wrapped store and, whether or not that succeeds, removes this store's
     * sensors and metrics. The metrics registry is only assigned in {@code init}, so this
     * method expects {@code init} to have been called beforehand.
     */
    @Override
    public void close() {
        try {
            wrapped().close();
        } finally {
            this.streamsMetrics.removeAllStoreLevelSensorsAndMetrics(this.taskId, name());
        }
    }

    /** Serializes {@code k} with the store's key serde and wraps the bytes. */
    private Bytes keyBytes(K k) {
        return Bytes.wrap(this.serdes.rawKey(k));
    }

    /** Wraps a raw iterator from the wrapped store in a typed iterator that is given the fetch sensor. */
    private KeyValueIterator<Windowed<K>, V> metered(KeyValueIterator<Windowed<Bytes>, byte[]> rawIterator) {
        return new MeteredWindowedKeyValueIterator<>(rawIterator, this.fetchSensor, this.streamsMetrics, this.serdes, this.time);
    }

    /**
     * Records the difference between the current time and the context's timestamp on the
     * end-to-end latency sensor. Nothing is recorded when the sensor is not recording or
     * when no {@link InternalProcessorContext} was supplied to {@code init}.
     */
    private void maybeRecordE2ELatency() {
        if (!this.e2eLatencySensor.shouldRecord() || this.context == null) {
            return;
        }
        // Read the clock once so the latency and the record timestamp share the same instant.
        long currentTime = this.time.milliseconds();
        this.e2eLatencySensor.record(currentTime - this.context.timestamp(), currentTime);
    }
}
