package bio.ferlab.datalake.spark3.loader;

import bio.ferlab.datalake.spark3.elasticsearch.ElasticSearchClient;
import bio.ferlab.datalake.spark3.elasticsearch.EsWriteOptions$;
import java.time.LocalDate;
import org.apache.hadoop.shaded.org.apache.http.HttpResponse;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.sql.package$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.runtime.BoxedUnit;
import scala.util.Try$;

/* compiled from: ElasticsearchLoader.scala */
/* loaded from: input_file:bio/ferlab/datalake/spark3/loader/ElasticsearchLoader$.class */
public final class ElasticsearchLoader$ implements Loader {
    public static ElasticsearchLoader$ MODULE$;
    private final Logger log;

    static {
        new ElasticsearchLoader$();
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public Map<String, String> overwritePartition$default$7() {
        Map<String, String> overwritePartition$default$7;
        overwritePartition$default$7 = overwritePartition$default$7();
        return overwritePartition$default$7;
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public Logger log() {
        return this.log;
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public void bio$ferlab$datalake$spark3$loader$Loader$_setter_$log_$eq(Logger logger) {
        this.log = logger;
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public Dataset<Row> read(String str, String str2, Map<String, String> map, Option<String> option, Option<String> option2, SparkSession sparkSession) {
        return sparkSession.sqlContext().read().format("es").load(str);
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public Dataset<Row> overwritePartition(String str, String str2, String str3, Dataset<Row> dataset, List<String> list, String str4, Map<String, String> map, SparkSession sparkSession) {
        ElasticSearchClient elasticSearchClient = new ElasticSearchClient((String) map.apply(EsWriteOptions$.MODULE$.ES_URL()), map.get(EsWriteOptions$.MODULE$.ES_USERNAME()), map.get(EsWriteOptions$.MODULE$.ES_PASSWORD()));
        Map apply = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.write.operation"), "index")}));
        map.get(EsWriteOptions$.MODULE$.ES_INDEX_TEMPLATE_PATH()).foreach(str5 -> {
            $anonfun$overwritePartition$1(str3, sparkSession, elasticSearchClient, str5);
            return BoxedUnit.UNIT;
        });
        package$.MODULE$.sparkDataFrameFunctions(dataset).saveToEs(new StringBuilder(5).append(str).append("/_doc").toString(), apply);
        publish(str3, str, publish$default$3(), elasticSearchClient);
        return dataset;
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public Dataset<Row> writeOnce(String str, String str2, String str3, Dataset<Row> dataset, List<String> list, String str4, Map<String, String> map, SparkSession sparkSession) {
        throw Predef$.MODULE$.$qmark$qmark$qmark();
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public Dataset<Row> insert(String str, String str2, String str3, Dataset<Row> dataset, List<String> list, String str4, Map<String, String> map, SparkSession sparkSession) {
        throw Predef$.MODULE$.$qmark$qmark$qmark();
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public Dataset<Row> upsert(String str, String str2, String str3, Dataset<Row> dataset, Seq<String> seq, List<String> list, String str4, Map<String, String> map, SparkSession sparkSession) {
        throw Predef$.MODULE$.$qmark$qmark$qmark();
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public Dataset<Row> scd1(String str, String str2, String str3, Dataset<Row> dataset, Seq<String> seq, String str4, String str5, String str6, List<String> list, String str7, Map<String, String> map, SparkSession sparkSession) {
        throw Predef$.MODULE$.$qmark$qmark$qmark();
    }

    @Override // bio.ferlab.datalake.spark3.loader.Loader
    public Dataset<Row> scd2(String str, String str2, String str3, Dataset<Row> dataset, Seq<String> seq, String str4, String str5, String str6, List<String> list, String str7, String str8, String str9, Map<String, String> map, LocalDate localDate, LocalDate localDate2, SparkSession sparkSession) {
        throw Predef$.MODULE$.$qmark$qmark$qmark();
    }

    public void publish(String str, String str2, Option<String> option, ElasticSearchClient elasticSearchClient) {
        Try$.MODULE$.apply(() -> {
            return elasticSearchClient.setAlias(new $colon.colon(str2, Nil$.MODULE$), Nil$.MODULE$, str);
        }).foreach(httpResponse -> {
            $anonfun$publish$2(str2, str, httpResponse);
            return BoxedUnit.UNIT;
        });
        Try$.MODULE$.apply(() -> {
            return elasticSearchClient.setAlias(Nil$.MODULE$, option.toList(), str);
        }).foreach(httpResponse2 -> {
            $anonfun$publish$4(option, str, httpResponse2);
            return BoxedUnit.UNIT;
        });
    }

    public Option<String> publish$default$3() {
        return None$.MODULE$;
    }

    public void setupIndex(String str, String str2, SparkSession sparkSession, ElasticSearchClient elasticSearchClient) {
        Try$.MODULE$.apply(() -> {
            MODULE$.log().info(new StringBuilder(36).append("ElasticSearch 'isRunning' status: [").append(elasticSearchClient.isRunning()).append("]").toString());
            MODULE$.log().info(new StringBuilder(37).append("ElasticSearch 'checkNodes' status: [").append(elasticSearchClient.checkNodeRoles()).append("]").toString());
            HttpResponse deleteIndex = elasticSearchClient.deleteIndex(str);
            MODULE$.log().info(new StringBuilder(20).append("DELETE INDEX[").append(str).append("] : ").append(deleteIndex.getStatusLine().getStatusCode()).append(" : ").append(deleteIndex.getStatusLine().getReasonPhrase()).toString());
        });
        HttpResponse template = elasticSearchClient.setTemplate(str2, sparkSession);
        log().info(new StringBuilder(20).append("SET TEMPLATE[").append(str2).append("] : ").append(template.getStatusLine().getStatusCode()).append(" : ").append(template.getStatusLine().getReasonPhrase()).toString());
    }

    public static final /* synthetic */ void $anonfun$overwritePartition$1(String str, SparkSession sparkSession, ElasticSearchClient elasticSearchClient, String str2) {
        MODULE$.setupIndex(str, str2, sparkSession, elasticSearchClient);
    }

    public static final /* synthetic */ void $anonfun$publish$2(String str, String str2, HttpResponse httpResponse) {
        MODULE$.log().info(new StringBuilder(10).append(str).append(" added to ").append(str2).toString());
    }

    public static final /* synthetic */ void $anonfun$publish$4(Option option, String str, HttpResponse httpResponse) {
        MODULE$.log().info(new StringBuilder(14).append(option.toList().mkString()).append(" removed from ").append(str).toString());
    }

    private ElasticsearchLoader$() {
        MODULE$ = this;
        bio$ferlab$datalake$spark3$loader$Loader$_setter_$log_$eq(LoggerFactory.getLogger(getClass().getCanonicalName()));
    }
}
