package org.gradoop.flink.model.impl.operators.aggregation;

import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.operators.ApplicableUnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.functions.epgm.ElementsOfSelectedGraphs;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
import org.gradoop.flink.model.impl.operators.aggregation.functions.AggregateTransactions;
import org.gradoop.flink.model.impl.operators.aggregation.functions.ApplyAggregateElements;
import org.gradoop.flink.model.impl.operators.aggregation.functions.CombinePartitionApplyAggregates;
import org.gradoop.flink.model.impl.operators.aggregation.functions.SetAggregateProperties;

/* loaded from: input_file:org/gradoop/flink/model/impl/operators/aggregation/ApplyAggregation.class */
public class ApplyAggregation implements ApplicableUnaryGraphToGraphOperator {
    private final Set<AggregateFunction> aggregateFunctions;

    public ApplyAggregation(AggregateFunction... aggregateFunctionArr) {
        for (AggregateFunction aggregateFunction : aggregateFunctionArr) {
            Preconditions.checkNotNull(aggregateFunction);
        }
        this.aggregateFunctions = new HashSet(Arrays.asList(aggregateFunctionArr));
    }

    @Override // org.gradoop.flink.model.api.operators.ApplicableUnaryGraphToGraphOperator
    public GraphCollection executeForGVELayout(GraphCollection graphCollection) {
        DataSet<GraphHead> graphHeads = graphCollection.getGraphHeads();
        MapOperator map = graphHeads.map(new Id());
        return graphCollection.getConfig().getGraphCollectionFactory().fromDataSets((DataSet<GraphHead>) graphHeads.coGroup(aggregateVertices(graphCollection.getVertices(), map).union(aggregateEdges(graphCollection.getEdges(), map)).groupBy(new int[]{0}).reduceGroup(new CombinePartitionApplyAggregates(this.aggregateFunctions))).where(new Id()).equalTo(new int[]{0}).with(new SetAggregateProperties(this.aggregateFunctions)), graphCollection.getVertices(), graphCollection.getEdges());
    }

    @Override // org.gradoop.flink.model.api.operators.ApplicableUnaryGraphToGraphOperator
    public GraphCollection executeForTxLayout(GraphCollection graphCollection) {
        return graphCollection.getConfig().getGraphCollectionFactory().fromTransactions((DataSet<GraphTransaction>) graphCollection.getGraphTransactions().map(new AggregateTransactions(this.aggregateFunctions)));
    }

    private DataSet<Tuple2<GradoopId, Map<String, PropertyValue>>> aggregateVertices(DataSet<Vertex> dataSet, DataSet<GradoopId> dataSet2) {
        return dataSet.flatMap(new ElementsOfSelectedGraphs()).withBroadcastSet(dataSet2, "graphIds").groupBy(new int[]{0}).combineGroup(new ApplyAggregateElements((Set) this.aggregateFunctions.stream().filter((v0) -> {
            return v0.isVertexAggregation();
        }).collect(Collectors.toSet())));
    }

    private DataSet<Tuple2<GradoopId, Map<String, PropertyValue>>> aggregateEdges(DataSet<Edge> dataSet, DataSet<GradoopId> dataSet2) {
        return dataSet.flatMap(new ElementsOfSelectedGraphs()).withBroadcastSet(dataSet2, "graphIds").groupBy(new int[]{0}).combineGroup(new ApplyAggregateElements((Set) this.aggregateFunctions.stream().filter((v0) -> {
            return v0.isEdgeAggregation();
        }).collect(Collectors.toSet())));
    }
}
