package io.activej.fs.cluster;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.service.EventloopService;
import io.activej.async.util.LogUtils;
import io.activej.common.Checks;
import io.activej.common.CollectorsEx;
import io.activej.common.api.WithInitializer;
import io.activej.common.collection.Try;
import io.activej.common.exception.UncheckedException;
import io.activej.common.ref.RefInt;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.process.ChannelByteRanger;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.fs.ActiveFs;
import io.activej.fs.FileMetadata;
import io.activej.fs.exception.FsIOException;
import io.activej.fs.exception.scalar.PathContainsFileException;
import io.activej.fs.util.RemoteFsUtils;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import io.activej.promise.jmx.PromiseStats;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/fs/cluster/ClusterRepartitionController.class */
public final class ClusterRepartitionController implements WithInitializer<ClusterRepartitionController>, EventloopJmxBeanEx, EventloopService {
    private static final Logger logger = LoggerFactory.getLogger(ClusterRepartitionController.class);
    private static final boolean CHECK = Checks.isEnabled(ClusterRepartitionController.class);
    private static final Duration DEFAULT_PLAN_RECALCULATION_INTERVAL = Duration.ofMinutes(1);

    /**
     * Ordering used when building the repartition plan. Primary key (ascending): the number of
     * {@code null} entries in {@code remoteMetadata} plus one if the local copy is at least as
     * large as every remote copy. Secondary key (ascending): the size of any non-null remote
     * copy, falling back to the local size when there is none.
     * <p>
     * The explicit {@code <InfoResults>} type witness is required: without it the lambda
     * parameter of the first key extractor cannot be inferred through the chained call.
     */
    private static final Comparator<InfoResults> INFO_RESULTS_COMPARATOR = Comparator
            .<InfoResults>comparingLong(info ->
                    info.remoteMetadata.stream().filter(Objects::isNull).count()
                            + (info.isLocalMetaTheBest() ? 1 : 0))
            .thenComparingLong(info ->
                    info.remoteMetadata.stream()
                            .filter(Objects::nonNull)
                            .findAny()
                            .orElse(info.localMetadata)
                            .getSize());

    /** Identifier of the partition this controller runs on. */
    private final Object localPartitionId;
    /** File system of the local partition; the source of every file being repartitioned. */
    private final ActiveFs localFs;
    /** Registry of all partitions of the cluster. */
    private final FsPartitions partitions;

    /** Names of the files still to be processed, in plan order; rebuilt by {@code recalculatePlan()}. */
    private Iterator<String> repartitionPlan;
    /** {@code true} while a repartition run is in progress. */
    private boolean isRepartitioning;
    /** Eventloop time (millis) at which the plan was last rebuilt. */
    private long lastPlanRecalculation;

    /** Set by {@code stop()} while a run is in progress; completed when that run finishes. */
    @Nullable
    private SettablePromise<Void> closeCallback;

    // NOTE(review): presumably shares a single in-flight run between concurrent callers -
    // confirm against the AsyncSuppliers.reuse contract.
    private final AsyncSupplier<Void> repartition = AsyncSuppliers.reuse(this::doRepartition);

    /**
     * Names of the files already handled in the current run. Only {@code clear}, {@code add} and
     * {@code contains} are used on it, so a hash set keeps the per-file membership test cheap.
     */
    private final Set<String> processedFiles = new HashSet<>();

    /** Glob used to list local files. */
    private String glob = "**";
    /** Returns {@code true} for file names that are NOT excluded by the negative glob. */
    private Predicate<String> negativeGlobPredicate = name -> true;
    /** Number of partitions every file is expected to be stored on. */
    private int replicationCount = 1;
    /** Maximum age of the plan, in milliseconds, before it is rebuilt. */
    private long planRecalculationInterval = DEFAULT_PLAN_RECALCULATION_INTERVAL.toMillis();

    /** Number of local files returned by the most recent listing. */
    private int allFiles = 0;
    /** Number of files whose processing reported success. */
    private int ensuredFiles = 0;
    /** Number of files whose processing reported failure. */
    private int failedFiles = 0;

    /** Ids of the partitions that were alive at the last check; a change triggers a plan rebuild. */
    private Set<Object> lastAlivePartitionIds = Collections.emptySet();

    private final PromiseStats repartitionPromiseStats = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats singleFileRepartitionPromiseStats = PromiseStats.create(Duration.ofMinutes(5));
    /**
     * What is known about one local file: its local metadata plus the metadata reported by the
     * remote partitions that were queried for it. A {@code null} remote entry means the queried
     * partition reported no metadata for the file.
     */
    public final class InfoResults implements Comparable<InfoResults> {
        final String name;
        final FileMetadata localMetadata;
        final List<FileMetadata> remoteMetadata = new ArrayList<>();

        private InfoResults(@NotNull String fileName, @NotNull FileMetadata localMeta) {
            this.name = fileName;
            this.localMetadata = localMeta;
        }

        /** @return the name of the file these results describe */
        public String getName() {
            return name;
        }

        /** @return {@code true} if the file has to be either uploaded or deleted */
        boolean shouldBeProcessed() {
            if (shouldBeUploaded()) {
                return true;
            }
            return shouldBeDeleted();
        }

        /**
         * @return {@code true} if the local copy is the best one and at least one remote copy is
         * missing or shorter than it
         */
        boolean shouldBeUploaded() {
            if (!isLocalMetaTheBest()) {
                return false;
            }
            return remoteMetadata.stream().anyMatch(this::isBehindLocal);
        }

        /**
         * @return {@code true} if exactly {@code replicationCount} remote copies were collected
         * and none of them is missing or shorter than the local copy
         */
        boolean shouldBeDeleted() {
            if (remoteMetadata.size() != ClusterRepartitionController.this.replicationCount) {
                return false;
            }
            return remoteMetadata.stream().noneMatch(this::isBehindLocal);
        }

        /**
         * @return {@code true} if no remote copy is larger than the local one
         * (also {@code true} when there is no non-null remote copy at all)
         */
        boolean isLocalMetaTheBest() {
            long largestRemote = remoteMetadata.stream()
                    .filter(Objects::nonNull)
                    .mapToLong(FileMetadata::getSize)
                    .max()
                    .orElse(0L);
            return largestRemote <= localMetadata.getSize();
        }

        /** A remote copy is "behind" when it is absent or strictly smaller than the local copy. */
        private boolean isBehindLocal(FileMetadata remote) {
            return remote == null || remote.getSize() < localMetadata.getSize();
        }

        @Override // java.lang.Comparable
        public int compareTo(@NotNull InfoResults infoResults) {
            return ClusterRepartitionController.INFO_RESULTS_COMPARATOR.compare(this, infoResults);
        }
    }

    /**
     * Use {@link #create(Object, FsPartitions)} instead of calling this directly.
     *
     * @param obj          id of the local partition
     * @param activeFs     file system of the local partition
     * @param fsPartitions registry of all cluster partitions
     */
    private ClusterRepartitionController(Object obj, ActiveFs activeFs, FsPartitions fsPartitions) {
        localPartitionId = obj;
        localFs = activeFs;
        partitions = fsPartitions;
    }

    /**
     * Creates a controller for the given local partition. The local file system is looked up in
     * {@code fsPartitions} under the id {@code obj}.
     *
     * @param obj          id of the local partition
     * @param fsPartitions registry of all cluster partitions
     * @return a new controller with default settings
     */
    public static ClusterRepartitionController create(Object obj, FsPartitions fsPartitions) {
        return new ClusterRepartitionController(
                obj,
                fsPartitions.getPartitions().get(obj),
                fsPartitions);
    }

    /**
     * Sets the glob used to list the local files that are considered for repartitioning.
     *
     * @param str glob pattern passed to the local file system listing
     * @return this controller, for chaining
     */
    public ClusterRepartitionController withGlob(@NotNull String str) {
        glob = str;
        return this;
    }

    /**
     * Excludes files from repartitioning. An empty argument leaves the current exclusion
     * untouched. A wildcard pattern is compiled once into a glob matcher; any other value
     * excludes only the file whose name equals it exactly.
     *
     * @param str glob pattern or exact file name to exclude
     * @return this controller, for chaining
     */
    public ClusterRepartitionController withNegativeGlob(@NotNull String str) {
        if (str.isEmpty()) {
            return this;
        }
        if (!RemoteFsUtils.isWildcard(str)) {
            negativeGlobPredicate = candidate -> !candidate.equals(str);
            return this;
        }
        // Compile the matcher once, outside the predicate, so it is reused for every file name.
        PathMatcher excluded = FileSystems.getDefault().getPathMatcher("glob:" + str);
        negativeGlobPredicate = candidate -> !excluded.matches(Paths.get(candidate));
        return this;
    }

    /**
     * Sets the number of partitions each file is expected to be stored on.
     *
     * @param i replication count
     * @return this controller, for chaining
     */
    public ClusterRepartitionController withReplicationCount(int i) {
        replicationCount = i;
        return this;
    }

    /**
     * Sets the maximum age of the repartition plan before it is rebuilt.
     *
     * @param duration interval between plan recalculations; stored as milliseconds
     * @return this controller, for chaining
     */
    public ClusterRepartitionController withPlanRecalculationInterval(Duration duration) {
        planRecalculationInterval = duration.toMillis();
        return this;
    }

    /** @return the eventloop of the partitions registry this controller works with */
    @NotNull
    public Eventloop getEventloop() {
        return partitions.getEventloop();
    }

    /** @return the id of the local partition */
    public Object getLocalPartitionId() {
        return localPartitionId;
    }

    /** @return the file system of the local partition */
    public ActiveFs getLocalFs() {
        return localFs;
    }

    /** @return the registry of all cluster partitions */
    public FsPartitions getPartitions() {
        return partitions;
    }

    /** @return the configured replication count */
    public int getReplicationCount() {
        return replicationCount;
    }

    /**
     * Requests a repartition run through the {@code repartition} async supplier.
     *
     * @return a promise obtained from that supplier
     */
    @NotNull
    public Promise<Void> repartition() {
        return repartition.get();
    }

    /**
     * Performs one full repartition run: builds the plan, then processes the planned files one
     * by one (rebuilding the plan when needed) until the plan is exhausted.
     * <p>
     * The per-run counters are reset at the start, so that the values reported through JMX and
     * the "untouched" figure in the log describe this run rather than an accumulation over all
     * previous runs.
     *
     * @return a promise that always completes normally once the run has ended; a failure of the
     * run is logged and forwarded to the pending stop callback, if any, but not propagated
     */
    @NotNull
    private Promise<Void> doRepartition() {
        if (CHECK) {
            Checks.checkState(this.partitions.getEventloop().inEventloopThread(), "Should be called from eventloop thread");
        }
        this.isRepartitioning = true;
        this.processedFiles.clear();
        this.ensuredFiles = 0;
        this.failedFiles = 0;
        return recalculatePlan()
                .then(() -> Promises.repeat(() -> recalculatePlanIfNeeded().then(() -> {
                    if (!this.repartitionPlan.hasNext()) {
                        // Plan exhausted: returning false stops the repeat loop.
                        return Promise.of(false);
                    }
                    String fileName = this.repartitionPlan.next();
                    return this.localFs.info(fileName)
                            .then(localMeta -> {
                                if (localMeta == null) {
                                    logger.warn("File '{}' that should be repartitioned has been deleted", fileName);
                                    return Promise.of(false);
                                }
                                return repartitionFile(fileName, localMeta);
                            })
                            .whenComplete(this.singleFileRepartitionPromiseStats.recordStats())
                            .map(ensured -> {
                                this.processedFiles.add(fileName);
                                if (ensured) {
                                    this.ensuredFiles++;
                                } else {
                                    this.failedFiles++;
                                }
                                // true keeps the repeat loop going with the next planned file
                                return true;
                            });
                })))
                .whenComplete(() -> {
                    this.isRepartitioning = false;
                })
                .whenComplete(this.repartitionPromiseStats.recordStats())
                .thenEx((ignored, e) -> {
                    if (e != null) {
                        logger.warn("forced repartition finish, {} files ensured, {} errored, {} untouched",
                                this.ensuredFiles, this.failedFiles, this.allFiles - this.ensuredFiles - this.failedFiles, e);
                    } else {
                        logger.info("repartition finished, {} files ensured, {} errored", this.ensuredFiles, this.failedFiles);
                    }
                    if (this.closeCallback != null) {
                        // stop() is waiting for this run to end
                        this.closeCallback.accept(ignored, e);
                    }
                    return Promise.complete();
                });
    }

    /**
     * Rebuilds the plan when the set of alive partitions has changed since the last check, or
     * when the current plan is older than the configured interval; otherwise does nothing.
     *
     * @return a promise of the (possibly skipped) recalculation
     */
    private Promise<Void> recalculatePlanIfNeeded() {
        // Always evaluated first: it also records the current alive set.
        if (updateLastAlivePartitionIds()) {
            return recalculatePlan();
        }
        long planAge = getEventloop().currentTimeMillis() - lastPlanRecalculation;
        if (planAge > planRecalculationInterval) {
            return recalculatePlan();
        }
        return Promise.complete();
    }

    /**
     * Rebuilds the repartition plan.
     * <ol>
     *   <li>Lists local files by {@code glob} and records their number in {@code allFiles}.</li>
     *   <li>Drops files excluded by the negative glob and files already processed in this run.</li>
     *   <li>For every remaining file selects the first {@code replicationCount} partitions,
     *       removes the local one, and groups the file names by partition.</li>
     *   <li>Asks every such partition for the metadata of its group of files and appends the
     *       answers to the per-file {@link InfoResults}.</li>
     *   <li>Sorts the results, keeps those that should be processed and stores their names as
     *       the new plan.</li>
     * </ol>
     * If collecting the remote metadata fails, the whole recalculation is retried after one
     * second.
     *
     * @return a promise that completes when a plan has been stored
     */
    private Promise<Void> recalculatePlan() {
        return this.localFs.list(this.glob).then(localFiles -> {
            checkEnoughAlivePartitions();
            this.allFiles = localFiles.size();

            Map<String, FileMetadata> candidates = localFiles.entrySet().stream()
                    .filter(entry -> this.negativeGlobPredicate.test(entry.getKey()))
                    .filter(entry -> !this.processedFiles.contains(entry.getKey()))
                    .collect(CollectorsEx.toMap());

            Map<Object, Set<String>> namesByPartition = new HashMap<>();
            for (String name : candidates.keySet()) {
                // Copy before removing the local id: subList() is only a view of the selected list.
                List<Object> targets = new ArrayList<>(this.partitions.select(name).subList(0, this.replicationCount));
                targets.remove(this.localPartitionId);
                for (Object partitionId : targets) {
                    namesByPartition.computeIfAbsent(partitionId, id -> new HashSet<>()).add(name);
                }
            }

            Map<String, InfoResults> resultsByName = candidates.entrySet().stream()
                    .map(entry -> new InfoResults(entry.getKey(), entry.getValue()))
                    .collect(Collectors.toMap(InfoResults::getName, Function.identity()));

            return Promises.reduce(
                            resultsByName,
                            // One remoteMetadata entry is appended per answering partition for
                            // every candidate file (null when the answer has no entry for it).
                            (results, remoteInfo) -> candidates.keySet().forEach(name ->
                                    results.get(name).remoteMetadata.add(remoteInfo.get(name))),
                            Map::values,
                            namesByPartition.size(),
                            namesByPartition.entrySet().stream()
                                    .map(entry -> this.partitions.get(entry.getKey()).infoAll(entry.getValue())
                                            .whenException(e -> this.partitions.markIfDead(entry.getKey(), e)))
                                    .iterator())
                    .whenResult(results -> {
                        this.repartitionPlan = results.stream()
                                .sorted()
                                .filter(InfoResults::shouldBeProcessed)
                                .map(InfoResults::getName)
                                .iterator();
                        this.lastPlanRecalculation = getEventloop().currentTimeMillis();
                        updateLastAlivePartitionIds();
                    })
                    .thenEx((ignored, e) -> {
                        if (e == null) {
                            return Promise.complete();
                        }
                        logger.warn("Failed to recalculate repartition plan, retrying in 1 second", e);
                        return Promises.delay(Duration.ofSeconds(1L)).then(this::recalculatePlan);
                    });
        });
    }

    /**
     * Ensures that a single local file is present on the partitions selected for it, then
     * removes the local copy when the local partition is not one of them.
     * <p>
     * Outline: select the first {@code replicationCount} partitions for the file, query the
     * remote ones for their metadata, and then either delete the local file, do nothing, or
     * stream the missing tail of the file to every remote partition that lacks it.
     *
     * @param str          name of the file
     * @param fileMetadata metadata of the local copy
     * @return a promise of {@code true} when the file was handled, {@code false} when remote
     * metadata could not be collected or at least one upload failed
     */
    private Promise<Boolean> repartitionFile(String str, FileMetadata fileMetadata) {
        this.partitions.markAlive(this.localPartitionId);
        checkEnoughAlivePartitions();
        List<Object> subList = this.partitions.select(str).subList(0, this.replicationCount);
        // Remote targets only; `remove` is true when the local partition is itself among the selected ones.
        ArrayList arrayList = new ArrayList(subList);
        boolean remove = arrayList.remove(this.localPartitionId);
        return getInfoResults(str, fileMetadata, arrayList).then(infoResults -> {
            // null means that at least one remote partition could not be queried
            if (infoResults == null) {
                return Promise.of(false);
            }
            if (infoResults.shouldBeDeleted()) {
                logger.trace("deleting file {} locally", fileMetadata);
                return this.localFs.delete(str).map(r7 -> {
                    logger.info("handled file {} (ensured on {})", fileMetadata, arrayList);
                    return true;
                });
            }
            if (!infoResults.shouldBeUploaded()) {
                logger.info("handled file {} (ensured on {})", fileMetadata, arrayList);
                return Promise.of(true);
            }
            // NOTE(review): `infoResults` is passed as a log argument here and in the "uploaded to {}"
            // messages below; InfoResults declares no toString() in this file, so the partition list
            // was probably intended - confirm.
            logger.trace("uploading file {} to partitions {}...", fileMetadata, infoResults);
            // Smallest remote size (a missing remote copy counts as 0); used as the download offset below.
            long asLong = infoResults.remoteMetadata.stream().mapToLong(fileMetadata2 -> {
                if (fileMetadata2 == null) {
                    return 0L;
                }
                return fileMetadata2.getSize();
            }).min().getAsLong();
            // A single local download is fanned out to all uploads through the splitter.
            ChannelByteSplitter channelByteSplitter = (ChannelByteSplitter) ChannelByteSplitter.create(1).withInput(ChannelSupplier.ofPromise(this.localFs.download(str, asLong, fileMetadata.getSize())));
            // Running index: remoteMetadata.get(i) is treated as belonging to partition arrayList.get(i).
            RefInt refInt = new RefInt(0);
            return Promises.toList(infoResults.remoteMetadata.stream().map(fileMetadata3 -> {
                int i = refInt.value;
                refInt.value = i + 1;
                Object obj = arrayList.get(i);
                // The remote copy is already at least as large as the local one: nothing to upload there.
                if (fileMetadata3 != null && fileMetadata3.getSize() >= fileMetadata.getSize()) {
                    return Promise.of(Try.of((Object) null));
                }
                ActiveFs activeFs = this.partitions.get(obj);
                return activeFs == null ? Promise.ofException(new FsIOException(ClusterRepartitionController.class, "File system '" + obj + "' is not alive")) : Promise.ofCallback(settablePromise -> {
                    channelByteSplitter.addOutput().set(ChannelConsumer.ofPromise(Promise.complete().then(() -> {
                        // Missing remotely: upload as a new file. Present but shorter: append at the remote size
                        // and drop the bytes between the download offset and the remote size.
                        return fileMetadata3 == null ? activeFs.upload(str, fileMetadata.getSize()) : activeFs.append(str, fileMetadata3.getSize()).map(channelConsumer -> {
                            return (ChannelConsumer) channelConsumer.transformWith(ChannelByteRanger.drop(fileMetadata3.getSize() - asLong));
                        });
                    }).whenException(th -> {
                        if (th instanceof PathContainsFileException) {
                            logger.error("Cluster contains files with clashing paths", th);
                        }
                    })).withAcknowledgement(promise -> {
                        return promise.whenResult(() -> {
                            logger.trace("file {} uploaded to '{}'", fileMetadata, obj);
                        }).whenException(th2 -> {
                            logger.warn("failed uploading to partition {}", obj, th2);
                            this.partitions.markIfDead(obj, th2);
                        }).whenComplete(settablePromise);
                    }));
                });
            }).map((v0) -> {
                return v0.toTry();
            })).then(list -> {
                // Any failed upload marks the whole file as not handled; the local copy is kept.
                if (!list.stream().allMatch((v0) -> {
                    return v0.isSuccess();
                })) {
                    logger.warn("failed uploading file {}, skipping", fileMetadata);
                    return Promise.of(false);
                }
                // The local partition is one of the selected ones: keep the local copy.
                if (remove) {
                    logger.info("handled file {} (ensured on {}, uploaded to {})", new Object[]{fileMetadata, subList, infoResults});
                    return Promise.of(true);
                }
                logger.trace("deleting file {} on {}", fileMetadata, this.localPartitionId);
                return this.localFs.delete(str).map(r10 -> {
                    logger.info("handled file {} (ensured on {}, uploaded to {})", new Object[]{fileMetadata, subList, infoResults});
                    return true;
                });
            });
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.Level.TRACE, "repartitionFile", new Object[]{fileMetadata}));
    }

    /**
     * Queries the given partitions for the metadata of one file.
     * <p>
     * The collected {@code remoteMetadata} list is positional: entry {@code i} belongs to
     * {@code list.get(i)}, because the caller pairs the two lists by index. A slot is therefore
     * reserved for each partition in request order and filled in when its answer arrives, so
     * that answers arriving out of order cannot be attributed to the wrong partition.
     *
     * @param str          name of the file
     * @param fileMetadata metadata of the local copy
     * @param list         ids of the partitions to query
     * @return a promise of the collected results, or of {@code null} if at least one partition
     * could not be queried
     */
    private Promise<InfoResults> getInfoResults(String str, FileMetadata fileMetadata, List<Object> list) {
        InfoResults infoResults = new InfoResults(str, fileMetadata);
        List<FileMetadata> remote = infoResults.remoteMetadata;
        return Promises.toList(list.stream().map(partitionId -> {
            // Reserve this partition's slot before the request is issued.
            int slot = remote.size();
            remote.add(null);
            return this.partitions.get(partitionId).info(str).whenComplete((remoteMeta, e) -> {
                if (e == null) {
                    remote.set(slot, remoteMeta);
                } else {
                    logger.warn("failed connecting to partition {}", partitionId, e);
                    this.partitions.markIfDead(partitionId, e);
                }
            }).toTry();
        })).map(tries -> {
            boolean allAnswered = tries.stream().allMatch(attempt -> attempt.isSuccess());
            if (allAnswered) {
                return infoResults;
            }
            logger.warn("failed figuring out partitions for file {}, skipping", fileMetadata);
            return null;
        });
    }

    /**
     * Takes a snapshot of the ids of the currently alive partitions and remembers it if it
     * differs from the previous snapshot.
     *
     * @return {@code true} if the set of alive partitions has changed since the last call
     */
    private boolean updateLastAlivePartitionIds() {
        Set<Object> aliveNow = new HashSet<>(partitions.getAlivePartitions().keySet());
        boolean unchanged = lastAlivePartitionIds.equals(aliveNow);
        if (!unchanged) {
            lastAlivePartitionIds = aliveNow;
        }
        return !unchanged;
    }

    /**
     * Verifies that at least {@code replicationCount} partitions are currently alive.
     *
     * @throws UncheckedException wrapping an {@link FsIOException} when there are fewer
     */
    private void checkEnoughAlivePartitions() {
        int aliveCount = partitions.getAlivePartitions().size();
        if (aliveCount >= replicationCount) {
            return;
        }
        throw new UncheckedException(new FsIOException(ClusterRepartitionController.class, "Not enough alive partitions"));
    }

    /**
     * Does nothing at start-up; a repartition run is only begun through {@link #repartition()}.
     *
     * @return an already completed promise
     */
    @NotNull
    public Promise<Void> start() {
        Promise<Void> started = Promise.complete();
        return started;
    }

    /**
     * Stops the service. When no run is in progress this completes immediately; otherwise the
     * returned promise is stored as the close callback, which the running repartition completes
     * when it finishes.
     *
     * @return a promise of the service being stopped
     */
    @NotNull
    public Promise<Void> stop() {
        if (!isRepartitioning()) {
            return Promise.complete();
        }
        return Promise.ofCallback(pendingStop -> {
            closeCallback = pendingStop;
        });
    }

    /** JMX entry point: requests a repartition run and discards the resulting promise. */
    @JmxOperation(description = "start repartitioning")
    public void startRepartition() {
        this.repartition();
    }

    /** @return {@code true} while a repartition run is in progress */
    @JmxAttribute
    public boolean isRepartitioning() {
        return isRepartitioning;
    }

    /** @return statistics recorded for whole repartition runs */
    @JmxAttribute
    public PromiseStats getRepartitionPromiseStats() {
        return repartitionPromiseStats;
    }

    /** @return statistics recorded for the processing of individual files */
    @JmxAttribute
    public PromiseStats getSingleFileRepartitionPromiseStats() {
        return singleFileRepartitionPromiseStats;
    }

    /** @return the number of local files returned by the most recent listing */
    @JmxAttribute
    public int getLastFilesToRepartition() {
        return allFiles;
    }

    /** @return the current value of the counter of files whose processing reported success */
    @JmxAttribute
    public int getLastEnsuredFiles() {
        return ensuredFiles;
    }

    /** @return the current value of the counter of files whose processing reported failure */
    @JmxAttribute
    public int getLastFailedFiles() {
        return failedFiles;
    }
}
