package org.emmalanguage.api.flink;

import java.net.URI;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.io.TypeSerializerOutputFormat;
import org.apache.flink.api.scala.CrossDataSet;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.api.scala.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.emmalanguage.api.DataBag;
import org.emmalanguage.api.FlinkDataSet;
import org.emmalanguage.api.FlinkDataSet$;
import org.emmalanguage.api.Group;
import org.emmalanguage.api.Meta;
import org.emmalanguage.api.Meta$;
import org.emmalanguage.api.Meta$Projections$;
import org.emmalanguage.api.alg.Alg;
import org.emmalanguage.api.backend.ComprehensionCombinators;
import org.emmalanguage.api.backend.Runtime;
import org.emmalanguage.api.flink.FlinkOps;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.immutable.List$;
import scala.collection.immutable.Stream$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxesRunTime;

/* compiled from: FlinkOps.scala */
/* loaded from: input_file:org/emmalanguage/api/flink/FlinkOps$.class */
public final class FlinkOps$ implements ComprehensionCombinators<ExecutionEnvironment>, Runtime<ExecutionEnvironment> {
    /**
     * Singleton instance of this module. The declaration shows {@code null}, but the
     * private constructor assigns {@code this} to it when the static initializer runs.
     */
    public static final FlinkOps$ MODULE$ = null;
    /** Base URI against which temporary dataset names are resolved by {@code tempPath}. */
    private final URI tempBase;
    /** Iterator from which {@code sink} draws the name of each temporary dataset it writes. */
    private final Iterator<String> tempNames;

    // Creating the instance is what populates MODULE$ (the constructor stores `this` there).
    static {
        new FlinkOps$();
    }

    /**
     * Builds the cross product of two bags on Flink.
     *
     * <p>Both bags are passed through a {@code FlinkOps.DataSetExtractor} created for the
     * given environment; when both extractions succeed, the resulting Flink datasets are
     * crossed and the result is wrapped as a bag of {@code Tuple2<A, B>}.
     *
     * @param dataBag left input bag
     * @param dataBag2 right input bag
     * @param meta type evidence for the elements of {@code dataBag}
     * @param meta2 type evidence for the elements of {@code dataBag2}
     * @param executionEnvironment Flink environment handed to the extractor
     * @return the wrapped cross product
     * @throws MatchError carrying the pair of bags if either extraction yields an empty Option
     */
    @Override // org.emmalanguage.api.backend.ComprehensionCombinators
    public <A, B> DataBag<Tuple2<A, B>> cross(DataBag<A> dataBag, DataBag<B> dataBag2, final Meta<A> meta, final Meta<B> meta2, ExecutionEnvironment executionEnvironment) {
        FlinkOps.DataSetExtractor dataSetExtractor = new FlinkOps.DataSetExtractor(executionEnvironment);
        Tuple2 tuple2 = new Tuple2(dataBag, dataBag2);
        // Pattern-match residue: the tuple was just constructed, so this check always passes.
        if (tuple2 != null) {
            DataBag<A> dataBag3 = (DataBag) tuple2._1();
            DataBag<A> dataBag4 = (DataBag) tuple2._2();
            Option<DataSet<A>> unapply = dataSetExtractor.unapply(dataBag3, meta);
            if (!unapply.isEmpty()) {
                DataSet dataSet = (DataSet) unapply.get();
                // The right-hand bag is only extracted once the left-hand one has succeeded.
                Option<DataSet<A>> unapply2 = dataSetExtractor.unapply(dataBag4, meta2);
                if (!unapply2.isEmpty()) {
                    DataSet dataSet2 = (DataSet) unapply2.get();
                    FlinkDataSet$ flinkDataSet$ = FlinkDataSet$.MODULE$;
                    CrossDataSet cross = dataSet.cross(dataSet2);
                    Meta$ meta$ = Meta$.MODULE$;
                    Meta$Projections$ meta$Projections$ = Meta$Projections$.MODULE$;
                    Meta$ meta$2 = Meta$.MODULE$;
                    TypeTags universe = scala.reflect.runtime.package$.MODULE$.universe();
                    // The Meta for the result is derived from a TypeTag whose creator (below)
                    // describes scala.Tuple2 applied to the two element types.
                    return flinkDataSet$.wrap(cross, meta$.apply(meta$Projections$.ttagFor(meta$2.apply(universe.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator(meta, meta2) { // from class: org.emmalanguage.api.flink.FlinkOps$$typecreator2$1
                        private final Meta evidence$1$1;
                        private final Meta evidence$2$1;

                        // Produces the type reference scala.Tuple2[<type of evidence 1>, <type of evidence 2>] in the given mirror.
                        public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                            Universe universe2 = mirror.universe();
                            return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple2"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{Meta$Projections$.MODULE$.ttagFor(this.evidence$1$1).in(mirror).tpe(), Meta$Projections$.MODULE$.ttagFor(this.evidence$2$1).in(mirror).tpe()})));
                        }

                        // Captures the two element-type evidences of the enclosing call.
                        {
                            this.evidence$1$1 = meta;
                            this.evidence$2$1 = meta2;
                        }
                    })))));
                }
            }
        }
        // Reached when either bag could not be extracted as a Flink dataset.
        throw new MatchError(tuple2);
    }

    /**
     * Builds an equi-join of two bags on Flink.
     *
     * <p>Both bags are passed through a {@code FlinkOps.DataSetExtractor} created for the
     * given environment; when both extractions succeed, the datasets are joined where the
     * key selected by {@code function1} on the left equals the key selected by
     * {@code function12} on the right. The result is wrapped as a bag of {@code Tuple2<A, B>}.
     *
     * @param function1 key selector applied to elements of {@code dataBag}
     * @param function12 key selector applied to elements of {@code dataBag2}
     * @param dataBag left input bag
     * @param dataBag2 right input bag
     * @param meta type evidence for the elements of {@code dataBag}
     * @param meta2 type evidence for the elements of {@code dataBag2}
     * @param meta3 type evidence for the join key; used to obtain the key type information for both sides
     * @param executionEnvironment Flink environment handed to the extractor
     * @return the wrapped join result
     * @throws MatchError carrying the pair of bags if either extraction yields an empty Option
     */
    @Override // org.emmalanguage.api.backend.ComprehensionCombinators
    public <A, B, K> DataBag<Tuple2<A, B>> equiJoin(Function1<A, K> function1, Function1<B, K> function12, DataBag<A> dataBag, DataBag<B> dataBag2, final Meta<A> meta, final Meta<B> meta2, Meta<K> meta3, ExecutionEnvironment executionEnvironment) {
        FlinkOps.DataSetExtractor dataSetExtractor = new FlinkOps.DataSetExtractor(executionEnvironment);
        Tuple2 tuple2 = new Tuple2(dataBag, dataBag2);
        // Pattern-match residue: the tuple was just constructed, so this check always passes.
        if (tuple2 != null) {
            DataBag<A> dataBag3 = (DataBag) tuple2._1();
            DataBag<A> dataBag4 = (DataBag) tuple2._2();
            Option<DataSet<A>> unapply = dataSetExtractor.unapply(dataBag3, meta);
            if (!unapply.isEmpty()) {
                DataSet dataSet = (DataSet) unapply.get();
                // The right-hand bag is only extracted once the left-hand one has succeeded.
                Option<DataSet<A>> unapply2 = dataSetExtractor.unapply(dataBag4, meta2);
                if (!unapply2.isEmpty()) {
                    DataSet dataSet2 = (DataSet) unapply2.get();
                    FlinkDataSet$ flinkDataSet$ = FlinkDataSet$.MODULE$;
                    // Key type information is computed from meta3 separately for each side of the join.
                    DataSet<A> dataSet3 = (DataSet) dataSet.join(dataSet2).where(function1, FlinkDataSet$.MODULE$.typeInfoForType(meta3)).equalTo(function12, FlinkDataSet$.MODULE$.typeInfoForType(meta3));
                    Meta$ meta$ = Meta$.MODULE$;
                    Meta$Projections$ meta$Projections$ = Meta$Projections$.MODULE$;
                    Meta$ meta$2 = Meta$.MODULE$;
                    TypeTags universe = scala.reflect.runtime.package$.MODULE$.universe();
                    // The Meta for the result is derived from a TypeTag whose creator (below)
                    // describes scala.Tuple2 applied to the two element types.
                    return flinkDataSet$.wrap(dataSet3, meta$.apply(meta$Projections$.ttagFor(meta$2.apply(universe.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator(meta, meta2) { // from class: org.emmalanguage.api.flink.FlinkOps$$typecreator7$1
                        private final Meta evidence$3$1;
                        private final Meta evidence$4$1;

                        // Produces the type reference scala.Tuple2[<type of evidence 3>, <type of evidence 4>] in the given mirror.
                        public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                            Universe universe2 = mirror.universe();
                            return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple2"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{Meta$Projections$.MODULE$.ttagFor(this.evidence$3$1).in(mirror).tpe(), Meta$Projections$.MODULE$.ttagFor(this.evidence$4$1).in(mirror).tpe()})));
                        }

                        // Captures the two element-type evidences of the enclosing call.
                        {
                            this.evidence$3$1 = meta;
                            this.evidence$4$1 = meta2;
                        }
                    })))));
                }
            }
        }
        // Reached when either bag could not be extracted as a Flink dataset.
        throw new MatchError(tuple2);
    }

    /**
     * Materialises a Flink-backed bag to temporary storage and returns a bag that reads it back.
     *
     * <p>For a {@code FlinkDataSet}, the underlying dataset is registered for writing via
     * {@code sink}, a job named {@code emma-cache-<name>} is executed on the bag's own
     * environment, and a new bag is created from {@code source} for the same name.
     * Any other kind of bag is returned unchanged.
     *
     * @param dataBag bag to cache
     * @param meta type evidence for the bag's elements
     * @param executionEnvironment Flink environment used for writing and re-reading
     * @return a bag reading the cached data, or {@code dataBag} itself if it is not a {@code FlinkDataSet}
     */
    @Override // org.emmalanguage.api.backend.Runtime
    public <A> DataBag<A> cache(DataBag<A> dataBag, Meta<A> meta, ExecutionEnvironment executionEnvironment) {
        // Nothing to do for bags that are not backed by a Flink dataset.
        if (!(dataBag instanceof FlinkDataSet)) {
            return dataBag;
        }

        FlinkDataSet flinkBag = (FlinkDataSet) dataBag;
        String tempName = sink(flinkBag.rep(), meta, executionEnvironment);

        // Job name is the interpolation s"emma-cache-$tempName".
        StringContext jobNameTemplate = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"emma-cache-", ""}));
        String jobName = jobNameTemplate.s(Predef$.MODULE$.genericWrapArray(new Object[]{tempName}));
        flinkBag.env().execute(jobName);

        // Re-read what was just written and expose it as a fresh bag.
        DataSet<A> reloaded = source(tempName, meta, executionEnvironment);
        return FlinkDataSet$.MODULE$.wrap(reloaded, meta);
    }

    /**
     * Groups a Flink-backed bag by key and folds each group with the given algebra.
     *
     * <p>The underlying dataset is mapped with a closure built from {@code function1} and
     * {@code alg}, grouped on the field named {@code "key"}, and reduced with a closure built
     * from {@code alg}. The closures ({@code FlinkOps$$anonfun$foldGroup$1} and
     * {@code FlinkOps$$anonfun$foldGroup$2}) are defined outside this file; presumably the
     * first produces one {@code Group} per element and the second merges group values.
     * TODO confirm against those classes.
     *
     * @param dataBag input bag; must be a {@code FlinkDataSet}
     * @param function1 key selector passed to the mapping closure
     * @param alg fold algebra passed to both closures
     * @param meta type evidence for the input elements (not referenced in this method body)
     * @param meta2 type evidence for the fold result type {@code B}
     * @param meta3 type evidence for the key type {@code K}
     * @param executionEnvironment Flink environment (not referenced in this method body)
     * @return the wrapped dataset of {@code Group<K, B>}
     * @throws MatchError if {@code dataBag} is not a {@code FlinkDataSet}
     */
    @Override // org.emmalanguage.api.backend.Runtime
    public <A, B, K> DataBag<Group<K, B>> foldGroup(DataBag<A> dataBag, Function1<A, K> function1, Alg<A, B> alg, Meta<A> meta, final Meta<B> meta2, final Meta<K> meta3, ExecutionEnvironment executionEnvironment) {
        if (!(dataBag instanceof FlinkDataSet)) {
            throw new MatchError(dataBag);
        }
        FlinkDataSet$ flinkDataSet$ = FlinkDataSet$.MODULE$;
        DataSet<A> rep = ((FlinkDataSet) dataBag).rep();
        FlinkOps$$anonfun$foldGroup$1 flinkOps$$anonfun$foldGroup$1 = new FlinkOps$$anonfun$foldGroup$1(function1, alg);
        FlinkDataSet$ flinkDataSet$2 = FlinkDataSet$.MODULE$;
        Meta$ meta$ = Meta$.MODULE$;
        Meta$Projections$ meta$Projections$ = Meta$Projections$.MODULE$;
        Meta$ meta$2 = Meta$.MODULE$;
        TypeTags universe = scala.reflect.runtime.package$.MODULE$.universe();
        // First of three identical type constructions: Flink type information for the map output type.
        TypeInformation typeInfoForType = flinkDataSet$2.typeInfoForType(meta$.apply(meta$Projections$.ttagFor(meta$2.apply(universe.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator(meta2, meta3) { // from class: org.emmalanguage.api.flink.FlinkOps$$typecreator8$1
            private final Meta evidence$9$1;
            private final Meta evidence$10$1;

            // Produces org.emmalanguage.api.Group with type arguments in the order
            // (evidence 10 = key type, evidence 9 = value type).
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe2 = mirror.universe();
                return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().ThisType(mirror.staticPackage("org.emmalanguage.api").asModule().moduleClass()), mirror.staticClass("org.emmalanguage.api.Group"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{Meta$Projections$.MODULE$.ttagFor(this.evidence$10$1).in(mirror).tpe(), Meta$Projections$.MODULE$.ttagFor(this.evidence$9$1).in(mirror).tpe()})));
            }

            // evidence 9 is meta2 (B), evidence 10 is meta3 (K).
            {
                this.evidence$9$1 = meta2;
                this.evidence$10$1 = meta3;
            }
        })))));
        Meta$Projections$ meta$Projections$2 = Meta$Projections$.MODULE$;
        Meta$ meta$3 = Meta$.MODULE$;
        Meta$Projections$ meta$Projections$3 = Meta$Projections$.MODULE$;
        Meta$ meta$4 = Meta$.MODULE$;
        TypeTags universe2 = scala.reflect.runtime.package$.MODULE$.universe();
        // Second construction of the same Group type, this time projected to the class tag
        // required by map; the mapped dataset is then grouped on "key" and reduced.
        DataSet<A> reduce = rep.map(flinkOps$$anonfun$foldGroup$1, typeInfoForType, meta$Projections$2.ctagFor(meta$3.apply(meta$Projections$3.ttagFor(meta$4.apply(universe2.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator(meta2, meta3) { // from class: org.emmalanguage.api.flink.FlinkOps$$typecreator9$1
            private final Meta evidence$9$1;
            private final Meta evidence$10$1;

            // Same Group type reference as above (key type first, value type second).
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe3 = mirror.universe();
                return universe3.internal().reificationSupport().TypeRef(universe3.internal().reificationSupport().ThisType(mirror.staticPackage("org.emmalanguage.api").asModule().moduleClass()), mirror.staticClass("org.emmalanguage.api.Group"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{Meta$Projections$.MODULE$.ttagFor(this.evidence$10$1).in(mirror).tpe(), Meta$Projections$.MODULE$.ttagFor(this.evidence$9$1).in(mirror).tpe()})));
            }

            {
                this.evidence$9$1 = meta2;
                this.evidence$10$1 = meta3;
            }
        })))))).groupBy("key", Predef$.MODULE$.wrapRefArray(new String[0])).reduce(new FlinkOps$$anonfun$foldGroup$2(alg));
        Meta$ meta$5 = Meta$.MODULE$;
        Meta$Projections$ meta$Projections$4 = Meta$Projections$.MODULE$;
        Meta$ meta$6 = Meta$.MODULE$;
        TypeTags universe3 = scala.reflect.runtime.package$.MODULE$.universe();
        // Third construction of the same Group type: the Meta used to wrap the reduced dataset.
        return flinkDataSet$.wrap(reduce, meta$5.apply(meta$Projections$4.ttagFor(meta$6.apply(universe3.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator(meta2, meta3) { // from class: org.emmalanguage.api.flink.FlinkOps$$typecreator12$1
            private final Meta evidence$9$1;
            private final Meta evidence$10$1;

            // Same Group type reference as above (key type first, value type second).
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe4 = mirror.universe();
                return universe4.internal().reificationSupport().TypeRef(universe4.internal().reificationSupport().ThisType(mirror.staticPackage("org.emmalanguage.api").asModule().moduleClass()), mirror.staticClass("org.emmalanguage.api.Group"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{Meta$Projections$.MODULE$.ttagFor(this.evidence$10$1).in(mirror).tpe(), Meta$Projections$.MODULE$.ttagFor(this.evidence$9$1).in(mirror).tpe()})));
            }

            {
                this.evidence$9$1 = meta2;
                this.evidence$10$1 = meta3;
            }
        })))));
    }

    /**
     * Registers a write of the dataset to a freshly named temporary location.
     *
     * <p>The next name is taken from {@code tempNames()}, and the dataset is written to
     * {@code tempPath(name)} with a {@code TypeSerializerOutputFormat} in overwrite mode.
     *
     * @param dataSet dataset to write
     * @param meta type evidence used to derive the Flink type information
     * @param executionEnvironment environment whose config parameterises the output format
     * @return the temporary name that was used
     */
    private <A> String sink(DataSet<A> dataSet, Meta<A> meta, ExecutionEnvironment executionEnvironment) {
        TypeInformation elementInfo = FlinkDataSet$.MODULE$.typeInfoForType(meta);
        // Consumes one name from the shared iterator.
        String tempName = (String) tempNames().next();

        TypeSerializerOutputFormat outputFormat = new TypeSerializerOutputFormat();
        outputFormat.setInputType(elementInfo, executionEnvironment.getConfig());
        outputFormat.setSerializer(elementInfo.createSerializer(executionEnvironment.getConfig()));

        String targetPath = tempPath(tempName);
        dataSet.write(outputFormat, targetPath, FileSystem.WriteMode.OVERWRITE);
        return tempName;
    }

    /**
     * Creates a dataset that reads the temporary data stored under the given name.
     *
     * <p>The name is resolved with {@code tempPath} and read through a
     * {@code TypeSerializerInputFormat} built from the element type information.
     *
     * @param str temporary name, as returned by {@code sink}
     * @param meta type evidence for the elements being read
     * @param executionEnvironment environment used to create the dataset
     * @return the dataset reading from the resolved path
     */
    private <A> DataSet<A> source(String str, Meta<A> meta, ExecutionEnvironment executionEnvironment) {
        String location = tempPath(str);

        TypeInformation formatInfo = FlinkDataSet$.MODULE$.typeInfoForType(meta);
        TypeSerializerInputFormat inputFormat = new TypeSerializerInputFormat(formatInfo);
        inputFormat.setFilePath(location);

        // Type information is requested a second time here, exactly as the input format's was above.
        return executionEnvironment.readFile(
                inputFormat,
                location,
                Meta$Projections$.MODULE$.ctagFor(meta),
                FlinkDataSet$.MODULE$.typeInfoForType(meta));
    }

    /**
     * Accessor for the base URI of temporary datasets.
     *
     * @return the URI stored at construction time
     */
    private URI tempBase() {
        return tempBase;
    }

    /**
     * Accessor for the iterator of temporary dataset names.
     *
     * <p>The same iterator instance is returned on every call, so calling {@code next()}
     * on the result consumes a name for all users of this module.
     *
     * @return the name iterator created at construction time
     */
    public Iterator<String> tempNames() {
        return tempNames;
    }

    /**
     * Resolves a temporary dataset name against the temp base and renders it as a URL string.
     *
     * @param str name (or relative reference) to resolve against the base URI
     * @return the string form of the resolved location after conversion to a URL
     */
    public String tempPath(String str) {
        URI resolved = tempBase().resolve(str);
        // Conversion goes through java.net.URL, so the resolved URI must be a valid URL.
        return resolved.toURL().toString();
    }

    private FlinkOps$() {
        MODULE$ = this;
        this.tempBase = new URI(System.getProperty("emma.flink.temp-base", "file:///tmp/emma/flink-temp/"));
        this.tempNames = ((IterableLike) scala.package$.MODULE$.Stream().iterate(BoxesRunTime.boxToInteger(0), new FlinkOps$$anonfun$1()).map(new FlinkOps$$anonfun$2(), Stream$.MODULE$.canBuildFrom())).toIterator();
    }
}
