package streaming.core.stream;

import java.util.Objects;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import streaming.log.WowLog;
import tech.mlsql.common.utils.log.Logging;
import tech.mlsql.job.JobManager$;
import tech.mlsql.job.MLSQLJobInfo;
import tech.mlsql.job.MLSQLJobType$;

/* compiled from: MLSQLStreamManager.scala */
@ScalaSignature(bytes = "\u0006\u0001I4Aa\u0002\u0005\u0001\u001f!)\u0001\u0007\u0001C\u0001c!)A\u0007\u0001C\u0001k!)1\n\u0001C\u0001\u0019\")\u0001\f\u0001C!3\")a\r\u0001C!O\")A\u000e\u0001C![\nYR\nT*R\u0019N#(/Z1nS:<\u0017+^3ss2K7\u000f^3oKJT!!\u0003\u0006\u0002\rM$(/Z1n\u0015\tYA\"\u0001\u0003d_J,'\"A\u0007\u0002\u0013M$(/Z1nS:<7\u0001A\n\u0005\u0001Ai2\u0006\u0005\u0002\u001275\t!C\u0003\u0002\u000e')\u0011A#F\u0001\u0004gFd'B\u0001\f\u0018\u0003\u0015\u0019\b/\u0019:l\u0015\tA\u0012$\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u00025\u0005\u0019qN]4\n\u0005q\u0011\"AF*ue\u0016\fW.\u001b8h#V,'/\u001f'jgR,g.\u001a:\u0011\u0005yIS\"A\u0010\u000b\u0005\u0001\n\u0013a\u00017pO*\u0011!eI\u0001\u0006kRLGn\u001d\u0006\u0003I\u0015\naaY8n[>t'B\u0001\u0014(\u0003\u0015iGn]9m\u0015\u0005A\u0013\u0001\u0002;fG\"L!AK\u0010\u0003\u000f1{wmZ5oOB\u0011AFL\u0007\u0002[)\u0011\u0001\u0005D\u0005\u0003_5\u0012aaV8x\u0019><\u0017A\u0002\u001fj]&$h\bF\u00013!\t\u0019\u0004!D\u0001\t\u0003\u0011\u0019\u0018P\\2\u0015\u0007Yb\u0014\n\u0005\u00028u5\t\u0001HC\u0001:\u0003\u0015\u00198-\u00197b\u0013\tY\u0004HA\u0002B]fDQ!\u0010\u0002A\u0002y\nAA\\1nKB\u0011qH\u0012\b\u0003\u0001\u0012\u0003\"!\u0011\u001d\u000e\u0003\tS!a\u0011\b\u0002\rq\u0012xn\u001c;?\u0013\t)\u0005(\u0001\u0004Qe\u0016$WMZ\u0005\u0003\u000f\"\u0013aa\u0015;sS:<'BA#9\u0011\u0015Q%\u00011\u0001?\u0003\tIG-\u0001\u0004hKRTuN\u0019\u000b\u0003\u001bZ\u00032a\u000e(Q\u0013\ty\u0005H\u0001\u0004PaRLwN\u001c\t\u0003#Rk\u0011A\u0015\u0006\u0003'\u0016\n1A[8c\u0013\t)&K\u0001\u0007N\u0019N\u000bFJS8c\u0013:4w\u000eC\u0003X\u0007\u0001\u0007a(A\u0004he>,\b/\u00133\u0002\u001d=t\u0017+^3ssN#\u0018M\u001d;fIR\u0011!,\u0018\t\u0003omK!\u0001\u0018\u001d\u0003\tUs\u0017\u000e\u001e\u0005\u0006=\u0012\u0001\raX\u0001\u0006KZ,g\u000e\u001e\t\u0003A\u000et!!E1\n\u0005\t\u0014\u0012AF*ue\u0016\fW.\u001b8h#V,'/\u001f'jgR,g.\u001a:\n\u0005\u0011,'!E)vKJL8\u000b^1si\u0016$WI^3oi*\u0011!ME\u0001\u0010_:\fV/\u001a:z!J|wM]3tgR\u0011!\f
\u001b\u0005\u0006=\u0016\u0001\r!\u001b\t\u0003A*L!a[3\u0003%E+XM]=Qe><'/Z:t\u000bZ,g\u000e^\u0001\u0012_:\fV/\u001a:z)\u0016\u0014X.\u001b8bi\u0016$GC\u0001.o\u0011\u0015qf\u00011\u0001p!\t\u0001\u0007/\u0003\u0002rK\n!\u0012+^3ssR+'/\\5oCR,G-\u0012<f]R\u0004")
/* loaded from: input_file:streaming/core/stream/MLSQLStreamingQueryListener.class */
/**
 * Spark streaming query listener that keeps {@code JobManager} aligned with the streaming
 * queries it is notified about, and relays started / progress / terminated events to the
 * external listeners registered in {@code MLSQLStreamManager}.
 *
 * <p>The class mixes in the {@code Logging} and {@code WowLog} traits; the trait methods are
 * re-exposed here as thin forwarders.
 */
public class MLSQLStreamingQueryListener extends StreamingQueryListener implements Logging, WowLog {
    /** Backing field for the logger slot that the {@code Logging} trait reads and writes. */
    private transient Logger tech$mlsql$common$utils$log$Logging$$log_;

    /** Wires up both mixed-in traits. */
    public MLSQLStreamingQueryListener() {
        Logging.$init$(this);
        WowLog.$init$(this);
    }

    // ---------------------------------------------------------------------------------------
    // WowLog forwarders.
    // Each one must delegate to the trait's default implementation through WowLog.super;
    // an unqualified call would resolve to this override and recurse until the stack overflows.
    // ---------------------------------------------------------------------------------------

    /** Forwards to the {@code WowLog} default implementation of {@code format}. */
    @Override // streaming.log.WowLog
    public String format(String str, boolean z) {
        return WowLog.super.format(str, z);
    }

    /** Forwards to the {@code WowLog} default value for the second argument of {@code format}. */
    @Override // streaming.log.WowLog
    public boolean format$default$2() {
        return WowLog.super.format$default$2();
    }

    /** Forwards to the {@code WowLog} default implementation of {@code wow_format}. */
    @Override // streaming.log.WowLog
    public String wow_format(String str) {
        return WowLog.super.wow_format(str);
    }

    /** Forwards to the {@code WowLog} default implementation of {@code format_exception}. */
    @Override // streaming.log.WowLog
    public String format_exception(Exception exc) {
        return WowLog.super.format_exception(exc);
    }

    /** Forwards to the {@code WowLog} default implementation of {@code format_throwable}. */
    @Override // streaming.log.WowLog
    public String format_throwable(Throwable th, boolean z) {
        return WowLog.super.format_throwable(th, z);
    }

    /** Forwards to the {@code WowLog} default value for the second argument of {@code format_throwable}. */
    @Override // streaming.log.WowLog
    public boolean format_throwable$default$2() {
        return WowLog.super.format_throwable$default$2();
    }

    /** Forwards to the {@code WowLog} default implementation of {@code format_cause}. */
    @Override // streaming.log.WowLog
    public String format_cause(Exception exc) {
        return WowLog.super.format_cause(exc);
    }

    /** Forwards to the {@code WowLog} default implementation of {@code format_full_exception}. */
    @Override // streaming.log.WowLog
    public void format_full_exception(ArrayBuffer<String> arrayBuffer, Exception exc, boolean z) {
        WowLog.super.format_full_exception(arrayBuffer, exc, z);
    }

    /** Forwards to the {@code WowLog} default value for the third argument of {@code format_full_exception}. */
    @Override // streaming.log.WowLog
    public boolean format_full_exception$default$3() {
        return WowLog.super.format_full_exception$default$3();
    }

    // ---------------------------------------------------------------------------------------
    // Logging forwarders: each delegates to the matching static helper on the Logging trait.
    // ---------------------------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    /** Getter for the logger slot used by the {@code Logging} trait. */
    public Logger tech$mlsql$common$utils$log$Logging$$log_() {
        return this.tech$mlsql$common$utils$log$Logging$$log_;
    }

    /** Setter for the logger slot used by the {@code Logging} trait. */
    public void tech$mlsql$common$utils$log$Logging$$log__$eq(Logger logger) {
        this.tech$mlsql$common$utils$log$Logging$$log_ = logger;
    }

    /**
     * Makes sure {@code JobManager} holds an entry for the given stream whose group id equals
     * the given query id.
     *
     * <ul>
     *   <li>If {@code JobManager} has a STREAM job with this name and a matching group id,
     *       nothing is changed.</li>
     *   <li>If the name matches but the group id differs, a copy of the job carrying the new
     *       group id is registered through {@code addJobManually}.</li>
     *   <li>If {@code JobManager} has no such job, the job is looked up in
     *       {@code MLSQLStreamManager} by query id and registered.</li>
     * </ul>
     *
     * @param str  stream name (the listeners pass the query name)
     * @param str2 query id as a string (the listeners pass the query UUID)
     * @return {@code BoxedUnit.UNIT} when nothing had to change, otherwise whatever
     *         {@code JobManager.addJobManually} returns
     * @throws RuntimeException if neither {@code JobManager} nor {@code MLSQLStreamManager}
     *                          knows the stream
     */
    public Object sync(String str, String str2) {
        Option<?> sameNameJob = ((TraversableLike) JobManager$.MODULE$.getJobInfo().filter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$sync$1(str, tuple2));
        })).headOption();

        if (sameNameJob instanceof Some) {
            MLSQLJobInfo registered = (MLSQLJobInfo) ((Tuple2) sameNameJob.get())._2();
            if (Objects.equals(registered.groupId(), str2)) {
                // Already registered under this query id: nothing to do.
                return BoxedUnit.UNIT;
            }
            logInfo(() -> {
                return this.format(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(154).append("\n               |JobManager:").append(registered.jobName()).append("\n               |Spark streams: ").append(str).append("\n               |Action: sync\n               |Reason:: Job is not synced before.\n             ").toString())).stripMargin(), this.format$default$2());
            });
            // Re-register the same job with only the group id (5th field) replaced.
            return JobManager$.MODULE$.addJobManually(registered.copy(registered.copy$default$1(), registered.copy$default$2(), registered.copy$default$3(), registered.copy$default$4(), str2, registered.copy$default$6(), registered.copy$default$7(), registered.copy$default$8()));
        }
        if (!None$.MODULE$.equals(sameNameJob)) {
            throw new MatchError(sameNameJob);
        }

        // Unknown to JobManager: fall back to the stream manager's own record.
        Option<?> managedJob = MLSQLStreamManager$.MODULE$.getJob(str2);
        if (managedJob instanceof Some) {
            MLSQLJobInfo mLSQLJobInfo = (MLSQLJobInfo) managedJob.get();
            logInfo(() -> {
                return this.format(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(188).append("\n                 |JobManager:").append(mLSQLJobInfo.jobName()).append("\n                 |Spark streams: ").append(str).append("\n                 |Action: sync\n                 |Reason:: Job is not in JobManager but in MLSQLStreamManager.\n             ").toString())).stripMargin(), this.format$default$2());
            });
            return JobManager$.MODULE$.addJobManually(mLSQLJobInfo);
        }
        if (None$.MODULE$.equals(managedJob)) {
            throw new RuntimeException(new StringBuilder(26).append("MLSQL have unsync stream: ").append(str).toString());
        }
        throw new MatchError(managedJob);
    }

    /**
     * Looks up a job in {@code JobManager} by group id.
     *
     * @param str group id to search for
     * @return the first job whose group id equals {@code str}, or an empty option
     */
    public Option<MLSQLJobInfo> getJob(String str) {
        return ((TraversableLike) ((TraversableLike) JobManager$.MODULE$.getJobInfo().filter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$getJob$1(str, tuple2));
        })).map(tuple22 -> {
            return (MLSQLJobInfo) tuple22._2();
        }, Iterable$.MODULE$.canBuildFrom())).headOption();
    }

    /**
     * Syncs the started query into {@code JobManager} and fires the "started" event for it.
     * Logs an error if the job still cannot be found after syncing.
     *
     * @param queryStartedEvent event supplied by Spark
     * @throws RuntimeException propagated from {@link #sync(String, String)}
     */
    public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
        String queryId = queryStartedEvent.id().toString();
        sync(queryStartedEvent.name(), queryId);

        Option<MLSQLJobInfo> job = getJob(queryId);
        if (job instanceof Some) {
            MLSQLJobInfo mLSQLJobInfo = job.get();
            MLSQLStreamManager$.MODULE$.runEvent(MLSQLStreamEventName$.MODULE$.started(), mLSQLJobInfo.jobName(), mLSQLExternalStreamListener -> {
                $anonfun$onQueryStarted$1(mLSQLJobInfo, mLSQLExternalStreamListener);
                return BoxedUnit.UNIT;
            });
            return;
        }
        if (!None$.MODULE$.equals(job)) {
            throw new MatchError(job);
        }
        logError(() -> {
            return this.format(new StringBuilder(64).append("Stream job [").append(queryId).append("] is started. But we can not found it in JobManager.").toString(), this.format$default$2());
        });
    }

    /**
     * Syncs the reporting query into {@code JobManager} and fires the "progress" event,
     * passing the progress JSON to the listeners. Logs an error if the job cannot be found.
     *
     * @param queryProgressEvent event supplied by Spark
     * @throws RuntimeException propagated from {@link #sync(String, String)}
     */
    public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
        String uuid = queryProgressEvent.progress().id().toString();
        sync(queryProgressEvent.progress().name(), uuid);

        Option<MLSQLJobInfo> job = getJob(uuid);
        if (job instanceof Some) {
            MLSQLJobInfo mLSQLJobInfo = job.get();
            MLSQLStreamManager$.MODULE$.runEvent(MLSQLStreamEventName$.MODULE$.progress(), mLSQLJobInfo.jobName(), mLSQLExternalStreamListener -> {
                $anonfun$onQueryProgress$1(mLSQLJobInfo, queryProgressEvent, mLSQLExternalStreamListener);
                return BoxedUnit.UNIT;
            });
            return;
        }
        if (!None$.MODULE$.equals(job)) {
            throw new MatchError(job);
        }
        logError(() -> {
            return this.format(new StringBuilder(64).append("Stream job [").append(uuid).append("] is running. But we can not found it in JobManager.").toString(), this.format$default$2());
        });
    }

    /**
     * Fires the "terminated" event for the query, removes the owner's listeners attached to
     * that stream, drops the query's store from {@code MLSQLStreamManager}, and finally
     * removes the STREAM job with this group id from {@code JobManager}.
     *
     * <p>When the job is not found in {@code JobManager} an error is logged; the store removal
     * and the job-removal lookup still run.
     *
     * @param queryTerminatedEvent event supplied by Spark
     */
    public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
        String uuid = queryTerminatedEvent.id().toString();
        // UUIDs of listeners to drop; collected first, removed after the scan below.
        ArrayBuffer<String> listenerUuids = new ArrayBuffer<>();

        Option<MLSQLJobInfo> job = getJob(uuid);
        if (job instanceof Some) {
            MLSQLJobInfo mLSQLJobInfo = job.get();
            MLSQLStreamManager$.MODULE$.runEvent(MLSQLStreamEventName$.MODULE$.terminated(), mLSQLJobInfo.jobName(), mLSQLExternalStreamListener -> {
                $anonfun$onQueryTerminated$1(mLSQLJobInfo, mLSQLExternalStreamListener);
                return BoxedUnit.UNIT;
            });
            // Evaluated only for its side effect of filling listenerUuids; the mapped result is discarded.
            ((MapLike) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(MLSQLStreamManager$.MODULE$.listeners()).asScala()).get(mLSQLJobInfo.owner()).map(arrayBuffer -> {
                return (ArrayBuffer) ((TraversableLike) arrayBuffer.filter(mLSQLExternalStreamListener2 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$onQueryTerminated$3(mLSQLJobInfo, mLSQLExternalStreamListener2));
                })).map(mLSQLExternalStreamListener3 -> {
                    return listenerUuids.$plus$eq(mLSQLExternalStreamListener3.item().uuid());
                }, ArrayBuffer$.MODULE$.canBuildFrom());
            });
        } else {
            if (!None$.MODULE$.equals(job)) {
                throw new MatchError(job);
            }
            logError(() -> {
                return this.format(new StringBuilder(67).append("Stream job [").append(uuid).append("] is terminated. But we can not found it in JobManager.").toString(), this.format$default$2());
            });
        }

        listenerUuids.foreach(str -> {
            $anonfun$onQueryTerminated$6(str);
            return BoxedUnit.UNIT;
        });
        MLSQLStreamManager$.MODULE$.removeStore(uuid);

        Option<?> streamJob = ((TraversableLike) JobManager$.MODULE$.getJobInfo().filter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$onQueryTerminated$7(uuid, tuple2));
        })).headOption();
        if (streamJob instanceof Some) {
            // The map key (first tuple element) is what removeJobManually expects.
            JobManager$.MODULE$.removeJobManually((String) ((Tuple2) streamJob.get())._1());
        } else if (!None$.MODULE$.equals(streamJob)) {
            throw new MatchError(streamJob);
        }
    }

    /** Predicate: the entry is a STREAM job whose job name equals {@code str}. */
    public static final /* synthetic */ boolean $anonfun$sync$1(String str, Tuple2 tuple2) {
        MLSQLJobInfo info = (MLSQLJobInfo) tuple2._2();
        return Objects.equals(info.jobType(), MLSQLJobType$.MODULE$.STREAM())
                && Objects.equals(((MLSQLJobInfo) tuple2._2()).jobName(), str);
    }

    /** Predicate: the entry's group id equals {@code str}. */
    public static final /* synthetic */ boolean $anonfun$getJob$1(String str, Tuple2 tuple2) {
        return Objects.equals(((MLSQLJobInfo) tuple2._2()).groupId(), str);
    }

    /** Sends the "started" payload: the stream name and an empty JSON object. */
    public static final /* synthetic */ void $anonfun$onQueryStarted$1(MLSQLJobInfo mLSQLJobInfo, MLSQLExternalStreamListener mLSQLExternalStreamListener) {
        mLSQLExternalStreamListener.send((Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("streamName"), mLSQLJobInfo.jobName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("jsonContent"), "{}")})));
    }

    /** Sends the "progress" payload: the stream name and the progress JSON from the event. */
    public static final /* synthetic */ void $anonfun$onQueryProgress$1(MLSQLJobInfo mLSQLJobInfo, StreamingQueryListener.QueryProgressEvent queryProgressEvent, MLSQLExternalStreamListener mLSQLExternalStreamListener) {
        mLSQLExternalStreamListener.send((Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("streamName"), mLSQLJobInfo.jobName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("jsonContent"), queryProgressEvent.progress().json())})));
    }

    /** Sends the "terminated" payload: the stream name and an empty JSON object. */
    public static final /* synthetic */ void $anonfun$onQueryTerminated$1(MLSQLJobInfo mLSQLJobInfo, MLSQLExternalStreamListener mLSQLExternalStreamListener) {
        mLSQLExternalStreamListener.send((Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("streamName"), mLSQLJobInfo.jobName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("jsonContent"), "{}")})));
    }

    /** Predicate: the listener is attached to the stream named like the given job. */
    public static final /* synthetic */ boolean $anonfun$onQueryTerminated$3(MLSQLJobInfo mLSQLJobInfo, MLSQLExternalStreamListener mLSQLExternalStreamListener) {
        return Objects.equals(mLSQLExternalStreamListener.item().streamName(), mLSQLJobInfo.jobName());
    }

    /** Removes the listener identified by {@code str} from {@code MLSQLStreamManager}. */
    public static final /* synthetic */ void $anonfun$onQueryTerminated$6(String str) {
        MLSQLStreamManager$.MODULE$.removeListener(str);
    }

    /** Predicate: the entry is a STREAM job whose group id equals {@code str}. */
    public static final /* synthetic */ boolean $anonfun$onQueryTerminated$7(String str, Tuple2 tuple2) {
        MLSQLJobInfo info = (MLSQLJobInfo) tuple2._2();
        return Objects.equals(info.jobType(), MLSQLJobType$.MODULE$.STREAM())
                && Objects.equals(((MLSQLJobInfo) tuple2._2()).groupId(), str);
    }
}
