package org.mixql.engine.core;

import com.github.nscala_time.time.Imports$;
import com.github.nscala_time.time.RichReadableInstant$;
import com.github.nscala_time.time.RichReadableInterval$;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.joda.time.DateTime;
import org.mixql.engine.core.logger.ModuleLogger;
import org.mixql.remote.RemoteMessageConverter;
import org.mixql.remote.messages.Message;
import org.mixql.remote.messages.broker.IBrokerSender;
import org.mixql.remote.messages.broker.PlatformPongHeartBeat;
import org.mixql.remote.messages.client.Execute;
import org.mixql.remote.messages.client.ExecuteFunction;
import org.mixql.remote.messages.client.GetDefinedFunctions;
import org.mixql.remote.messages.client.IModuleReceiver;
import org.mixql.remote.messages.client.IWorkerReceiver;
import org.mixql.remote.messages.client.ShutDown;
import org.mixql.remote.messages.module.ExecuteResult;
import org.mixql.remote.messages.module.ExecuteResultFailed;
import org.mixql.remote.messages.module.ExecutedFunctionResult;
import org.mixql.remote.messages.module.ExecutedFunctionResultFailed;
import org.mixql.remote.messages.module.GetDefinedFunctionsError;
import org.mixql.remote.messages.module.IModuleSendToClient;
import org.mixql.remote.messages.module.toBroker.EngineFailed;
import org.mixql.remote.messages.module.toBroker.EngineIsReady;
import org.mixql.remote.messages.module.toBroker.EnginePingHeartBeat;
import org.mixql.remote.messages.module.toBroker.IBrokerReceiverFromModule;
import org.mixql.remote.messages.module.worker.IWorkerSendToClient;
import org.mixql.remote.messages.module.worker.SendMsgToPlatform;
import org.mixql.remote.messages.module.worker.WorkerFinished;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import scala.Function0;
import scala.Function2;
import scala.Function3;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.StringOps$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.util.Failure;
import scala.util.Random$;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;
import scala.util.matching.Regex;

/* compiled from: Module.scala */
@ScalaSignature(bytes = "\u0006\u0005\t\u0005g\u0001\u0002\u001b6\u0001yB\u0001\"\u0012\u0001\u0003\u0002\u0003\u0006IA\u0012\u0005\t\u0015\u0002\u0011\t\u0011)A\u0005\u0017\"Aa\u000b\u0001B\u0001B\u0003%1\n\u0003\u0005X\u0001\t\u0005\t\u0015!\u0003Y\u0011!Y\u0006A!A!\u0002\u0017a\u0006\"B1\u0001\t\u0003\u0011\u0007b\u00026\u0001\u0005\u0004%\ta\u001b\u0005\u0007k\u0002\u0001\u000b\u0011\u00027\t\u000fY\u0004\u0001\u0019!C\u0001o\"I\u0011Q\u0001\u0001A\u0002\u0013\u0005\u0011q\u0001\u0005\b\u0003'\u0001\u0001\u0015)\u0003y\u0011%\t)\u0002\u0001a\u0001\n\u0007\t9\u0002C\u0005\u0002 \u0001\u0001\r\u0011\"\u0001\u0002\"!A\u0011Q\u0005\u0001!B\u0013\tI\u0002C\u0005\u0002(\u0001\u0001\r\u0011\"\u0001\u0002*!I\u0011\u0011\u0007\u0001A\u0002\u0013\u0005\u00111\u0007\u0005\t\u0003o\u0001\u0001\u0015)\u0003\u0002,!I\u0011\u0011\b\u0001A\u0002\u0013\u0005\u0011\u0011\u0006\u0005\n\u0003w\u0001\u0001\u0019!C\u0001\u0003{A\u0001\"!\u0011\u0001A\u0003&\u00111\u0006\u0005\n\u0003\u0007\u0002!\u0019!C\u0001\u0003\u000bB\u0001\"!\u0014\u0001A\u0003%\u0011q\t\u0005\n\u0003\u001f\u0002!\u0019!C\u0001\u0003\u000bB\u0001\"!\u0015\u0001A\u0003%\u0011q\t\u0005\n\u0003'\u0002!\u0019!C\u0005\u0003\u000bB\u0001\"!\u0016\u0001A\u0003%\u0011q\t\u0005\n\u0003/\u0002\u0001\u0019!C\u0005\u00033B\u0011\"a\"\u0001\u0001\u0004%I!!#\t\u0011\u00055\u0005\u0001)Q\u0005\u00037B\u0011\"a$\u0001\u0005\u0004%I!!%\t\u000f\u0005M\u0005\u0001)A\u00051\"I\u0011Q\u0013\u0001A\u0002\u0013%\u0011\u0011\u0013\u0005\n\u0003/\u0003\u0001\u0019!C\u0005\u00033Cq!!(\u0001A\u0003&\u0001\fC\u0004\u0002 \u0002!\t!!)\t\u000f\u0005\r\u0006\u0001\"\u0003\u0002&\"9\u00111\u0018\u0001\u0005\n\u0005u\u0006bBAg\u0001\u0011\u0005\u0011q\u001a\u0005\b\u0003C\u0004A\u0011AAr\u0011\u001d\ti\u000f\u0001C\u0001\u0003_Dq!!?\u0001\t\u0003\tY\u0010C\u0004\u00034\u0001!\tA!\u000e\t\u000f\t-\u0003\u0001\"\u0001\u0003N!9!1\f\u0001\u0005\u0002\tu\u0003\"\u0003B1\u0001\t\u0007I\u0011\u0001B2\u0011!\u0011)\b\u0001Q\u0001\n\t\u0015\u0004\"\u0003B<\u0001\t\u0007I\u0011\u0001B=\u0011!\u00119\t\u0001Q\u0001\n\tm\u0004b\u0002BE\u0001\u0011\u0005!1\u0012\u0005\b\u0005\u001b\u0003A\u0011AAQ\u0011\u001d\u0011y\t\u0001C\u0001\u0005#\u0013a!T8ek2,'B\u0001\u001c8\u0003\u0011\u0019wN]3\u000b\u0005aJ\u0014AB3oO&tWM\u0003\u0002;w\u0005)Q.\u001b=rY*\tA(A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001\u007fA\u0011\u0001iQ\u0007\u0002\u0003*\t!)A\u0003tG\u0006d\u0017-\u0003\u0002E\u0003\n1\u0011I\\=SK\u001a\f\u0001\"\u001a=fGV$xN\u001d\t\u0003\u000f\"k\u0011!N\u0005\u0003\u0013V\u0012q\"S'pIVdW-\u0012=fGV$xN]\u0001\tS\u0012,g\u000e^5usB\u0011Aj\u0015\b\u0003\u001bF\u0003\"AT!\u000e\u0003=S!\u0001U\u001f\u0002\rq\u0012xn\u001c;?\u0013\t\u0011\u0016)\u0001\u0004Qe\u0016$WMZ\u0005\u0003)V\u0013aa\u0015;sS:<'B\u0001*B\u0003\u0011Awn\u001d;\u0002\tA|'\u000f\u001e\t\u0003\u0001fK!AW!\u0003\u0007%sG/\u0001\u0004m_\u001e<WM\u001d\t\u0003;~k\u0011A\u0018\u0006\u00037VJ!\u0001\u00190\u0003\u00195{G-\u001e7f\u0019><w-\u001a:\u0002\rqJg.\u001b;?)\u0015\u0019gm\u001a5j)\t!W\r\u0005\u0002H\u0001!)1L\u0002a\u00029\")QI\u0002a\u0001\r\")!J\u0002a\u0001\u0017\")aK\u0002a\u0001\u0017\")qK\u0002a\u00011\u000611m\u001c8gS\u001e,\u0012\u0001\u001c\t\u0003[Nl\u0011A\u001c\u0006\u0003U>T!\u0001]9\u0002\u0011QL\b/Z:bM\u0016T\u0011A]\u0001\u0004G>l\u0017B\u0001;o\u0005\u0019\u0019uN\u001c4jO\u000691m\u001c8gS\u001e\u0004\u0013aA2uqV\t\u0001\u0010\u0005\u0002z\u007f:\u0011!0`\u0007\u0002w*\u0011ApO\u0001\u0007u\u0016\u0014x.\\9\n\u0005y\\\u0018a\u0001.N#&!\u0011\u0011AA\u0002\u0005\u001d\u0019uN\u001c;fqRT!A`>\u0002\u000f\r$\bp\u0018\u0013fcR!\u0011\u0011BA\b!\r\u0001\u00151B\u0005\u0004\u0003\u001b\t%\u0001B+oSRD\u0001\"!\u0005\u000b\u0003\u0003\u0005\r\u0001_\u0001\u0004q\u0012\n\u0014\u0001B2uq\u0002\naa]3sm\u0016\u0014XCAA\r!\rI\u00181D\u0005\u0005\u0003;\t\u0019A\u0001\u0004T_\u000e\\W\r^\u0001\u000bg\u0016\u0014h/\u001a:`I\u0015\fH\u0003BA\u0005\u0003GA\u0011\"!\u0005\u000e\u0003\u0003\u0005\r!!\u0007\u0002\u000fM,'O^3sA\u00051\u0001o\u001c7mKJ,\"!a\u000b\u0011\u0007e\fi#\u0003\u0003\u00020\u0005\r!A\u0002)pY2,'/\u0001\u0006q_2dWM]0%KF$B!!\u0003\u00026!I\u0011\u0011\u0003\t\u0002\u0002\u0003\u0007\u00111F\u0001\ba>dG.\u001a:!\u000319xN]6feB{G\u000e\\3s\u0003A9xN]6feB{G\u000e\\3s?\u0012*\u0017\u000f\u0006\u0003\u0002\n\u0005}\u0002\"CA\t'\u0005\u0005\t\u0019AA\u0016\u000359xN]6feB{G\u000e\\3sA\u0005i\u0001o\u001c7mKJ$\u0016.\\3pkR,\"!a\u0012\u0011\u0007\u0001\u000bI%C\u0002\u0002L\u0005\u0013A\u0001T8oO\u0006q\u0001o\u001c7mKJ$\u0016.\\3pkR\u0004\u0013aE<pe.,'\u000fU8mY\u0016\u0014H+[7f_V$\u0018\u0001F<pe.,'\u000fU8mY\u0016\u0014H+[7f_V$\b%A\tiK\u0006\u0014HOQ3bi&sG/\u001a:wC2\f!\u0003[3beR\u0014U-\u0019;J]R,'O^1mA\u0005a\u0001O]8dKN\u001c8\u000b^1siV\u0011\u00111\f\t\u0005\u0003;\nyH\u0004\u0003\u0002`\u0005ed\u0002BA1\u0003grA!a\u0019\u0002n9!\u0011QMA5\u001d\rq\u0015qM\u0005\u0002e&\u0019\u00111N9\u0002\r\u001dLG\u000f[;c\u0013\u0011\ty'!\u001d\u0002\u00179\u001c8-\u00197b?RLW.\u001a\u0006\u0004\u0003W\n\u0018\u0002BA;\u0003o\nA\u0001^5nK*!\u0011qNA9\u0013\u0011\tY(! \u0002\u000f%k\u0007o\u001c:ug*!\u0011QOA<\u0013\u0011\t\t)a!\u0003\u0011\u0011\u000bG/\u001a+j[\u0016LA!!\"\u0002~\tYA+\u001f9f\u00136\u0004xN\u001d;t\u0003A\u0001(o\\2fgN\u001cF/\u0019:u?\u0012*\u0017\u000f\u0006\u0003\u0002\n\u0005-\u0005\"CA\t9\u0005\u0005\t\u0019AA.\u00035\u0001(o\\2fgN\u001cF/\u0019:uA\u0005aA.\u001b<f]\u0016\u001c8/\u00138jiV\t\u0001,A\u0007mSZ,g.Z:t\u0013:LG\u000fI\u0001\tY&4XM\\3tg\u0006aA.\u001b<f]\u0016\u001c8o\u0018\u0013fcR!\u0011\u0011BAN\u0011!\t\t\"IA\u0001\u0002\u0004A\u0016!\u00037jm\u0016tWm]:!\u0003-\u0019H/\u0019:u'\u0016\u0014h/\u001a:\u0015\u0005\u0005%\u0011\u0001\u0007:fC\u000e$xJ\u001c*fG\u0016Lg/\u001a3Ce>\\WM]'tOR!\u0011\u0011BAT\u0011\u001d\tI\u000b\na\u0001\u0003W\u000bq!\\3tg\u0006<W\r\u0005\u0003\u0002.\u0006]VBAAX\u0015\u0011\t\t,a-\u0002\u00115,7o]1hKNT1!!.:\u0003\u0019\u0011X-\\8uK&!\u0011\u0011XAX\u0005\u001diUm]:bO\u0016\f1D]3bGR|eNU3dK&4X\rZ'tO\u001a{'/\u00128hS:,G\u0003BA\u0005\u0003\u007fCq!!+&\u0001\u0004\t\t\r\u0005\u0003\u0002D\u0006%WBAAc\u0015\u0011\t9-a,\u0002\r\rd\u0017.\u001a8u\u0013\u0011\tY-!2\u0003\u001f%ku\u000eZ;mKJ+7-Z5wKJ\f1c]3oI6+7o]1hKR{wk\u001c:lKJ$B!!5\u0002XB\u0019\u0001)a5\n\u0007\u0005U\u0017IA\u0004C_>dW-\u00198\t\u000f\u0005eg\u00051\u0001\u0002\\\u0006\u0019Qn]4\u0011\t\u0005\r\u0017Q\\\u0005\u0005\u0003?\f)MA\bJ/>\u00148.\u001a:SK\u000e,\u0017N^3s\u0003i\u0011X-Y2u\u001f:,\u00050Z2vi\u0016lUm]:bO\u0016\f5/\u001f8d)\u0011\tI!!:\t\u000f\u0005ew\u00051\u0001\u0002hB!\u00111YAu\u0013\u0011\tY/!2\u0003\u000f\u0015CXmY;uK\u0006\u0011#/Z1di>sW\t_3dkR,g)\u001e8di&|g.T3tg\u0006<W-Q:z]\u000e$B!!\u0003\u0002r\"9\u0011\u0011\u001c\u0015A\u0002\u0005M\b\u0003BAb\u0003kLA!a>\u0002F\nyQ\t_3dkR,g)\u001e8di&|g.A\rsK\u0006\u001cGo\u00148SK6|G/Z'fgN\fw-Z!ts:\u001cGCCA\u0005\u0003{\u0014\tA!\u0005\u0003\u001c!1\u0011q`\u0015A\u0002-\u000bQb\u00197jK:$\u0018\t\u001a3sKN\u001c\bb\u0002B\u0002S\u0001\u0007!QA\u0001\fKb,7-\u001e;f\rVt7\r\u0005\u0005A\u0005\u000fY%1BAV\u0013\r\u0011I!\u0011\u0002\n\rVt7\r^5p]J\u00022a\u0012B\u0007\u0013\r\u0011y!\u000e\u0002\u0010!2\fGOZ8s[\u000e{g\u000e^3yi\"9!1C\u0015A\u0002\tU\u0011!C8o'V\u001c7-Z:t!)\u0001%qCAV\u00033Y\u0015\u0011B\u0005\u0004\u00053\t%!\u0003$v]\u000e$\u0018n\u001c84\u0011\u001d\u0011i\"\u000ba\u0001\u0005?\t\u0011b\u001c8GC&dWO]3\u0011\u0015\u0001\u00139B!\t\u0002\u001a-\u000bI\u0001\u0005\u0003\u0003$\t5b\u0002\u0002B\u0013\u0005Sq1A\u0014B\u0014\u0013\u0005\u0011\u0015b\u0001B\u0016\u0003\u00069\u0001/Y2lC\u001e,\u0017\u0002\u0002B\u0018\u0005c\u0011\u0011\u0002\u00165s_^\f'\r\\3\u000b\u0007\t-\u0012)A\ftK:$Wj]4U_Bc\u0017\r\u001e4pe6\u0014%o\\6feR1\u0011\u0011\u001bB\u001c\u0005\u0013Bq!!7+\u0001\u0004\u0011I\u0004\u0005\u0003\u0003<\t\u0015SB\u0001B\u001f\u0015\u0011\u0011yD!\u0011\u0002\u0011Q|'I]8lKJTAAa\u0011\u00020\u00061Qn\u001c3vY\u0016LAAa\u0012\u0003>\tI\u0012J\u0011:pW\u0016\u0014(+Z2fSZ,'O\u0012:p[6{G-\u001e7f\u0011\u0015Y&\u00061\u0001]\u0003=\u0019XM\u001c3Ng\u001e$vn\u00117jK:$HCBAi\u0005\u001f\u0012I\u0006C\u0004\u0002Z.\u0002\rA!\u0015\u0011\t\tM#QK\u0007\u0003\u0005\u0003JAAa\u0016\u0003B\t\u0019\u0012*T8ek2,7+\u001a8e)>\u001cE.[3oi\")1l\u000ba\u00019\u00069\"/Z1e\u001bN<gI]8n'\u0016\u0014h/\u001a:Ce>\\WM\u001d\u000b\u0005\u0003W\u0013y\u0006C\u0003\\Y\u0001\u0007A,\u0001\u0006x_J\\WM]:NCB,\"A!\u001a\u0011\u000f\t\u001d$\u0011O&\u0002\u001a5\u0011!\u0011\u000e\u0006\u0005\u0005W\u0012i'A\u0004nkR\f'\r\\3\u000b\u0007\t=\u0014)\u0001\u0006d_2dWm\u0019;j_:LAAa\u001d\u0003j\t\u0019Q*\u00199\u0002\u0017]|'o[3sg6\u000b\u0007\u000fI\u0001\u0002eV\u0011!1\u0010\b\u0005\u0005{\u0012\u0019)\u0004\u0002\u0003��)\u0019!\u0011Q!\u0002\tU$\u0018\u000e\\\u0005\u0005\u0005\u000b\u0013y(\u0001\u0004SC:$w.\\\u0001\u0003e\u0002\n\u0011dZ3oKJ\fG/Z+okN,GmV8sW\u0016\u00148OT1nKR\t1*A\u0003dY>\u001cX-\u0001\bsk:<\u0016\u000e\u001e5US6,w.\u001e;\u0016\t\tM%\u0011\u0015\u000b\u0005\u0005+\u0013i\f\u0006\u0003\u0003\u0018\nM\u0006#\u0002!\u0003\u001a\nu\u0015b\u0001BN\u0003\n1q\n\u001d;j_:\u0004BAa(\u0003\"2\u0001Aa\u0002BRg\t\u0007!Q\u0015\u0002\u0002)F!!q\u0015BW!\r\u0001%\u0011V\u0005\u0004\u0005W\u000b%a\u0002(pi\"Lgn\u001a\t\u0004\u0001\n=\u0016b\u0001BY\u0003\n\u0019\u0011I\\=\t\u0011\tU6\u0007\"a\u0001\u0005o\u000b\u0011A\u001a\t\u0006\u0001\ne&QT\u0005\u0004\u0005w\u000b%\u0001\u0003\u001fcs:\fW.\u001a \t\u000f\t}6\u00071\u0001\u0002H\u0005IA/[7f_V$Xj\u001d")
/* loaded from: input_file:org/mixql/engine/core/Module.class */
public class Module {
    private final IModuleExecutor executor;
    private final String identity;
    private final String host;
    private final int port;
    private final ModuleLogger logger;
    private final Config config = ConfigFactory.load();
    private ZMQ.Context ctx = null;
    private ZMQ.Socket server = null;
    private ZMQ.Poller poller = null;
    private ZMQ.Poller workerPoller = null;
    private final long pollerTimeout = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
        return this.config().getLong("org.mixql.engine.module.pollerTimeout");
    }).getOrElse(() -> {
        return 100L;
    }));
    private final long workerPollerTimeout = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
        return this.config().getLong("org.mixql.engine.module.workerPollerTimeout");
    }).getOrElse(() -> {
        return 95L;
    }));
    private final long heartBeatInterval = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
        return this.config().getLong("org.mixql.engine.module.heartBeatInterval");
    }).getOrElse(() -> {
        return 16500L;
    }));
    private DateTime processStart = null;
    private final int livenessInit = BoxesRunTime.unboxToInt(Try$.MODULE$.apply(() -> {
        return this.config().getInt("org.mixql.engine.module.liveness");
    }).getOrElse(() -> {
        return 3;
    }));
    private int liveness = livenessInit();
    private final Map<String, ZMQ.Socket> workersMap = (Map) Map$.MODULE$.apply(Nil$.MODULE$);
    private final Random$ r = Random$.MODULE$;

    public Config config() {
        return this.config;
    }

    public ZMQ.Context ctx() {
        return this.ctx;
    }

    public void ctx_$eq(ZMQ.Context context) {
        this.ctx = context;
    }

    public ZMQ.Socket server() {
        return this.server;
    }

    public void server_$eq(ZMQ.Socket socket) {
        this.server = socket;
    }

    public ZMQ.Poller poller() {
        return this.poller;
    }

    public void poller_$eq(ZMQ.Poller poller) {
        this.poller = poller;
    }

    public ZMQ.Poller workerPoller() {
        return this.workerPoller;
    }

    public void workerPoller_$eq(ZMQ.Poller poller) {
        this.workerPoller = poller;
    }

    public long pollerTimeout() {
        return this.pollerTimeout;
    }

    public long workerPollerTimeout() {
        return this.workerPollerTimeout;
    }

    private long heartBeatInterval() {
        return this.heartBeatInterval;
    }

    private DateTime processStart() {
        return this.processStart;
    }

    private void processStart_$eq(DateTime dateTime) {
        this.processStart = dateTime;
    }

    private int livenessInit() {
        return this.livenessInit;
    }

    private int liveness() {
        return this.liveness;
    }

    private void liveness_$eq(int i) {
        this.liveness = i;
    }

    public void startServer() {
        this.logger.logInfo("Starting main client");
        this.logger.logInfo(new StringBuilder(13).append("host of server is ").append(this.host).append(" and port is ").append(Integer.toString(this.port)).toString());
        try {
            try {
                try {
                    ctx_$eq(ZMQ.context(1));
                    server_$eq(ctx().socket(SocketType.DEALER));
                    server().setIdentity(this.identity.getBytes());
                    this.logger.logInfo(new StringBuilder(0).append("connected: ").append(server().connect(new StringBuilder(7).append("tcp://").append(this.host).append(":").append(Integer.toString(this.port)).toString())).toString());
                    this.logger.logInfo("Connection established.");
                    this.logger.logDebug("Setting processStart for timer");
                    processStart_$eq(Imports$.MODULE$.DateTime().now());
                    this.logger.logInfo("Setting poller");
                    poller_$eq(ctx().poller(1));
                    this.logger.logInfo("Setting workers poller");
                    workerPoller_$eq(ctx().poller(14));
                    this.logger.logInfo("Register server's socket pollin in poller");
                    int register = poller().register(server(), 1);
                    this.logger.logInfo("Sending READY message to server's broker");
                    sendMsgToPlatformBroker(new EngineIsReady(Predef$.MODULE$.long2Long(heartBeatInterval()), Predef$.MODULE$.long2Long(pollerTimeout())), this.logger);
                    while (true) {
                        poller().poll(pollerTimeout());
                        int i = -1;
                        if (workerPoller().getSize() != 0) {
                            i = workerPoller().poll(workerPollerTimeout());
                        }
                        if (poller().pollin(register)) {
                            this.logger.logDebug("Setting processStart for timer, as message was received");
                            Message readMsgFromServerBroker = readMsgFromServerBroker(this.logger);
                            if (readMsgFromServerBroker instanceof IBrokerSender) {
                                this.logger.logDebug("got broker's service message");
                                reactOnReceivedBrokerMsg((IBrokerSender) readMsgFromServerBroker);
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                            } else {
                                if (!(readMsgFromServerBroker instanceof IModuleReceiver)) {
                                    throw new MatchError(readMsgFromServerBroker);
                                }
                                reactOnReceivedMsgForEngine((IModuleReceiver) readMsgFromServerBroker);
                                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                            }
                            processStart_$eq(Imports$.MODULE$.DateTime().now());
                            liveness_$eq(livenessInit());
                        } else {
                            long millis$extension = RichReadableInterval$.MODULE$.millis$extension(Imports$.MODULE$.richReadableInterval(RichReadableInstant$.MODULE$.to$extension(Imports$.MODULE$.richReadableInstant(processStart()), Imports$.MODULE$.DateTime().now())));
                            this.logger.logDebug(new StringBuilder(0).append("elapsed: ").append(millis$extension).toString());
                            if (millis$extension >= heartBeatInterval()) {
                                processStart_$eq(Imports$.MODULE$.DateTime().now());
                                this.logger.logDebug(new StringBuilder(0).append("heartbeat work. Sending heart beat. Liveness: ").append(liveness()).toString());
                                sendMsgToPlatformBroker(new EnginePingHeartBeat(), this.logger);
                                liveness_$eq(liveness() - 1);
                                this.logger.logDebug(new StringBuilder(0).append("heartbeat work. After sending heart beat. Liveness: ").append(liveness()).toString());
                            }
                            if (liveness() < 0) {
                                this.logger.logError("heartbeat failure, can't reach server's broker. Shutting down");
                                throw new BrakeException();
                            }
                        }
                        if (i > 0) {
                            RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), workerPoller().getSize()).foreach(obj -> {
                                return $anonfun$startServer$1(this, BoxesRunTime.unboxToInt(obj));
                            });
                        }
                    }
                } catch (BrakeException unused) {
                    this.logger.logDebug("BrakeException");
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    close();
                    this.logger.logInfo("Stopped.");
                }
            } catch (Exception e) {
                this.logger.logError(new StringBuilder(0).append("Error: ").append(e.getMessage()).toString());
                BoxesRunTime.boxToBoolean(sendMsgToPlatformBroker(new EngineFailed(new StringBuilder(0).append(new StringBuilder(32).append("Module ").append(this.identity).append(" to broker: fatal error: ").toString()).append(e.getMessage()).toString()), this.logger));
                close();
                this.logger.logInfo("Stopped.");
            }
        } catch (Throwable th) {
            close();
            throw th;
        }
    }

    private void reactOnReceivedBrokerMsg(Message message) {
        if (!(message instanceof PlatformPongHeartBeat)) {
            throw new MatchError(message);
        }
        this.logger.logDebug("got pong heart beat message from broker server");
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    private void reactOnReceivedMsgForEngine(IModuleReceiver iModuleReceiver) {
        BoxedUnit boxedUnit;
        if (iModuleReceiver instanceof Execute) {
            reactOnExecuteMessageAsync((Execute) iModuleReceiver);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            return;
        }
        if (iModuleReceiver instanceof ShutDown) {
            this.logger.logInfo("Started shutdown");
            try {
                this.executor.reactOnShutDown(this.identity, iModuleReceiver.clientIdentity(), this.logger);
            } catch (Throwable th) {
                this.logger.logWarn(new StringBuilder(43).append("Warning: error while reacting on shutdown: ").append(th.getMessage()).toString());
            }
            throw new BrakeException();
        }
        if (iModuleReceiver instanceof ExecuteFunction) {
            reactOnExecuteFunctionMessageAsync((ExecuteFunction) iModuleReceiver);
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            return;
        }
        if (!(iModuleReceiver instanceof GetDefinedFunctions)) {
            if (!(iModuleReceiver instanceof IWorkerReceiver)) {
                throw new MatchError(iModuleReceiver);
            }
            sendMessageToWorker((IWorkerReceiver) iModuleReceiver);
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            return;
        }
        try {
            sendMsgToClient(this.executor.reactOnGetDefinedFunctions(this.identity, iModuleReceiver.clientIdentity(), this.logger), this.logger);
            boxedUnit = BoxedUnit.UNIT;
        } catch (Throwable th2) {
            sendMsgToClient(new GetDefinedFunctionsError(new StringBuilder(15).append(new StringBuilder(44).append("Module ").append(this.identity).append(" to ").append(iModuleReceiver.clientIdentity()).append(": error while reacting on getting").toString()).append(" functions list").append(th2.getMessage()).toString(), iModuleReceiver.clientIdentity()), this.logger);
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public boolean sendMessageToWorker(IWorkerReceiver iWorkerReceiver) {
        String workerIdentity = iWorkerReceiver.workerIdentity();
        this.logger.logInfo(new StringBuilder(20).append(new StringBuilder(51).append("received message ").append(iWorkerReceiver.type()).append(" from platfrom to workers-future-").append(workerIdentity).append(" ").toString()).append("Sending it to worker").toString());
        return ((ZMQ.Socket) workersMap().apply(workerIdentity)).send(iWorkerReceiver.toByteArray());
    }

    public void reactOnExecuteMessageAsync(Execute execute) {
        reactOnRemoteMessageAsync(execute.clientIdentity(), (str, platformContext) -> {
            this.logger.logInfo(new StringBuilder(39).append("[workers-future-").append(str).append("]: triggering onExecute").toString());
            return this.executor.reactOnExecuteAsync(execute, this.identity, execute.clientIdentity(), this.logger, platformContext);
        }, (message, socket, str2) -> {
            $anonfun$reactOnExecuteMessageAsync$2(execute, message, socket, str2);
            return BoxedUnit.UNIT;
        }, (th, socket2, str3) -> {
            $anonfun$reactOnExecuteMessageAsync$3(this, execute, th, socket2, str3);
            return BoxedUnit.UNIT;
        });
    }

    public void reactOnExecuteFunctionMessageAsync(ExecuteFunction executeFunction) {
        reactOnRemoteMessageAsync(executeFunction.clientIdentity(), (str, platformContext) -> {
            this.logger.logInfo(new StringBuilder(47).append("[workers-future-").append(str).append("]: triggering onExecuteFunction").toString());
            return this.executor.reactOnExecuteFunctionAsync(executeFunction, this.identity, executeFunction.clientIdentity(), this.logger, platformContext);
        }, (message, socket, str2) -> {
            $anonfun$reactOnExecuteFunctionMessageAsync$2(executeFunction, message, socket, str2);
            return BoxedUnit.UNIT;
        }, (th, socket2, str3) -> {
            $anonfun$reactOnExecuteFunctionMessageAsync$3(this, executeFunction, th, socket2, str3);
            return BoxedUnit.UNIT;
        });
    }

    public void reactOnRemoteMessageAsync(String str, Function2<String, PlatformContext, Message> function2, Function3<Message, ZMQ.Socket, String, BoxedUnit> function3, Function3<Throwable, ZMQ.Socket, String, BoxedUnit> function32) {
        String generateUnusedWorkersName = generateUnusedWorkersName();
        this.logger.logInfo(new StringBuilder(16).append("Creating worker ").append(generateUnusedWorkersName).toString());
        this.logger.logInfo(new StringBuilder(0).append("Register module's pair socket pollin in workersPoller for worker ").append(generateUnusedWorkersName).toString());
        ZMQ.Socket socket = ctx().socket(SocketType.PAIR);
        workerPoller().register(socket, 1);
        socket.bind(new StringBuilder(9).append("inproc://").append(generateUnusedWorkersName).toString());
        workersMap().put(generateUnusedWorkersName, socket);
        ObjectRef create = ObjectRef.create((Object) null);
        Future$.MODULE$.apply(() -> {
            this.logger.logInfo(new StringBuilder(78).append("[workers-future-").append(generateUnusedWorkersName).append("]: Creating future's pair socket for communicating with module").toString());
            create.elem = this.ctx().socket(SocketType.PAIR);
            this.logger.logInfo(new StringBuilder(57).append("[workers-future-").append(generateUnusedWorkersName).append("]: Bind future's pair socket in inproc://").append(generateUnusedWorkersName).toString());
            ((ZMQ.Socket) create.elem).connect(new StringBuilder(9).append("inproc://").append(generateUnusedWorkersName).toString());
            return (Message) function2.apply(generateUnusedWorkersName, new PlatformContext((ZMQ.Socket) create.elem, generateUnusedWorkersName, str, this.logger));
        }, ExecutionContext$Implicits$.MODULE$.global()).onComplete(r12 -> {
            $anonfun$reactOnRemoteMessageAsync$2(this, function3, create, generateUnusedWorkersName, function32, r12);
            return BoxedUnit.UNIT;
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    public boolean sendMsgToPlatformBroker(IBrokerReceiverFromModule iBrokerReceiverFromModule, ModuleLogger moduleLogger) {
        moduleLogger.logDebug("sendMsgToPlatformBroker: Send msg to server ");
        return server().send(iBrokerReceiverFromModule.toByteArray());
    }

    public boolean sendMsgToClient(IModuleSendToClient iModuleSendToClient, ModuleLogger moduleLogger) {
        moduleLogger.logDebug("sendMsgToClient: Send msg to server ");
        return server().send(iModuleSendToClient.toByteArray());
    }

    public Message readMsgFromServerBroker(ModuleLogger moduleLogger) {
        moduleLogger.logDebug(new StringBuilder(0).append("readMsgFromServerBroker: received Identity of engine ").append(new String(server().recv(0))).toString());
        byte[] recv = server().recv(0);
        moduleLogger.logDebug(new StringBuilder(0).append("have received message from server: ").append(new String(recv)).toString());
        return RemoteMessageConverter.unpackAnyMsgFromArray(recv);
    }

    public Map<String, ZMQ.Socket> workersMap() {
        return this.workersMap;
    }

    public Random$ r() {
        return this.r;
    }

    public String generateUnusedWorkersName() {
        Regex r$extension = StringOps$.MODULE$.r$extension(Predef$.MODULE$.augmentString("[0-9]+"));
        Iterable iterable = (Iterable) workersMap().keys().map(str -> {
            return BoxesRunTime.boxToInteger($anonfun$generateUnusedWorkersName$1(r$extension, str));
        });
        boolean z = false;
        IntRef create = IntRef.create(-1);
        while (!z) {
            create.elem = RichInt$.MODULE$.abs$extension(Predef$.MODULE$.intWrapper(r().nextInt()));
            Option find = iterable.find(i -> {
                return i == create.elem;
            });
            if (find instanceof Some) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                if (!None$.MODULE$.equals(find)) {
                    throw new MatchError(find);
                }
                z = true;
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        }
        return new StringBuilder(6).append("worker").append(create.elem).toString();
    }

    public void close() {
        Try$.MODULE$.apply(() -> {
            if (this.server() == null) {
                return BoxedUnit.UNIT;
            }
            this.logger.logInfo("finally close server");
            return this.runWithTimeout(5000L, () -> {
                this.server().close();
            });
        });
        if (workersMap().nonEmpty()) {
            workersMap().foreach(tuple2 -> {
                return Try$.MODULE$.apply(() -> {
                    return this.runWithTimeout(5000L, () -> {
                        ((ZMQ.Socket) tuple2._2()).close();
                    });
                });
            });
        }
        Try$.MODULE$.apply(() -> {
            if (this.poller() == null) {
                return BoxedUnit.UNIT;
            }
            this.logger.logInfo("finally close poller");
            return this.runWithTimeout(5000L, () -> {
                this.poller().close();
            });
        });
        Try$.MODULE$.apply(() -> {
            if (this.workerPoller() == null) {
                return BoxedUnit.UNIT;
            }
            this.logger.logInfo("finally close workerPoller");
            return this.runWithTimeout(5000L, () -> {
                this.workerPoller().close();
            });
        });
        try {
            if (ctx() != null) {
                this.logger.logInfo("finally close context");
                runWithTimeout(5000L, () -> {
                    this.ctx().close();
                });
            }
        } catch (Throwable unused) {
            this.logger.logError("tiemout of closing context exceeded:(");
        }
    }

    public <T> Option<T> runWithTimeout(long j, Function0<T> function0) {
        return new Some(Await$.MODULE$.result(Future$.MODULE$.apply(function0, ExecutionContext$Implicits$.MODULE$.global()), new package.DurationLong(package$.MODULE$.DurationLong(j)).milliseconds()));
    }

    public static final /* synthetic */ Object $anonfun$startServer$1(Module module, int i) {
        BoxedUnit boxToBoolean;
        if (!module.workerPoller().pollin(i)) {
            return BoxedUnit.UNIT;
        }
        ZMQ.Socket socket = module.workerPoller().getSocket(i);
        Message unpackAnyMsgFromArray = RemoteMessageConverter.unpackAnyMsgFromArray(socket.recv(0));
        if (unpackAnyMsgFromArray instanceof WorkerFinished) {
            WorkerFinished workerFinished = (WorkerFinished) unpackAnyMsgFromArray;
            module.logger.logInfo(new StringBuilder(74).append("Received message WorkerFinished from worker ").append(workerFinished.workerIdentity()).append(" Remove socket from workersMap").toString());
            module.workersMap().remove(workerFinished.Id);
            module.logger.logInfo(new StringBuilder(45).append("Unregister worker's ").append(workerFinished.workerIdentity()).append(" socket from workerPoller").toString());
            module.workerPoller().unregister(socket);
            module.logger.logInfo(new StringBuilder(24).append("Closing worker's ").append(workerFinished.workerIdentity()).append(" socket").toString());
            socket.close();
            boxToBoolean = BoxedUnit.UNIT;
        } else if (unpackAnyMsgFromArray instanceof SendMsgToPlatform) {
            SendMsgToPlatform sendMsgToPlatform = (SendMsgToPlatform) unpackAnyMsgFromArray;
            module.logger.logInfo(new StringBuilder(71).append("Received message SendMsgToPlatform from worker ").append(sendMsgToPlatform.workerIdentity()).append(" and send it to platform").toString());
            boxToBoolean = BoxesRunTime.boxToBoolean(module.sendMsgToClient(sendMsgToPlatform.msg, module.logger));
        } else {
            if (!(unpackAnyMsgFromArray instanceof IWorkerSendToClient)) {
                throw new MatchError(unpackAnyMsgFromArray);
            }
            IWorkerSendToClient iWorkerSendToClient = (IWorkerSendToClient) unpackAnyMsgFromArray;
            module.logger.logInfo(new StringBuilder(59).append("Received message of type IWorkerSendToPlatform from worker ").append(iWorkerSendToClient.workerIdentity()).append(new StringBuilder(34).append(" and proxy it (type: ").append(iWorkerSendToClient.type()).append(") to platform").toString()).toString());
            boxToBoolean = BoxesRunTime.boxToBoolean(module.sendMsgToClient(iWorkerSendToClient, module.logger));
        }
        return boxToBoolean;
    }

    public static final /* synthetic */ void $anonfun$reactOnExecuteMessageAsync$2(Execute execute, Message message, ZMQ.Socket socket, String str) {
        socket.send(new SendMsgToPlatform(new ExecuteResult(execute.statement, message, execute.clientIdentity()), str).toByteArray());
    }

    public static final /* synthetic */ void $anonfun$reactOnExecuteMessageAsync$3(Module module, Execute execute, Throwable th, ZMQ.Socket socket, String str) {
        socket.send(new SendMsgToPlatform(new ExecuteResultFailed(new StringBuilder(0).append(new StringBuilder(46).append("Module ").append(module.identity).append(" to ").append(execute.clientIdentity()).append(": error while reacting on execute: ").toString()).append(th.getMessage()).toString(), execute.clientIdentity()), str).toByteArray());
    }

    public static final /* synthetic */ void $anonfun$reactOnExecuteFunctionMessageAsync$2(ExecuteFunction executeFunction, Message message, ZMQ.Socket socket, String str) {
        socket.send(new SendMsgToPlatform(new ExecutedFunctionResult(executeFunction.name, message, executeFunction.clientIdentity()), str).toByteArray());
    }

    public static final /* synthetic */ void $anonfun$reactOnExecuteFunctionMessageAsync$3(Module module, ExecuteFunction executeFunction, Throwable th, ZMQ.Socket socket, String str) {
        socket.send(new SendMsgToPlatform(new ExecutedFunctionResultFailed(new StringBuilder(0).append(new StringBuilder(53).append("Module ").append(module.identity).append(" to ").append(executeFunction.clientIdentity()).append(": error while reacting on execute function").toString()).append(new StringBuilder(2).append(executeFunction.name).append(": ").toString()).append(th.getMessage()).toString(), executeFunction.clientIdentity()), str).toByteArray());
    }

    public static final /* synthetic */ void $anonfun$reactOnRemoteMessageAsync$2(Module module, Function3 function3, ObjectRef objectRef, String str, Function3 function32, Try r10) {
        if (r10 instanceof Success) {
            function3.apply((Message) ((Success) r10).value(), (ZMQ.Socket) objectRef.elem, str);
            module.logger.logInfo(new StringBuilder(54).append("[workers-future-").append(str).append("]: Sending WorkerFinished to inproc://").append(str).toString());
            ((ZMQ.Socket) objectRef.elem).send(new WorkerFinished(str).toByteArray());
            module.logger.logInfo(new StringBuilder(55).append("[workers-future-").append(str).append("]: Close future's pair socket inproc://").append(str).toString());
            ((ZMQ.Socket) objectRef.elem).close();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (!(r10 instanceof Failure)) {
            throw new MatchError(r10);
        }
        function32.apply(((Failure) r10).exception(), (ZMQ.Socket) objectRef.elem, str);
        module.logger.logInfo(new StringBuilder(54).append("[workers-future-").append(str).append("]: Sending WorkerFinished to inproc://").append(str).toString());
        ((ZMQ.Socket) objectRef.elem).send(new WorkerFinished(str).toByteArray());
        module.logger.logInfo(new StringBuilder(55).append("[workers-future-").append(str).append("]: Close future's pair socket inproc://").append(str).toString());
        ((ZMQ.Socket) objectRef.elem).close();
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ int $anonfun$generateUnusedWorkersName$1(Regex regex, String str) {
        return StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString((String) regex.findFirstIn(str).get()));
    }

    public Module(IModuleExecutor iModuleExecutor, String str, String str2, int i, ModuleLogger moduleLogger) {
        this.executor = iModuleExecutor;
        this.identity = str;
        this.host = str2;
        this.port = i;
        this.logger = moduleLogger;
    }
}
