package streaming.core.stream;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import scala.Enumeration;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenSeqLike;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.ResizableArray;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import streaming.dsl.ScriptSQLExec$;
import streaming.log.WowLog;
import tech.mlsql.common.utils.log.Logging;
import tech.mlsql.job.MLSQLJobInfo;

/* compiled from: MLSQLStreamManager.scala */
/* loaded from: input_file:streaming/core/stream/MLSQLStreamManager$.class */
public final class MLSQLStreamManager$ implements Logging, WowLog {
    public static MLSQLStreamManager$ MODULE$;
    private final ExecutorService executors;
    private final ConcurrentHashMap<String, MLSQLJobInfo> store;
    private final ConcurrentHashMap<String, ArrayBuffer<MLSQLExternalStreamListener>> _listenerStore;
    private transient Logger tech$mlsql$common$utils$log$Logging$$log_;

    static {
        new MLSQLStreamManager$();
    }

    @Override // streaming.log.WowLog
    public String format(String str, boolean z) {
        String format;
        format = format(str, z);
        return format;
    }

    @Override // streaming.log.WowLog
    public boolean format$default$2() {
        boolean format$default$2;
        format$default$2 = format$default$2();
        return format$default$2;
    }

    @Override // streaming.log.WowLog
    public String wow_format(String str) {
        String wow_format;
        wow_format = wow_format(str);
        return wow_format;
    }

    @Override // streaming.log.WowLog
    public String format_exception(Exception exc) {
        String format_exception;
        format_exception = format_exception(exc);
        return format_exception;
    }

    @Override // streaming.log.WowLog
    public String format_throwable(Throwable th, boolean z) {
        String format_throwable;
        format_throwable = format_throwable(th, z);
        return format_throwable;
    }

    @Override // streaming.log.WowLog
    public boolean format_throwable$default$2() {
        boolean format_throwable$default$2;
        format_throwable$default$2 = format_throwable$default$2();
        return format_throwable$default$2;
    }

    @Override // streaming.log.WowLog
    public String format_cause(Exception exc) {
        String format_cause;
        format_cause = format_cause(exc);
        return format_cause;
    }

    @Override // streaming.log.WowLog
    public void format_full_exception(ArrayBuffer<String> arrayBuffer, Exception exc, boolean z) {
        format_full_exception(arrayBuffer, exc, z);
    }

    @Override // streaming.log.WowLog
    public boolean format_full_exception$default$3() {
        boolean format_full_exception$default$3;
        format_full_exception$default$3 = format_full_exception$default$3();
        return format_full_exception$default$3;
    }

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public Logger tech$mlsql$common$utils$log$Logging$$log_() {
        return this.tech$mlsql$common$utils$log$Logging$$log_;
    }

    public void tech$mlsql$common$utils$log$Logging$$log__$eq(Logger logger) {
        this.tech$mlsql$common$utils$log$Logging$$log_ = logger;
    }

    public ExecutorService executors() {
        return this.executors;
    }

    private ConcurrentHashMap<String, MLSQLJobInfo> store() {
        return this.store;
    }

    private ConcurrentHashMap<String, ArrayBuffer<MLSQLExternalStreamListener>> _listenerStore() {
        return this._listenerStore;
    }

    public ConcurrentHashMap<String, ArrayBuffer<MLSQLExternalStreamListener>> listeners() {
        return _listenerStore();
    }

    public void runEvent(Enumeration.Value value, String str, Function1<MLSQLExternalStreamListener, BoxedUnit> function1) {
        ((IterableLike) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(_listenerStore()).asScala()).foreach(tuple2 -> {
            $anonfun$runEvent$1(value, str, function1, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    public synchronized ArrayBuffer<MLSQLExternalStreamListener> addListener(String str, MLSQLExternalStreamListener mLSQLExternalStreamListener) {
        ArrayBuffer<MLSQLExternalStreamListener> orDefault = _listenerStore().getOrDefault(str, ArrayBuffer$.MODULE$.apply(Nil$.MODULE$));
        orDefault.$plus$eq(mLSQLExternalStreamListener);
        return _listenerStore().put(str, orDefault);
    }

    public void removeListener(String str) {
        ((IterableLike) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(_listenerStore()).asScala()).foreach(tuple2 -> {
            Object obj;
            Some headOption = ((TraversableLike) ((TraversableLike) tuple2._2()).filter(mLSQLExternalStreamListener -> {
                return BoxesRunTime.boxToBoolean($anonfun$removeListener$2(str, mLSQLExternalStreamListener));
            })).headOption();
            if (headOption instanceof Some) {
                obj = ((ArrayBuffer) tuple2._2()).remove(((GenSeqLike) tuple2._2()).indexOf((MLSQLExternalStreamListener) headOption.value()));
            } else {
                if (!None$.MODULE$.equals(headOption)) {
                    throw new MatchError(headOption);
                }
                obj = BoxedUnit.UNIT;
            }
            return obj;
        });
    }

    public MLSQLJobInfo addStore(MLSQLJobInfo mLSQLJobInfo) {
        return store().put(mLSQLJobInfo.groupId(), mLSQLJobInfo);
    }

    public MLSQLJobInfo removeStore(String str) {
        return store().remove(str);
    }

    public Option<MLSQLJobInfo> getJob(String str) {
        return ((MapLike) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(store()).asScala()).get(str);
    }

    public void start(SparkSession sparkSession) {
        logInfo(() -> {
            return "Start streaming job monitor....";
        });
        sparkSession.streams().addListener(new MLSQLStreamingQueryListener());
    }

    public void close() {
        store().clear();
    }

    public boolean isStream() {
        return ScriptSQLExec$.MODULE$.contextGetOrForTest().execListener().env().contains("streamName");
    }

    public static final /* synthetic */ boolean $anonfun$runEvent$2(Enumeration.Value value, String str, MLSQLExternalStreamListener mLSQLExternalStreamListener) {
        Enumeration.Value eventName = mLSQLExternalStreamListener.item().eventName();
        if (eventName != null ? eventName.equals(value) : value == null) {
            String streamName = mLSQLExternalStreamListener.item().streamName();
            if (streamName != null ? streamName.equals(str) : str == null) {
                return true;
            }
        }
        return false;
    }

    public static final /* synthetic */ void $anonfun$runEvent$1(Enumeration.Value value, String str, Function1 function1, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        ((ResizableArray) ((ArrayBuffer) tuple2._2()).filter(mLSQLExternalStreamListener -> {
            return BoxesRunTime.boxToBoolean($anonfun$runEvent$2(value, str, mLSQLExternalStreamListener));
        })).foreach(mLSQLExternalStreamListener2 -> {
            function1.apply(mLSQLExternalStreamListener2);
            return BoxedUnit.UNIT;
        });
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ boolean $anonfun$removeListener$2(String str, MLSQLExternalStreamListener mLSQLExternalStreamListener) {
        String uuid = mLSQLExternalStreamListener.item().uuid();
        return uuid != null ? uuid.equals(str) : str == null;
    }

    private MLSQLStreamManager$() {
        MODULE$ = this;
        Logging.$init$(this);
        WowLog.$init$(this);
        this.executors = Executors.newFixedThreadPool(30);
        this.store = new ConcurrentHashMap<>();
        this._listenerStore = new ConcurrentHashMap<>();
    }
}
