package com.netflix.mantis.samples.stage;

import com.netflix.mantis.samples.proto.RequestAggregation;
import com.netflix.mantis.samples.proto.RequestEvent;
import io.mantisrx.common.MantisGroup;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.computation.GroupToScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.observables.GroupedObservable;

/* loaded from: input_file:com/netflix/mantis/samples/stage/AggregationStage.class */
public class AggregationStage implements GroupToScalarComputation<String, RequestEvent, RequestAggregation> {
    private static final Logger log = LoggerFactory.getLogger(AggregationStage.class);
    public static final String AGGREGATION_DURATION_MSEC_PARAM = "AggregationDurationMsec";
    int aggregationDurationMsec;

    private Observable<? extends RequestAggregation> aggregate(GroupedObservable<String, MantisGroup<String, RequestEvent>> groupedObservable) {
        return groupedObservable.reduce(RequestAggregation.builder().build(), (requestAggregation, mantisGroup) -> {
            requestAggregation.setCount(requestAggregation.getCount() + ((RequestEvent) mantisGroup.getValue()).getLatency());
            requestAggregation.setPath((String) groupedObservable.getKey());
            return requestAggregation;
        }).doOnNext(requestAggregation2 -> {
            log.debug("Generated aggregate {}", requestAggregation2);
        });
    }

    public Observable<RequestAggregation> call(Context context, Observable<MantisGroup<String, RequestEvent>> observable) {
        return observable.window(this.aggregationDurationMsec, TimeUnit.MILLISECONDS).flatMap(observable2 -> {
            return observable2.groupBy((v0) -> {
                return v0.getKeyValue();
            }).flatMap(this::aggregate);
        });
    }

    public void init(Context context) {
        this.aggregationDurationMsec = ((Integer) context.getParameters().get(AGGREGATION_DURATION_MSEC_PARAM, 1000)).intValue();
    }

    public static GroupToScalar.Config<String, RequestEvent, RequestAggregation> config() {
        return new GroupToScalar.Config().description("sum events for a path").codec(RequestAggregation.requestAggregationCodec()).withParameters(getParameters());
    }

    public static List<ParameterDefinition<?>> getParameters() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new IntParameter().name(AGGREGATION_DURATION_MSEC_PARAM).description("window size for aggregation").validator(Validators.range(100, 10000)).defaultValue(5000).build());
        return arrayList;
    }
}
