package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.nio.ByteBuffer;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supplies processors that react to a changed record by looking up every entry in the
 * subscription store whose combined key starts with the record key's prefix, and forwarding
 * one {@link SubscriptionResponseWrapper} per matching entry.
 *
 * @param <K>  type of the primary key recovered from a combined key
 * @param <KO> type of the incoming record key that is used to build the scan prefix
 * @param <VO> type of the value carried by the incoming {@link Change}
 */
public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements ProcessorSupplier<KO, Change<VO>> {
    private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);

    private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
    private final CombinedKeySchema<KO, K> keySchema;

    /**
     * @param storeBuilder      builder identifying the subscription store that each processor
     *                          resolves from its context during {@code init}
     * @param combinedKeySchema schema used to derive the scan prefix from a record key and to
     *                          decode stored combined keys
     */
    public ForeignJoinSubscriptionProcessorSupplier(StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder, CombinedKeySchema<KO, K> combinedKeySchema) {
        this.storeBuilder = storeBuilder;
        this.keySchema = combinedKeySchema;
    }

    /**
     * Creates a new processor instance bound to this supplier's store builder and key schema.
     *
     * @return a fresh {@code KTableKTableJoinProcessor}
     */
    @Override
    public Processor<KO, Change<VO>> get() {
        return new KTableKTableJoinProcessor();
    }

    /**
     * Processor that performs the prefix lookup against the subscription store and forwards a
     * response for every stored subscription that matches the incoming record key.
     */
    private final class KTableKTableJoinProcessor extends AbstractProcessor<KO, Change<VO>> {
        // Both fields are assigned in init(), so they cannot be final.
        private Sensor droppedRecordsSensor;
        private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;

        /**
         * Resolves the dropped-records sensor and the subscription store from the context.
         *
         * @param processorContext the context handed in by the runtime; it is cast to
         *                         {@link InternalProcessorContext}, so any other implementation
         *                         fails with a {@link ClassCastException}
         */
        @Override
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            InternalProcessorContext internalContext = (InternalProcessorContext) processorContext;
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                Thread.currentThread().getName(),
                internalContext.taskId().toString(),
                internalContext.metrics());
            // The StoreBuilder overload is generic in the store type, so no raw cast is needed.
            store = internalContext.getStateStore(storeBuilder);
        }

        /**
         * Forwards one response per stored subscription whose combined key starts with the
         * prefix derived from {@code key}.
         *
         * <p>A record with a {@code null} key is logged, recorded on the dropped-records sensor
         * and otherwise ignored.
         *
         * @param key    key of the changed record; used to build the scan prefix
         * @param change the change whose {@code newValue} is attached to every forwarded response
         */
        @Override
        public void process(KO key, Change<VO> change) {
            if (key == null) {
                LOG.warn("Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                    change, context().topic(), context().partition(), context().offset());
                droppedRecordsSensor.record();
                return;
            }

            Bytes prefixBytes = keySchema.prefixBytes(key);

            // try-with-resources keeps hasNext(), next() and forward() all inside the guarded
            // region, so the iterator is closed no matter which of them throws.
            try (KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> scan =
                     store.range(prefixBytes, Bytes.increment(prefixBytes))) {
                while (scan.hasNext()) {
                    KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> entry = scan.next();
                    // The scan is bounded by increment(prefix), which itself does not carry the
                    // prefix, so every returned key is re-checked before it is used.
                    if (prefixEquals(entry.key.get(), prefixBytes.get())) {
                        K primaryKey = keySchema.fromBytes(entry.key).getPrimaryKey();
                        context().forward(
                            primaryKey,
                            new SubscriptionResponseWrapper<>(entry.value.value().getHash(), change.newValue));
                    }
                }
            }
        }

        /**
         * Compares the two arrays over the length of the shorter one.
         *
         * @param candidate first byte array
         * @param prefix    second byte array
         * @return {@code true} when the first {@code min(candidate.length, prefix.length)} bytes
         *         of both arrays are equal
         */
        private boolean prefixEquals(byte[] candidate, byte[] prefix) {
            int length = Math.min(candidate.length, prefix.length);
            return ByteBuffer.wrap(candidate, 0, length).equals(ByteBuffer.wrap(prefix, 0, length));
        }
    }
}
