package io.mantisrx.control.clutch;

import io.mantisrx.control.IActuator;
import io.mantisrx.control.clutch.Clutch;
import io.mantisrx.control.controllers.ErrorComputer;
import io.mantisrx.control.controllers.Integrator;
import io.mantisrx.control.controllers.PIDController;
import io.mantisrx.shaded.com.google.common.util.concurrent.AtomicDouble;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;

/* loaded from: input_file:io/mantisrx/control/clutch/ExperimentalControlLoop.class */
public class ExperimentalControlLoop implements Observable.Transformer<Event, Double> {
    private static final Logger log = LoggerFactory.getLogger(ExperimentalControlLoop.class);
    private final ClutchConfiguration config;
    private final IActuator actuator;
    private final AtomicDouble dampener;
    private final AtomicLong cooldownTimestamp;
    private final AtomicLong currentSize;
    private final AtomicDouble lastLag;
    private final Observable<Integer> size;
    private final IRpsMetricComputer rpsMetricComputer;
    private final IScaleComputer scaleComputer;
    private long cooldownMillis;

    /* loaded from: input_file:io/mantisrx/control/clutch/ExperimentalControlLoop$DefaultRpsMetricComputer.class */
    public static class DefaultRpsMetricComputer implements IRpsMetricComputer {
        private double lastLag = 0.0d;

        public Double apply(ClutchConfiguration clutchConfiguration, Map<Clutch.Metric, Double> map) {
            double doubleValue = map.get(Clutch.Metric.RPS).doubleValue();
            double doubleValue2 = map.get(Clutch.Metric.LAG).doubleValue();
            double doubleValue3 = map.get(Clutch.Metric.SOURCEJOB_DROP).doubleValue();
            double doubleValue4 = map.get(Clutch.Metric.DROPS).doubleValue();
            double d = doubleValue2 - this.lastLag;
            this.lastLag = doubleValue2;
            return Double.valueOf(doubleValue + d + doubleValue3 + doubleValue4);
        }
    }

    /* loaded from: input_file:io/mantisrx/control/clutch/ExperimentalControlLoop$DefaultScaleComputer.class */
    public static class DefaultScaleComputer implements IScaleComputer {
        public Double apply(ClutchConfiguration clutchConfiguration, Long l, Double d) {
            return Double.valueOf(Math.min(clutchConfiguration.maxSize, Math.max(clutchConfiguration.minSize, l.longValue() + d.doubleValue())));
        }
    }

    public ExperimentalControlLoop(ClutchConfiguration clutchConfiguration, IActuator iActuator, AtomicLong atomicLong, Observable<Long> observable, Observable<Integer> observable2) {
        this(clutchConfiguration, iActuator, atomicLong, new AtomicDouble(1.0d), observable, observable2, new DefaultRpsMetricComputer(), new DefaultScaleComputer());
    }

    public ExperimentalControlLoop(ClutchConfiguration clutchConfiguration, IActuator iActuator, AtomicLong atomicLong, AtomicDouble atomicDouble, Observable<Long> observable, Observable<Integer> observable2, IRpsMetricComputer iRpsMetricComputer, IScaleComputer iScaleComputer) {
        this.config = clutchConfiguration;
        this.actuator = iActuator;
        this.dampener = atomicDouble;
        this.cooldownMillis = clutchConfiguration.getCooldownUnits().toMillis(clutchConfiguration.cooldownInterval);
        this.cooldownTimestamp = new AtomicLong(System.currentTimeMillis());
        this.currentSize = atomicLong;
        this.lastLag = new AtomicDouble(0.0d);
        this.size = observable2;
        this.rpsMetricComputer = iRpsMetricComputer;
        this.scaleComputer = iScaleComputer;
    }

    public Observable<Double> call(Observable<Event> observable) {
        Observable share = observable.share();
        Observable mergeWith = Observable.just(new Event(Clutch.Metric.LAG, 0.0d)).mergeWith(share.filter(event -> {
            return Boolean.valueOf(event.getMetric() == Clutch.Metric.LAG);
        }));
        Observable mergeWith2 = Observable.just(new Event(Clutch.Metric.DROPS, 0.0d)).mergeWith(share.filter(event2 -> {
            return Boolean.valueOf(event2.getMetric() == Clutch.Metric.DROPS);
        }));
        Observable mergeWith3 = Observable.just(new Event(Clutch.Metric.SOURCEJOB_DROP, 0.0d)).mergeWith(share.filter(event3 -> {
            return Boolean.valueOf(event3.getMetric() == Clutch.Metric.SOURCEJOB_DROP);
        }));
        Observable filter = share.filter(event4 -> {
            return Boolean.valueOf(event4.getMetric() == Clutch.Metric.RPS);
        });
        Integrator integrator = new Integrator(0.0d, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY, this.config.integralDecay);
        Observable<Integer> observable2 = this.size;
        AtomicLong atomicLong = this.currentSize;
        atomicLong.getClass();
        Subscription subscribe = observable2.doOnNext((v1) -> {
            r1.set(v1);
        }).doOnNext(num -> {
            this.cooldownTimestamp.set(System.currentTimeMillis());
        }).doOnNext(num2 -> {
            log.info("Clutch received new scheduling update with {} workers.", num2);
        }).subscribe();
        return filter.withLatestFrom(mergeWith, mergeWith2, mergeWith3, (event5, event6, event7, event8) -> {
            HashMap hashMap = new HashMap();
            hashMap.put(event5.getMetric(), Double.valueOf(event5.getValue()));
            hashMap.put(event6.getMetric(), Double.valueOf(event6.getValue()));
            hashMap.put(event7.getMetric(), Double.valueOf(event7.getValue()));
            hashMap.put(event8.getMetric(), Double.valueOf(event8.getValue()));
            return hashMap;
        }).doOnNext(map -> {
            log.info("Latest metrics: {}", map);
        }).map(map2 -> {
            return (Double) this.rpsMetricComputer.apply(this.config, map2);
        }).lift(new ErrorComputer(this.config.setPoint, true, ((Double) this.config.rope._1).doubleValue(), ((Double) this.config.rope._2).doubleValue())).lift(new PIDController(Double.valueOf(this.config.kp), Double.valueOf(this.config.ki), Double.valueOf(this.config.kd), Double.valueOf(1.0d), new AtomicDouble(1.0d), this.config.integralDecay)).doOnNext(d -> {
            log.info("PID controller output: {}", d);
        }).lift(integrator).doOnNext(d2 -> {
            log.info("Integral: {}", d2);
        }).filter(d3 -> {
            return Boolean.valueOf(this.cooldownMillis == 0 || this.cooldownTimestamp.get() <= System.currentTimeMillis() - this.cooldownMillis);
        }).map(d4 -> {
            return (Double) this.scaleComputer.apply(this.config, Long.valueOf(this.currentSize.get()), d4);
        }).doOnNext(d5 -> {
            log.info("New desired size: {}, existing size: {}", d5, Long.valueOf(this.currentSize.get()));
        }).filter(d6 -> {
            return Boolean.valueOf(this.currentSize.get() != Math.round(Math.ceil(d6.doubleValue())));
        }).lift(this.actuator).doOnNext(d7 -> {
            this.currentSize.set(Math.round(Math.ceil(d7.doubleValue())));
        }).doOnNext(d8 -> {
            integrator.setSum(0.0d);
        }).doOnNext(d9 -> {
            this.cooldownTimestamp.set(System.currentTimeMillis());
        }).doOnUnsubscribe(() -> {
            subscribe.unsubscribe();
        });
    }

    protected void setCooldownMillis(long j) {
        this.cooldownMillis = j;
    }
}
