package org.apache.kafka.coordinator.share;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.share.ShareCoordinatorShard;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/coordinator/share/ShareCoordinatorService.class */
public class ShareCoordinatorService implements ShareCoordinator {
    private final ShareCoordinatorConfig config;
    private final Logger log;
    private final CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime;
    private final ShareCoordinatorMetrics shareCoordinatorMetrics;
    private final Time time;
    private final Timer timer;
    private final PartitionWriter writer;
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private volatile int numPartitions = -1;
    private final Map<TopicPartition, Long> lastPrunedOffsets = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/kafka/coordinator/share/ShareCoordinatorService$Builder.class */
    public static class Builder {
        private final int nodeId;
        private final ShareCoordinatorConfig config;
        private PartitionWriter writer;
        private CoordinatorLoader<CoordinatorRecord> loader;
        private Time time;
        private Timer timer;
        private ShareCoordinatorMetrics coordinatorMetrics;
        private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;

        public Builder(int i, ShareCoordinatorConfig shareCoordinatorConfig) {
            this.nodeId = i;
            this.config = shareCoordinatorConfig;
        }

        public Builder withWriter(PartitionWriter partitionWriter) {
            this.writer = partitionWriter;
            return this;
        }

        public Builder withLoader(CoordinatorLoader<CoordinatorRecord> coordinatorLoader) {
            this.loader = coordinatorLoader;
            return this;
        }

        public Builder withTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public Builder withCoordinatorMetrics(ShareCoordinatorMetrics shareCoordinatorMetrics) {
            this.coordinatorMetrics = shareCoordinatorMetrics;
            return this;
        }

        public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) {
            this.coordinatorRuntimeMetrics = coordinatorRuntimeMetrics;
            return this;
        }

        public ShareCoordinatorService build() {
            if (this.config == null) {
                throw new IllegalArgumentException("Config must be set.");
            }
            if (this.writer == null) {
                throw new IllegalArgumentException("Writer must be set.");
            }
            if (this.loader == null) {
                throw new IllegalArgumentException("Loader must be set.");
            }
            if (this.time == null) {
                throw new IllegalArgumentException("Time must be set.");
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }
            if (this.coordinatorMetrics == null) {
                throw new IllegalArgumentException("Share Coordinator metrics must be set.");
            }
            if (this.coordinatorRuntimeMetrics == null) {
                throw new IllegalArgumentException("Coordinator runtime metrics must be set.");
            }
            String format = String.format("ShareCoordinator id=%d", Integer.valueOf(this.nodeId));
            LogContext logContext = new LogContext(String.format("[%s] ", format));
            return new ShareCoordinatorService(logContext, this.config, new CoordinatorRuntime.Builder().withTime(this.time).withTimer(this.timer).withLogPrefix(format).withLogContext(logContext).withEventProcessor(new MultiThreadedEventProcessor(logContext, "share-coordinator-event-processor-", this.config.shareCoordinatorNumThreads(), this.time, this.coordinatorRuntimeMetrics)).withPartitionWriter(this.writer).withLoader(this.loader).withCoordinatorShardBuilderSupplier(() -> {
                return new ShareCoordinatorShard.Builder(this.config);
            }).withTime(this.time).withDefaultWriteTimeOut(Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs())).withCoordinatorRuntimeMetrics(this.coordinatorRuntimeMetrics).withCoordinatorMetrics(this.coordinatorMetrics).withSerializer(new ShareCoordinatorRecordSerde()).withCompression(Compression.of(this.config.shareCoordinatorStateTopicCompressionType()).build()).withAppendLingerMs(this.config.shareCoordinatorAppendLingerMs()).withExecutorService(Executors.newSingleThreadExecutor()).build(), this.coordinatorMetrics, this.time, this.timer, this.writer);
        }
    }

    public ShareCoordinatorService(LogContext logContext, ShareCoordinatorConfig shareCoordinatorConfig, CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> coordinatorRuntime, ShareCoordinatorMetrics shareCoordinatorMetrics, Time time, Timer timer, PartitionWriter partitionWriter) {
        this.log = logContext.logger(ShareCoordinatorService.class);
        this.config = shareCoordinatorConfig;
        this.runtime = coordinatorRuntime;
        this.shareCoordinatorMetrics = shareCoordinatorMetrics;
        this.time = time;
        this.timer = timer;
        this.writer = partitionWriter;
    }

    @Override // org.apache.kafka.coordinator.share.ShareCoordinator
    public int partitionFor(SharePartitionKey sharePartitionKey) {
        throwIfNotActive();
        return Utils.abs(sharePartitionKey.asCoordinatorKey().hashCode()) % this.numPartitions;
    }

    @Override // org.apache.kafka.coordinator.share.ShareCoordinator
    public Properties shareGroupStateTopicConfigs() {
        Properties properties = new Properties();
        properties.put("cleanup.policy", "delete");
        properties.put("compression.type", BrokerCompressionType.PRODUCER.name);
        properties.put("segment.bytes", Integer.valueOf(this.config.shareCoordinatorStateTopicSegmentBytes()));
        return properties;
    }

    @Override // org.apache.kafka.coordinator.share.ShareCoordinator
    public void startup(IntSupplier intSupplier) {
        if (!this.isActive.compareAndSet(false, true)) {
            this.log.warn("Share coordinator is already running.");
            return;
        }
        this.log.info("Starting up.");
        this.numPartitions = intSupplier.getAsInt();
        setupRecordPruning();
        this.log.info("Startup complete.");
    }

    private void setupRecordPruning() {
        this.log.info("Scheduling share-group state topic prune job.");
        this.timer.add(new TimerTask(this.config.shareCoordinatorTopicPruneIntervalMs()) { // from class: org.apache.kafka.coordinator.share.ShareCoordinatorService.1
            public void run() {
                ArrayList arrayList = new ArrayList();
                ShareCoordinatorService.this.runtime.activeTopicPartitions().forEach(topicPartition -> {
                    arrayList.add(ShareCoordinatorService.this.performRecordPruning(topicPartition));
                });
                CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).whenComplete((r5, th) -> {
                    if (th != null) {
                        ShareCoordinatorService.this.log.error("Received error in share-group state topic prune.", th);
                    }
                    ShareCoordinatorService.this.setupRecordPruning();
                });
            }
        });
    }

    private CompletableFuture<Void> performRecordPruning(TopicPartition topicPartition) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.runtime.scheduleWriteOperation("write-state-record-prune", topicPartition, Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), (v0) -> {
            return v0.lastRedundantOffset();
        }).whenComplete((optional, th) -> {
            if (th != null) {
                this.log.debug("Last redundant offset for tp {} lookup threw an error.", topicPartition, th);
                Errors forException = Errors.forException(th);
                if (forException.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || forException.equals(Errors.NOT_COORDINATOR)) {
                    completableFuture.complete(null);
                    return;
                } else {
                    this.log.error("Last redundant offset lookup for tp {} threw an error.", topicPartition, th);
                    completableFuture.completeExceptionally(th);
                    return;
                }
            }
            if (!optional.isPresent()) {
                this.log.debug("No offset value for tp {} found.", topicPartition);
                completableFuture.complete(null);
                return;
            }
            Long l = (Long) optional.get();
            Long l2 = this.lastPrunedOffsets.get(topicPartition);
            if (l2 == null || l2.longValue() != l.longValue()) {
                this.log.info("Pruning records in {} till offset {}.", topicPartition, l);
                this.writer.deleteRecords(topicPartition, l.longValue()).whenComplete((r11, th) -> {
                    if (th != null) {
                        this.log.debug("Exception while deleting records in {} till offset {}.", new Object[]{topicPartition, l, th});
                        completableFuture.completeExceptionally(th);
                    } else {
                        completableFuture.complete(null);
                        this.lastPrunedOffsets.put(topicPartition, l);
                    }
                });
            } else {
                this.log.debug("{} already pruned till offset {}", topicPartition, l);
                completableFuture.complete(null);
            }
        });
        return completableFuture;
    }

    @Override // org.apache.kafka.coordinator.share.ShareCoordinator
    public void shutdown() {
        if (!this.isActive.compareAndSet(true, false)) {
            this.log.warn("Share coordinator is already shutting down.");
            return;
        }
        this.log.info("Shutting down.");
        Utils.closeQuietly(this.runtime, "coordinator runtime");
        Utils.closeQuietly(this.shareCoordinatorMetrics, "share coordinator metrics");
        this.log.info("Shutdown complete.");
    }

    @Override // org.apache.kafka.coordinator.share.ShareCoordinator
    public CompletableFuture<WriteShareGroupStateResponseData> writeState(RequestContext requestContext, WriteShareGroupStateRequestData writeShareGroupStateRequestData) {
        if (isEmpty(writeShareGroupStateRequestData.topics())) {
            this.log.error("Topic Data is empty: {}", writeShareGroupStateRequestData);
            return CompletableFuture.completedFuture(new WriteShareGroupStateResponseData());
        }
        for (WriteShareGroupStateRequestData.WriteStateData writeStateData : writeShareGroupStateRequestData.topics()) {
            if (isEmpty(writeStateData.partitions())) {
                this.log.error("Partition Data for topic {} is empty: {}", writeStateData.topicId(), writeShareGroupStateRequestData);
                return CompletableFuture.completedFuture(new WriteShareGroupStateResponseData());
            }
        }
        String groupId = writeShareGroupStateRequestData.groupId();
        if (isGroupIdEmpty(groupId)) {
            this.log.error("Group id must be specified and non-empty: {}", writeShareGroupStateRequestData);
            return CompletableFuture.completedFuture(new WriteShareGroupStateResponseData());
        }
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(generateErrorWriteStateResponse(writeShareGroupStateRequestData, Errors.COORDINATOR_NOT_AVAILABLE, "Share coordinator is not available."));
        }
        HashMap hashMap = new HashMap();
        long hiResClockMs = this.time.hiResClockMs();
        writeShareGroupStateRequestData.topics().forEach(writeStateData2 -> {
            Map map = (Map) hashMap.computeIfAbsent(writeStateData2.topicId(), uuid -> {
                return new HashMap();
            });
            writeStateData2.partitions().forEach(partitionData -> {
                map.put(Integer.valueOf(partitionData.partition()), this.runtime.scheduleWriteOperation("write-share-group-state", topicPartitionFor(SharePartitionKey.getInstance(groupId, writeStateData2.topicId(), partitionData.partition())), Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), shareCoordinatorShard -> {
                    return shareCoordinatorShard.writeState(new WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData().setTopicId(writeStateData2.topicId()).setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData().setPartition(partitionData.partition()).setStartOffset(partitionData.startOffset()).setLeaderEpoch(partitionData.leaderEpoch()).setStateEpoch(partitionData.stateEpoch()).setStateBatches(partitionData.stateBatches()))))));
                }).exceptionally(th -> {
                    return (WriteShareGroupStateResponseData) CoordinatorOperationExceptionHelper.handleOperationException("write-share-group-state", writeShareGroupStateRequestData, th, (errors, str) -> {
                        return WriteShareGroupStateResponse.toErrorResponseData(writeStateData2.topicId(), partitionData.partition(), errors, "Unable to write share group state: " + th.getMessage());
                    }, this.log);
                }));
            });
        });
        return CompletableFuture.allOf((CompletableFuture[]) hashMap.values().stream().flatMap(map -> {
            return map.values().stream();
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).thenApply(r11 -> {
            ArrayList arrayList = new ArrayList(hashMap.size());
            hashMap.forEach((uuid, map2) -> {
                ArrayList arrayList2 = new ArrayList(map2.size());
                map2.forEach((num, completableFuture) -> {
                    arrayList2.addAll(((WriteShareGroupStateResponseData.WriteStateResult) ((WriteShareGroupStateResponseData) completableFuture.getNow(null)).results().get(0)).partitions());
                });
                arrayList.add(WriteShareGroupStateResponse.toResponseWriteStateResult(uuid, arrayList2));
            });
            this.shareCoordinatorMetrics.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, this.time.hiResClockMs() - hiResClockMs);
            return new WriteShareGroupStateResponseData().setResults(arrayList);
        });
    }

    @Override // org.apache.kafka.coordinator.share.ShareCoordinator
    public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestContext requestContext, ReadShareGroupStateRequestData readShareGroupStateRequestData) {
        String groupId = readShareGroupStateRequestData.groupId();
        HashMap hashMap = new HashMap();
        if (isEmpty(readShareGroupStateRequestData.topics())) {
            this.log.error("Topic Data is empty: {}", readShareGroupStateRequestData);
            return CompletableFuture.completedFuture(new ReadShareGroupStateResponseData());
        }
        for (ReadShareGroupStateRequestData.ReadStateData readStateData : readShareGroupStateRequestData.topics()) {
            if (isEmpty(readStateData.partitions())) {
                this.log.error("Partition Data for topic {} is empty: {}", readStateData.topicId(), readShareGroupStateRequestData);
                return CompletableFuture.completedFuture(new ReadShareGroupStateResponseData());
            }
        }
        if (isGroupIdEmpty(groupId)) {
            this.log.error("Group id must be specified and non-empty: {}", readShareGroupStateRequestData);
            return CompletableFuture.completedFuture(new ReadShareGroupStateResponseData());
        }
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(generateErrorReadStateResponse(readShareGroupStateRequestData, Errors.COORDINATOR_NOT_AVAILABLE, "Share coordinator is not available."));
        }
        for (ReadShareGroupStateRequestData.ReadStateData readStateData2 : readShareGroupStateRequestData.topics()) {
            Uuid uuid = readStateData2.topicId();
            for (ReadShareGroupStateRequestData.PartitionData partitionData : readStateData2.partitions()) {
                SharePartitionKey sharePartitionKey = SharePartitionKey.getInstance(readShareGroupStateRequestData.groupId(), uuid, partitionData.partition());
                ReadShareGroupStateRequestData topics = new ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData().setTopicId(uuid).setPartitions(Collections.singletonList(partitionData))));
                ((Map) hashMap.computeIfAbsent(uuid, uuid2 -> {
                    return new HashMap();
                })).put(Integer.valueOf(partitionData.partition()), this.runtime.scheduleWriteOperation("read-update-leader-epoch-state", topicPartitionFor(sharePartitionKey), Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), shareCoordinatorShard -> {
                    return shareCoordinatorShard.readStateAndMaybeUpdateLeaderEpoch(topics);
                }).exceptionally(th -> {
                    return (ReadShareGroupStateResponseData) CoordinatorOperationExceptionHelper.handleOperationException("read-update-leader-epoch-state", readShareGroupStateRequestData, th, (errors, str) -> {
                        return ReadShareGroupStateResponse.toErrorResponseData(readStateData2.topicId(), partitionData.partition(), errors, "Unable to read share group state: " + th.getMessage());
                    }, this.log);
                }));
            }
        }
        return CompletableFuture.allOf((CompletableFuture[]) hashMap.values().stream().flatMap(map -> {
            return map.values().stream();
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).thenApply(r5 -> {
            ArrayList arrayList = new ArrayList(hashMap.size());
            hashMap.forEach((uuid3, map2) -> {
                ArrayList arrayList2 = new ArrayList(map2.size());
                map2.forEach((num, completableFuture) -> {
                    arrayList2.add((ReadShareGroupStateResponseData.PartitionResult) ((ReadShareGroupStateResponseData.ReadStateResult) ((ReadShareGroupStateResponseData) completableFuture.getNow(null)).results().get(0)).partitions().get(0));
                });
                arrayList.add(ReadShareGroupStateResponse.toResponseReadStateResult(uuid3, arrayList2));
            });
            return new ReadShareGroupStateResponseData().setResults(arrayList);
        });
    }

    private ReadShareGroupStateResponseData generateErrorReadStateResponse(ReadShareGroupStateRequestData readShareGroupStateRequestData, Errors errors, String str) {
        return new ReadShareGroupStateResponseData().setResults((List) readShareGroupStateRequestData.topics().stream().map(readStateData -> {
            ReadShareGroupStateResponseData.ReadStateResult readStateResult = new ReadShareGroupStateResponseData.ReadStateResult();
            readStateResult.setTopicId(readStateData.topicId());
            readStateResult.setPartitions((List) readStateData.partitions().stream().map(partitionData -> {
                return ReadShareGroupStateResponse.toErrorResponsePartitionResult(partitionData.partition(), errors, str);
            }).collect(Collectors.toList()));
            return readStateResult;
        }).collect(Collectors.toList()));
    }

    private WriteShareGroupStateResponseData generateErrorWriteStateResponse(WriteShareGroupStateRequestData writeShareGroupStateRequestData, Errors errors, String str) {
        return new WriteShareGroupStateResponseData().setResults((List) writeShareGroupStateRequestData.topics().stream().map(writeStateData -> {
            WriteShareGroupStateResponseData.WriteStateResult writeStateResult = new WriteShareGroupStateResponseData.WriteStateResult();
            writeStateResult.setTopicId(writeStateData.topicId());
            writeStateResult.setPartitions((List) writeStateData.partitions().stream().map(partitionData -> {
                return WriteShareGroupStateResponse.toErrorResponsePartitionResult(partitionData.partition(), errors, str);
            }).collect(Collectors.toList()));
            return writeStateResult;
        }).collect(Collectors.toList()));
    }

    private static boolean isGroupIdEmpty(String str) {
        return str == null || str.isEmpty();
    }

    @Override // org.apache.kafka.coordinator.share.ShareCoordinator
    public void onElection(int i, int i2) {
        throwIfNotActive();
        this.runtime.scheduleLoadOperation(new TopicPartition("__share_group_state", i), i2);
    }

    @Override // org.apache.kafka.coordinator.share.ShareCoordinator
    public void onResignation(int i, OptionalInt optionalInt) {
        throwIfNotActive();
        TopicPartition topicPartition = new TopicPartition("__share_group_state", i);
        this.lastPrunedOffsets.remove(topicPartition);
        this.runtime.scheduleUnloadOperation(topicPartition, optionalInt);
    }

    @Override // org.apache.kafka.coordinator.share.ShareCoordinator
    public void onNewMetadataImage(MetadataImage metadataImage, MetadataDelta metadataDelta) {
        throwIfNotActive();
        this.runtime.onNewMetadataImage(metadataImage, metadataDelta);
    }

    TopicPartition topicPartitionFor(SharePartitionKey sharePartitionKey) {
        return new TopicPartition("__share_group_state", partitionFor(sharePartitionKey));
    }

    private static <P> boolean isEmpty(List<P> list) {
        return list == null || list.isEmpty();
    }

    private void throwIfNotActive() {
        if (!this.isActive.get()) {
            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
        }
    }
}
