package org.reaktivity.nukleus.http_push.internal.watcher;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.agrona.LangUtil;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.Reaktive;
import org.reaktivity.nukleus.http_push.internal.Context;
import org.reaktivity.nukleus.http_push.internal.router.Router;

@Reaktive
/* loaded from: input_file:org/reaktivity/nukleus/http_push/internal/watcher/Watcher.class */
public final class Watcher implements Nukleus {
    private final WatchService service;
    private final Path streamsPath;
    private final Set<Path> sourcePaths = new HashSet();
    private final Consumer<WatchEvent<?>> handleEvent;
    private Router router;
    private WatchKey streamsKey;

    public Watcher(Context context) {
        this.service = context.watchService();
        this.streamsPath = context.streamsPath();
        HashMap hashMap = new HashMap();
        hashMap.put(StandardWatchEventKinds.OVERFLOW, this::handleOverflow);
        hashMap.put(StandardWatchEventKinds.ENTRY_CREATE, this::handleCreate);
        hashMap.put(StandardWatchEventKinds.ENTRY_DELETE, this::handleDelete);
        this.handleEvent = watchEvent -> {
            ((Consumer) hashMap.getOrDefault(watchEvent.kind(), this::handleUnexpected)).accept(watchEvent);
        };
    }

    public void setRouter(Router router) {
        this.router = router;
    }

    public String name() {
        return "watcher";
    }

    public int process() {
        registerIfNecessary();
        int i = 0;
        WatchKey poll = this.service.poll();
        if (poll != null && poll.isValid()) {
            List<WatchEvent<?>> pollEvents = poll.pollEvents();
            i = 0 + pollEvents.size();
            pollEvents.forEach(this.handleEvent);
            poll.reset();
        }
        return i;
    }

    public void close() throws Exception {
        this.streamsKey = null;
    }

    private void handleCreate(WatchEvent<?> watchEvent) {
        handleCreatePath((Path) watchEvent.context());
    }

    private void handleCreatePath(Path path) {
        if (this.sourcePaths.add(path)) {
            this.router.onReadable(path);
        }
    }

    private void handleDelete(WatchEvent<?> watchEvent) {
        handleDeletePath((Path) watchEvent.context());
    }

    private void handleDeletePath(Path path) {
        if (this.sourcePaths.remove(path)) {
            this.router.onExpired(path);
        }
    }

    private void handleOverflow(WatchEvent<?> watchEvent) {
        syncWithFileSystem();
    }

    private void handleUnexpected(WatchEvent<?> watchEvent) {
    }

    private void registerIfNecessary() {
        if (this.streamsKey == null) {
            try {
                this.streamsPath.toFile().mkdirs();
                this.streamsKey = this.streamsPath.register(this.service, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.OVERFLOW);
                syncWithFileSystem();
            } catch (IOException e) {
                LangUtil.rethrowUnchecked(e);
            }
        }
    }

    private void syncWithFileSystem() {
        this.sourcePaths.stream().filter(path -> {
            return !path.toFile().exists();
        }).forEach(this::handleDeletePath);
        Arrays.stream(this.streamsPath.toFile().listFiles()).map(file -> {
            return file.toPath();
        }).forEach(this::handleCreatePath);
    }
}
