package io.synadia.flink.v0.source.reader;

import io.nats.client.Message;
import io.synadia.flink.v0.source.split.NatsSubjectSplit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/synadia/flink/v0/source/reader/NatsSourceFetcherManager.class */
public class NatsSourceFetcherManager extends SplitFetcherManager<Message, NatsSubjectSplit> implements Supplier<SplitReader<Message, NatsSubjectSplit>> {
    private static final Logger LOG = LoggerFactory.getLogger(NatsSourceFetcherManager.class);
    private final Map<String, Integer> splitFetcherMapping;
    private final Map<Integer, Boolean> fetcherStatus;

    public NatsSourceFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<Message>> futureCompletingBlockingQueue, Supplier<SplitReader<Message, NatsSubjectSplit>> supplier, Configuration configuration) {
        super(futureCompletingBlockingQueue, supplier, configuration);
        this.splitFetcherMapping = new HashMap();
        this.fetcherStatus = new HashMap();
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.function.Supplier
    public SplitReader<Message, NatsSubjectSplit> get() {
        return null;
    }

    public void addSplits(List<NatsSubjectSplit> list) {
        for (NatsSubjectSplit natsSubjectSplit : list) {
            SplitFetcher<Message, NatsSubjectSplit> orCreateFetcher = getOrCreateFetcher(natsSubjectSplit.splitId());
            orCreateFetcher.addSplits(Collections.singletonList(natsSubjectSplit));
            startFetcher(orCreateFetcher);
        }
    }

    public void removeSplits(List<NatsSubjectSplit> list) {
    }

    protected void startFetcher(SplitFetcher<Message, NatsSubjectSplit> splitFetcher) {
        if (this.fetcherStatus.get(Integer.valueOf(splitFetcher.fetcherId())) != Boolean.TRUE) {
            this.fetcherStatus.put(Integer.valueOf(splitFetcher.fetcherId()), true);
            super.startFetcher(splitFetcher);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeFetcher(String str) {
        Integer remove = this.splitFetcherMapping.remove(str);
        if (remove != null) {
            this.fetcherStatus.remove(remove);
            SplitFetcher splitFetcher = (SplitFetcher) this.fetchers.remove(remove);
            if (splitFetcher != null) {
                splitFetcher.shutdown();
            }
        }
    }

    public void acknowledgeMessages(Map<String, List<Message>> map) throws Exception {
        LOG.debug("Acknowledge messages {}", map);
        for (Map.Entry<String, List<Message>> entry : map.entrySet()) {
            String key = entry.getKey();
            triggerAcknowledge(getOrCreateFetcher(key), key, entry.getValue());
        }
    }

    private void triggerAcknowledge(SplitFetcher<Message, NatsSubjectSplit> splitFetcher, String str, List<Message> list) throws Exception {
        ((NatsSubjectSplitReader) splitFetcher.getSplitReader()).notifyCheckpointComplete(str, list);
        startFetcher(splitFetcher);
    }

    private SplitFetcher<Message, NatsSubjectSplit> getOrCreateFetcher(String str) {
        SplitFetcher<Message, NatsSubjectSplit> splitFetcher;
        Integer num = this.splitFetcherMapping.get(str);
        if (num == null) {
            splitFetcher = createSplitFetcher();
        } else {
            splitFetcher = (SplitFetcher) this.fetchers.get(num);
            if (splitFetcher == null) {
                this.fetcherStatus.remove(num);
                splitFetcher = createSplitFetcher();
            }
        }
        this.splitFetcherMapping.put(str, Integer.valueOf(splitFetcher.fetcherId()));
        return splitFetcher;
    }
}
