package org.apache.spark.sql.mlsql.sources.mysql.binlog;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.AbstractEventWriter;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.DeleteRowsWriter;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.EventInfo;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.InsertRowsWriter;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.UpdateRowsWriter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.BinlogWriteAheadLog;
import org.apache.spark.streaming.BinlogWriteAheadLog$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: BinLogSocketServerInExecutor.scala */
@ScalaSignature(bytes = "\u0006\u0001\r\u001df\u0001B\u0001\u0003\u0001M\u0011ADQ5o\u0019><7k\\2lKR\u001cVM\u001d<fe&sW\t_3dkR|'O\u0003\u0002\u0004\t\u00051!-\u001b8m_\u001eT!!\u0002\u0004\u0002\u000b5L8/\u001d7\u000b\u0005\u001dA\u0011aB:pkJ\u001cWm\u001d\u0006\u0003\u0013)\tQ!\u001c7tc2T!a\u0003\u0007\u0002\u0007M\fHN\u0003\u0002\u000e\u001d\u0005)1\u000f]1sW*\u0011q\u0002E\u0001\u0007CB\f7\r[3\u000b\u0003E\t1a\u001c:h\u0007\u0001)\"\u0001F\u000e\u0014\t\u0001)rE\u000b\t\u0004-]IR\"\u0001\u0002\n\u0005a\u0011!AF*pG.,GoU3sm\u0016\u0014\u0018J\\#yK\u000e,Ho\u001c:\u0011\u0005iYB\u0002\u0001\u0003\u00069\u0001\u0011\r!\b\u0002\u0002)F\u0011a\u0004\n\t\u0003?\tj\u0011\u0001\t\u0006\u0002C\u0005)1oY1mC&\u00111\u0005\t\u0002\b\u001d>$\b.\u001b8h!\tyR%\u0003\u0002'A\t\u0019\u0011I\\=\u0011\u0005YA\u0013BA\u0015\u0003\u0005a\u0011\u0015N\u001c'pON{7m[3u'\u0016\u0014h/\u001a:TKJ$UM\u001d\t\u0003W9j\u0011\u0001\f\u0006\u0003[1\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003_1\u0012q\u0001T8hO&tw\r\u0003\u00052\u0001\t\u0005\t\u0015!\u00033\u00039!\u0018m]6D_:$X\r\u001f;SK\u001a\u00042a\r\u001f\u001a\u001b\u0005!$BA\u001b7\u0003\u0019\tGo\\7jG*\u0011q\u0007O\u0001\u000bG>t7-\u001e:sK:$(BA\u001d;\u0003\u0011)H/\u001b7\u000b\u0003m\nAA[1wC&\u0011Q\b\u000e\u0002\u0010\u0003R|W.[2SK\u001a,'/\u001a8dK\"Aq\b\u0001B\u0001B\u0003%\u0001)A\u0007dQ\u0016\u001c7\u000e]8j]R$\u0015N\u001d\t\u0003\u0003\u0012s!a\b\"\n\u0005\r\u0003\u0013A\u0002)sK\u0012,g-\u0003\u0002F\r\n11\u000b\u001e:j]\u001eT!a\u0011\u0011\t\u0011!\u0003!\u0011!Q\u0001\n%\u000b!\u0002[1e_>\u00048i\u001c8g!\tQu*D\u0001L\u0015\taU*\u0001\u0003d_:4'B\u0001(\u000f\u0003\u0019A\u0017\rZ8pa&\u0011\u0001k\u0013\u0002\u000e\u0007>tg-[4ve\u0006$\u0018n\u001c8\t\u0011I\u0003!\u0011!Q\u0001\nM\u000b1#[:Xe&$X-\u00115fC\u0012\u001cFo\u001c:bO\u0016\u0004\"a\b+\n\u0005U\u0003#a\u0002\"p_2,\u0017M\u001c\u0005\u0006/\u0002!\t\u0001W\u0001\u0007y%t\u0017\u000e\u001e \u0015\u000beS6\fX/\u0011\u0007Y\u0001\u0011\u0004C\u00032-\u0002\u0007!\u0007C\u0003@-\u0002\u0007\u0001\tC\u0003I-\u0002\u0007\u0011\nC\u0004S-B\u0005\t\u0019A*\t\u000f}\u0003\u0001\u0019!C\u0005A\u0006i1m\u001c8oK\u000e$H\u000b\u001b:fC\u0012,\u0012!\u0019\t\u0003E\u0016l\u0011a\u0019\u0006\u0003Ij\nA\u0001\\1oO&\u0011am\u0019\u0002\u0007)\"\u0014X-\u00193\t\u000f!\u0004\u0001\u0019!C\u0005S\u0006\t2m\u001c8oK\u000e$H\u000b\u001b:fC\u0012|F%Z9\u0015\u0005)l\u0007CA\u0010l\u0013\ta\u0007E\u0001\u0003V]&$\bb\u00028h\u0003\u0003\u0005\r!Y\u0001\u0004q\u0012\n\u0004B\u00029\u0001A\u0003&\u0011-\u0001\bd_:tWm\u0019;UQJ,\u0017\r\u001a\u0011\t\u000fI\u0004\u0001\u0019!C\u0005g\u0006y!-\u001b8befdunZ\"mS\u0016tG/F\u0001u!\t)h0D\u0001w\u0015\t\u0019qO\u0003\u0002\u0006q*\u0011\u0011P_\u0001\u0007g\"L\u0018n[8\u000b\u0005md\u0018AB4ji\",(MC\u0001~\u0003\r\u0019w.\\\u0005\u0003\u007fZ\u0014qBQ5oCJLHj\\4DY&,g\u000e\u001e\u0005\n\u0003\u0007\u0001\u0001\u0019!C\u0005\u0003\u000b\t1CY5oCJLHj\\4DY&,g\u000e^0%KF$2A[A\u0004\u0011!q\u0017\u0011AA\u0001\u0002\u0004!\bbBA\u0006\u0001\u0001\u0006K\u0001^\u0001\u0011E&t\u0017M]=M_\u001e\u001cE.[3oi\u0002B\u0011\"a\u0004\u0001\u0001\u0004%I!!\u0005\u0002#\r,(O]3oi\nKg\u000e\\8h\r&dW-F\u0001A\u0011%\t)\u0002\u0001a\u0001\n\u0013\t9\"A\u000bdkJ\u0014XM\u001c;CS:dwn\u001a$jY\u0016|F%Z9\u0015\u0007)\fI\u0002\u0003\u0005o\u0003'\t\t\u00111\u0001A\u0011\u001d\ti\u0002\u0001Q!\n\u0001\u000b!cY;se\u0016tGOQ5oY><g)\u001b7fA!I\u0011\u0011\u0005\u0001A\u0002\u0013%\u00111E\u0001\u0016GV\u0014(/\u001a8u\u0005&tGn\\4Q_NLG/[8o+\t\t)\u0003E\u0002 \u0003OI1!!\u000b!\u0005\u0011auN\\4\t\u0013\u00055\u0002\u00011A\u0005\n\u0005=\u0012!G2veJ,g\u000e\u001e\"j]2|w\rU8tSRLwN\\0%KF$2A[A\u0019\u0011%q\u00171FA\u0001\u0002\u0004\t)\u0003\u0003\u0005\u00026\u0001\u0001\u000b\u0015BA\u0013\u0003Y\u0019WO\u001d:f]R\u0014\u0015N\u001c7pOB{7/\u001b;j_:\u0004\u0003\"CA\u001d\u0001\t\u0007I\u0011BA\u001e\u0003\u0015\tX/Z;f+\t\ti\u0004\u0005\u0004\u0002@\u0005\u0005\u0013QI\u0007\u0002q%\u0019\u00111\t\u001d\u0003\u0015\u0005\u0013(/Y=EKF,X\rE\u0002\u0017\u0003\u000fJ1!!\u0013\u0003\u00059\u0011\u0016m\u001e\"j]2|w-\u0012<f]RD\u0001\"!\u0014\u0001A\u0003%\u0011QH\u0001\u0007cV,W/\u001a\u0011\t\u0013\u0005E\u0003A1A\u0005\n\u0005M\u0013!D<sSR,\u0017\t[3bI2{w-\u0006\u0002\u0002VA!\u0011qKA/\u001b\t\tIFC\u0002\u0002\\1\t\u0011b\u001d;sK\u0006l\u0017N\\4\n\t\u0005}\u0013\u0011\f\u0002\u0014\u0005&tGn\\4Xe&$X-\u00115fC\u0012dun\u001a\u0005\t\u0003G\u0002\u0001\u0015!\u0003\u0002V\u0005qqO]5uK\u0006CW-\u00193M_\u001e\u0004\u0003\"CA4\u0001\u0001\u0007I\u0011BA5\u0003M!\u0017\r^1cCN,g*Y7f!\u0006$H/\u001a:o+\t\tY\u0007E\u0003 \u0003[\n\t(C\u0002\u0002p\u0001\u0012aa\u00149uS>t\u0007\u0003BA:\u0003sj!!!\u001e\u000b\u0007\u0005]\u0004(A\u0003sK\u001e,\u00070\u0003\u0003\u0002|\u0005U$a\u0002)biR,'O\u001c\u0005\n\u0003\u007f\u0002\u0001\u0019!C\u0005\u0003\u0003\u000bq\u0003Z1uC\n\f7/\u001a(b[\u0016\u0004\u0016\r\u001e;fe:|F%Z9\u0015\u0007)\f\u0019\tC\u0005o\u0003{\n\t\u00111\u0001\u0002l!A\u0011q\u0011\u0001!B\u0013\tY'\u0001\u000beCR\f'-Y:f\u001d\u0006lW\rU1ui\u0016\u0014h\u000e\t\u0005\n\u0003\u0017\u0003\u0001\u0019!C\u0005\u0003S\n\u0001\u0003^1cY\u0016t\u0015-\\3QCR$XM\u001d8\t\u0013\u0005=\u0005\u00011A\u0005\n\u0005E\u0015\u0001\u0006;bE2,g*Y7f!\u0006$H/\u001a:o?\u0012*\u0017\u000fF\u0002k\u0003'C\u0011B\\AG\u0003\u0003\u0005\r!a\u001b\t\u0011\u0005]\u0005\u0001)Q\u0005\u0003W\n\u0011\u0003^1cY\u0016t\u0015-\\3QCR$XM\u001d8!\u0011%\tY\n\u0001a\u0001\n\u0013\t\u0019#\u0001\nnCb\u0014\u0015N\u001c7pOF+X-^3TSj,\u0007\"CAP\u0001\u0001\u0007I\u0011BAQ\u0003Yi\u0017\r\u001f\"j]2|w-U;fk\u0016\u001c\u0016N_3`I\u0015\fHc\u00016\u0002$\"Ia.!(\u0002\u0002\u0003\u0007\u0011Q\u0005\u0005\t\u0003O\u0003\u0001\u0015)\u0003\u0002&\u0005\u0019R.\u0019=CS:dwnZ)vKV,7+\u001b>fA!I\u00111\u0016\u0001A\u0002\u0013%\u0011QV\u0001\bG>tg.Z2u+\t\ty\u000bE\u0002\u0017\u0003cK1!a-\u0003\u0005Mi\u0015pU)M\u0007>tg.Z2uS>t\u0017J\u001c4p\u0011%\t9\f\u0001a\u0001\n\u0013\tI,A\u0006d_:tWm\u0019;`I\u0015\fHc\u00016\u0002<\"Ia.!.\u0002\u0002\u0003\u0007\u0011q\u0016\u0005\t\u0003\u007f\u0003\u0001\u0015)\u0003\u00020\u0006A1m\u001c8oK\u000e$\b\u0005C\u0005\u0002D\u0002\u0001\r\u0011\"\u0003\u0002F\u0006q\u0011\r[3bI2{wMQ;gM\u0016\u0014XCAAd!\u0019\tI-a5\u0002F5\u0011\u00111\u001a\u0006\u0005\u0003\u001b\fy-A\u0004nkR\f'\r\\3\u000b\u0007\u0005E\u0007%\u0001\u0006d_2dWm\u0019;j_:LA!!6\u0002L\nY\u0011I\u001d:bs\n+hMZ3s\u0011%\tI\u000e\u0001a\u0001\n\u0013\tY.\u0001\nbQ\u0016\fG\rT8h\u0005V4g-\u001a:`I\u0015\fHc\u00016\u0002^\"Ia.a6\u0002\u0002\u0003\u0007\u0011q\u0019\u0005\t\u0003C\u0004\u0001\u0015)\u0003\u0002H\u0006y\u0011\r[3bI2{wMQ;gM\u0016\u0014\b\u0005C\u0005\u0002f\u0002\u0001\r\u0011\"\u0003\u0002h\u0006I1o[5q)\u0006\u0014G.Z\u000b\u0002'\"I\u00111\u001e\u0001A\u0002\u0013%\u0011Q^\u0001\u000eg.L\u0007\u000fV1cY\u0016|F%Z9\u0015\u0007)\fy\u000f\u0003\u0005o\u0003S\f\t\u00111\u0001T\u0011\u001d\t\u0019\u0010\u0001Q!\nM\u000b!b]6jaR\u000b'\r\\3!Q\u0011\t\t0a>\u0011\u0007}\tI0C\u0002\u0002|\u0002\u0012\u0001B^8mCRLG.\u001a\u0005\f\u0003\u007f\u0004\u0001\u0019!a\u0001\n\u0013\u0011\t!\u0001\u0007dkJ\u0014XM\u001c;UC\ndW-\u0006\u0002\u0003\u0004A\u0019aC!\u0002\n\u0007\t\u001d!AA\u0005UC\ndW-\u00138g_\"Y!1\u0002\u0001A\u0002\u0003\u0007I\u0011\u0002B\u0007\u0003A\u0019WO\u001d:f]R$\u0016M\u00197f?\u0012*\u0017\u000fF\u0002k\u0005\u001fA\u0011B\u001cB\u0005\u0003\u0003\u0005\rAa\u0001\t\u0011\tM\u0001\u0001)Q\u0005\u0005\u0007\tQbY;se\u0016tG\u000fV1cY\u0016\u0004\u0003\u0006\u0002B\t\u0003oD\u0011B!\u0007\u0001\u0001\u0004%IAa\u0007\u0002\u00135\f'o[\"m_N,WC\u0001B\u000f!\r\u0019$qD\u0005\u0004\u0005C!$!D!u_6L7MQ8pY\u0016\fg\u000eC\u0005\u0003&\u0001\u0001\r\u0011\"\u0003\u0003(\u0005iQ.\u0019:l\u00072|7/Z0%KF$2A\u001bB\u0015\u0011%q'1EA\u0001\u0002\u0004\u0011i\u0002\u0003\u0005\u0003.\u0001\u0001\u000b\u0015\u0002B\u000f\u0003)i\u0017M]6DY>\u001cX\r\t\u0015\u0005\u0005W\t9\u0010C\u0005\u00034\u0001\u0001\r\u0011\"\u0003\u0003\u001c\u0005IQ.\u0019:l!\u0006,8/\u001a\u0005\n\u0005o\u0001\u0001\u0019!C\u0005\u0005s\tQ\"\\1sWB\u000bWo]3`I\u0015\fHc\u00016\u0003<!IaN!\u000e\u0002\u0002\u0003\u0007!Q\u0004\u0005\t\u0005\u007f\u0001\u0001\u0015)\u0003\u0003\u001e\u0005QQ.\u0019:l!\u0006,8/\u001a\u0011)\t\tu\u0012q\u001f\u0005\n\u0005\u000b\u0002!\u0019!C\u0005\u0005\u000f\n1bY8o]\u0016\u001cG/[8ogV\u0011!\u0011\n\t\u0007\u0003\u0013\f\u0019Na\u0013\u0011\t\t5#1K\u0007\u0003\u0005\u001fR1A!\u0015;\u0003\rqW\r^\u0005\u0005\u0005+\u0012yE\u0001\u0004T_\u000e\\W\r\u001e\u0005\t\u00053\u0002\u0001\u0015!\u0003\u0003J\u0005a1m\u001c8oK\u000e$\u0018n\u001c8tA!I!Q\f\u0001C\u0002\u0013%!qL\u0001\u0011GV\u0014(/\u001a8u#V,W/Z*ju\u0016,\"A!\u0019\u0011\u0007M\u0012\u0019'C\u0002\u0003fQ\u0012!\"\u0011;p[&\u001cGj\u001c8h\u0011!\u0011I\u0007\u0001Q\u0001\n\t\u0005\u0014!E2veJ,g\u000e^)vKV,7+\u001b>fA!I!Q\u000e\u0001C\u0002\u0013\u0005!qN\u0001\u0011kB$\u0017\r^3S_^\u001cxK]5uKJ,\"A!\u001d\u0011\t\tM$\u0011P\u0007\u0003\u0005kR1Aa\u001e\u0003\u0003\tIw.\u0003\u0003\u0003|\tU$\u0001E+qI\u0006$XMU8xg^\u0013\u0018\u000e^3s\u0011!\u0011y\b\u0001Q\u0001\n\tE\u0014!E;qI\u0006$XMU8xg^\u0013\u0018\u000e^3sA!I!1\u0011\u0001C\u0002\u0013\u0005!QQ\u0001\u0011I\u0016dW\r^3S_^\u001cxK]5uKJ,\"Aa\"\u0011\t\tM$\u0011R\u0005\u0005\u0005\u0017\u0013)H\u0001\tEK2,G/\u001a*poN<&/\u001b;fe\"A!q\u0012\u0001!\u0002\u0013\u00119)A\teK2,G/\u001a*poN<&/\u001b;fe\u0002B\u0011Ba%\u0001\u0005\u0004%\tA!&\u0002!%t7/\u001a:u%><8o\u0016:ji\u0016\u0014XC\u0001BL!\u0011\u0011\u0019H!'\n\t\tm%Q\u000f\u0002\u0011\u0013:\u001cXM\u001d;S_^\u001cxK]5uKJD\u0001Ba(\u0001A\u0003%!qS\u0001\u0012S:\u001cXM\u001d;S_^\u001cxK]5uKJ\u0004\u0003b\u0002BR\u0001\u0011\u0005\u0011q]\u0001\tSN\u001cEn\\:fI\"9!q\u0015\u0001\u0005\u0002\t%\u0016!F:fi6\u000b\u0007PQ5oY><\u0017+^3vKNK'0\u001a\u000b\u0004U\n-\u0006\u0002\u0003BW\u0005K\u0003\r!!\n\u0002\u000bY\fG.^3\t\u0013\tE\u0006A1A\u0005\n\tM\u0016A\u0004;bE2,\u0017J\u001c4p\u0007\u0006\u001c\u0007.Z\u000b\u0003\u0005k\u0003\u0002Ba.\u0003:\nu&1A\u0007\u0002m%\u0019!1\u0018\u001c\u0003#\r{gnY;se\u0016tG\u000fS1tQ6\u000b\u0007\u000fE\u0002\u0017\u0005\u007fK1A!1\u0003\u0005E!\u0016M\u00197f\u0013:4wnQ1dQ\u0016\\U-\u001f\u0005\t\u0005\u000b\u0004\u0001\u0015!\u0003\u00036\u0006yA/\u00192mK&sgm\\\"bG\",\u0007\u0005C\u0004\u0003J\u0002!\tAa3\u0002\u0017\u0005\u001c8/\u001a:u)\u0006\u0014G.Z\u000b\u0002U\"9!q\u001a\u0001\u0005\u0002\t-\u0017!\u00044mkND\u0017\t[3bI2{w\rC\u0004\u0003T\u0002!\tA!6\u0002\u0013\u0005$GMU3d_J$G\u0003\u0003Bl\u0005;\u0014YOa<\u0011\u0007}\u0011I.C\u0002\u0003\\\u0002\u0012a!\u00118z-\u0006d\u0007\u0002\u0003Bp\u0005#\u0004\rA!9\u0002\u000b\u00154XM\u001c;\u0011\t\t\r(q]\u0007\u0003\u0005KT1Aa8w\u0013\u0011\u0011IO!:\u0003\u000b\u00153XM\u001c;\t\u000f\t5(\u0011\u001ba\u0001\u0001\u0006q!-\u001b8M_\u001e4\u0015\u000e\\3oC6,\u0007b\u0002By\u0005#\u0004\r\u0001Q\u0001\nKZ,g\u000e\u001e+za\u0016DqA!>\u0001\t\u0013\u001190A\u0007`G>tg.Z2u\u001bf\u001c\u0016\u000b\u0014\u000b\u0004U\ne\b\u0002CAV\u0005g\u0004\r!a,\t\u000f\tu\b\u0001\"\u0001\u0003��\u0006qAn\\1e'\u000eDW-\\1J]\u001a|GCBB\u0001\u0007\u001b\u0019\t\u0002\u0005\u0003\u0004\u0004\r%QBAB\u0003\u0015\r\u00199AC\u0001\u0006if\u0004Xm]\u0005\u0005\u0007\u0017\u0019)A\u0001\u0006TiJ,8\r\u001e+za\u0016D\u0001ba\u0004\u0003|\u0002\u0007\u0011qV\u0001\u000fG>tg.Z2uS>t\u0017J\u001c4p\u0011!\u0019\u0019Ba?A\u0002\tu\u0016!\u0002;bE2,\u0007bBB\f\u0001\u0011\u00051\u0011D\u0001\rG>tg.Z2u\u001bf\u001c\u0016\u000b\u0014\u000b\u0006U\u000em1q\u0004\u0005\t\u0007;\u0019)\u00021\u0001\u00020\u0006AqlY8o]\u0016\u001cG\u000fC\u0005\u0004\"\rU\u0001\u0013!a\u0001'\u0006)\u0011m]=oG\"91Q\u0005\u0001\u0005\n\r\u001d\u0012\u0001\u0003;p\u001f\u001a47/\u001a;\u0015\t\u0005\u00152\u0011\u0006\u0005\t\u0007W\u0019\u0019\u00031\u0001\u0002F\u0005q!/Y<CS:dwnZ#wK:$\bbBB\u0018\u0001\u0011\u00051\u0011G\u0001\u001cG>tg/\u001a:u%\u0006<()\u001b8m_\u001e,e/\u001a8u%\u0016\u001cwN\u001d3\u0015\t\rM2Q\b\t\u0007\u0003\u007f\u0019)d!\u000f\n\u0007\r]\u0002H\u0001\u0003MSN$\bc\u00012\u0004<%\u0011Qi\u0019\u0005\t\u0007W\u0019i\u00031\u0001\u0002F!91\u0011\t\u0001\u0005\u0002\r\r\u0013a\u0005;ss^KG\u000f[8vi\u0016C8-\u001a9uS>tGc\u00016\u0004F!A1qIB \u0001\u0004\u0019I%A\u0002gk:\u0004BaHB&U&\u00191Q\n\u0011\u0003\u0013\u0019+hn\u0019;j_:\u0004\u0004bBB)\u0001\u0011\u0005!1Z\u0001\u0006a\u0006,8/\u001a\u0005\b\u0007+\u0002A\u0011\u0001Bf\u0003\u0019\u0011Xm];nK\"91\u0011\f\u0001\u0005B\rm\u0013!B2m_N,G#\u00016\t\u000f\r}\u0003\u0001\"\u0001\u0004b\u0005\u0001\u0002.\u00198eY\u0016\u001cuN\u001c8fGRLwN\u001c\u000b\u0004U\u000e\r\u0004\u0002CB3\u0007;\u0002\rAa\u0013\u0002\rM|7m[3u\u0011%\u0019I\u0007AI\u0001\n\u0003\u0019Y'\u0001\fd_:tWm\u0019;NsN\u000bF\n\n3fM\u0006,H\u000e\u001e\u00133+\t\u0019iGK\u0002T\u0007_Z#a!\u001d\u0011\t\rM4QP\u0007\u0003\u0007kRAaa\u001e\u0004z\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0007w\u0002\u0013AC1o]>$\u0018\r^5p]&!1qPB;\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0004\b\u0007\u0007\u0013\u0001\u0012ABC\u0003q\u0011\u0015N\u001c'pON{7m[3u'\u0016\u0014h/\u001a:J]\u0016CXmY;u_J\u00042AFBD\r\u0019\t!\u0001#\u0001\u0004\nN!1qQBF!\ry2QR\u0005\u0004\u0007\u001f\u0003#AB!osJ+g\rC\u0004X\u0007\u000f#\taa%\u0015\u0005\r\u0015\u0005BCBL\u0007\u000f\u0013\r\u0011\"\u0001\u0004\u001a\u0006\tb)\u0013'F?:\u000bU*R0O\u001fR{6+\u0012+\u0016\u0005\re\u0002\"CBO\u0007\u000f\u0003\u000b\u0011BB\u001d\u0003I1\u0015\nT#`\u001d\u0006kUi\u0018(P)~\u001bV\t\u0016\u0011\t\u0015\r\u00056qQI\u0001\n\u0003\u0019\u0019+A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H\u0005N\u000b\u0005\u0007W\u001a)\u000b\u0002\u0004\u001d\u0007?\u0013\r!\b")
/* loaded from: input_file:org/apache/spark/sql/mlsql/sources/mysql/binlog/BinLogSocketServerInExecutor.class */
public class BinLogSocketServerInExecutor<T> extends SocketServerInExecutor<T> implements BinLogSocketServerSerDer, Logging {
    private final boolean isWriteAheadStorage;
    private Thread connectThread;
    private BinaryLogClient org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient;
    private String org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile;
    private long org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition;
    private final ArrayDeque<RawBinlogEvent> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue;
    private final BinlogWriteAheadLog org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog;
    private Option<Pattern> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern;
    private Option<Pattern> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern;
    private long maxBinlogQueueSize;
    private MySQLConnectionInfo org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect;
    private ArrayBuffer<RawBinlogEvent> aheadLogBuffer;
    private volatile boolean org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable;
    private volatile TableInfo org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable;
    private volatile AtomicBoolean markClose;
    private volatile AtomicBoolean markPause;
    private final ArrayBuffer<Socket> connections;
    private final AtomicLong currentQueueSize;
    private final UpdateRowsWriter updateRowsWriter;
    private final DeleteRowsWriter deleteRowsWriter;
    private final InsertRowsWriter insertRowsWriter;
    private final ConcurrentHashMap<TableInfoCacheKey, TableInfo> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public static String FILE_NAME_NOT_SET() {
        return BinLogSocketServerInExecutor$.MODULE$.FILE_NAME_NOT_SET();
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.class.initializeLogIfNecessary(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2(this);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public Request readRequest(DataInputStream dataInputStream) {
        return BinLogSocketServerSerDer.Cclass.readRequest(this, dataInputStream);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public void sendRequest(DataOutputStream dataOutputStream, Request request) {
        BinLogSocketServerSerDer.Cclass.sendRequest(this, dataOutputStream, request);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public void sendResponse(DataOutputStream dataOutputStream, Response response) {
        BinLogSocketServerSerDer.Cclass.sendResponse(this, dataOutputStream, response);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public Response readResponse(DataInputStream dataInputStream) {
        return BinLogSocketServerSerDer.Cclass.readResponse(this, dataInputStream);
    }

    private Thread connectThread() {
        return this.connectThread;
    }

    private void connectThread_$eq(Thread thread) {
        this.connectThread = thread;
    }

    public BinaryLogClient org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient;
    }

    private void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient_$eq(BinaryLogClient binaryLogClient) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient = binaryLogClient;
    }

    private String org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile_$eq(String str) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile = str;
    }

    private long org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition_$eq(long j) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition = j;
    }

    public ArrayDeque<RawBinlogEvent> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue;
    }

    public BinlogWriteAheadLog org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog;
    }

    public Option<Pattern> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern;
    }

    private void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern_$eq(Option<Pattern> option) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern = option;
    }

    public Option<Pattern> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern;
    }

    private void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern_$eq(Option<Pattern> option) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern = option;
    }

    private long maxBinlogQueueSize() {
        return this.maxBinlogQueueSize;
    }

    private void maxBinlogQueueSize_$eq(long j) {
        this.maxBinlogQueueSize = j;
    }

    public MySQLConnectionInfo org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect;
    }

    private void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect_$eq(MySQLConnectionInfo mySQLConnectionInfo) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect = mySQLConnectionInfo;
    }

    private ArrayBuffer<RawBinlogEvent> aheadLogBuffer() {
        return this.aheadLogBuffer;
    }

    private void aheadLogBuffer_$eq(ArrayBuffer<RawBinlogEvent> arrayBuffer) {
        this.aheadLogBuffer = arrayBuffer;
    }

    public boolean org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable_$eq(boolean z) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable = z;
    }

    public TableInfo org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable_$eq(TableInfo tableInfo) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable = tableInfo;
    }

    private AtomicBoolean markClose() {
        return this.markClose;
    }

    private void markClose_$eq(AtomicBoolean atomicBoolean) {
        this.markClose = atomicBoolean;
    }

    private AtomicBoolean markPause() {
        return this.markPause;
    }

    private void markPause_$eq(AtomicBoolean atomicBoolean) {
        this.markPause = atomicBoolean;
    }

    private ArrayBuffer<Socket> connections() {
        return this.connections;
    }

    private AtomicLong currentQueueSize() {
        return this.currentQueueSize;
    }

    public UpdateRowsWriter updateRowsWriter() {
        return this.updateRowsWriter;
    }

    public DeleteRowsWriter deleteRowsWriter() {
        return this.deleteRowsWriter;
    }

    public InsertRowsWriter insertRowsWriter() {
        return this.insertRowsWriter;
    }

    public boolean isClosed() {
        return markClose().get();
    }

    public void setMaxBinlogQueueSize(long j) {
        maxBinlogQueueSize_$eq(j);
    }

    public ConcurrentHashMap<TableInfoCacheKey, TableInfo> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache;
    }

    public void assertTable() {
        if (org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable() == null) {
            throw new RuntimeException("No table information is available for this event, cannot process further.");
        }
    }

    public synchronized void flushAheadLog() {
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog().write(aheadLogBuffer());
        aheadLogBuffer().clear();
    }

    public Object addRecord(Event event, String str, String str2) {
        assertTable();
        RawBinlogEvent rawBinlogEvent = new RawBinlogEvent(event, org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable(), str, str2, org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition());
        if (this.isWriteAheadStorage && aheadLogBuffer().size() >= 1000) {
            flushAheadLog();
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog().cleanupOldBlocks(System.currentTimeMillis() - 3600000, org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog().cleanupOldBlocks$default$2());
        }
        if (this.isWriteAheadStorage) {
            aheadLogBuffer().$plus$eq(rawBinlogEvent);
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        if (this.isWriteAheadStorage) {
            return BoxedUnit.UNIT;
        }
        if (currentQueueSize().get() > maxBinlogQueueSize() && !markPause().get()) {
            pause();
        } else if (currentQueueSize().get() < maxBinlogQueueSize() / 2 && markPause().get()) {
            resume();
        }
        currentQueueSize().incrementAndGet();
        return BoxesRunTime.boxToBoolean(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue().offer(rawBinlogEvent));
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$_connectMySQL(final MySQLConnectionInfo mySQLConnectionInfo) {
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient_$eq(new BinaryLogClient(mySQLConnectionInfo.host(), mySQLConnectionInfo.port(), mySQLConnectionInfo.userName(), mySQLConnectionInfo.password()));
        Some binlogFileName = mySQLConnectionInfo.binlogFileName();
        if (binlogFileName instanceof Some) {
            String str = (String) binlogFileName.x();
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setBinlogFilename(str);
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile_$eq(str);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        Some recordPos = mySQLConnectionInfo.recordPos();
        if (recordPos instanceof Some) {
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setBinlogPosition(BoxesRunTime.unboxToLong(recordPos.x()));
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, new EventDeserializer.CompatibilityMode[]{EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY});
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setEventDeserializer(eventDeserializer);
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().registerEventListener(new BinaryLogClient.EventListener(this, mySQLConnectionInfo) { // from class: org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anon$3
            private final /* synthetic */ BinLogSocketServerInExecutor $outer;
            private final MySQLConnectionInfo connect$1;

            public void onEvent(Event event) {
                BoxedUnit boxedUnit5;
                BoxedUnit boxedUnit6;
                BoxedUnit boxedUnit7;
                BoxedUnit boxedUnit8;
                EventHeaderV4 header = event.getHeader();
                EventType eventType = header.getEventType();
                EventType eventType2 = EventType.ROTATE;
                if (eventType != null ? !eventType.equals(eventType2) : eventType2 != null) {
                    EventType eventType3 = EventType.FORMAT_DESCRIPTION;
                    if (eventType != null ? !eventType.equals(eventType3) : eventType3 != null) {
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition_$eq(header.getPosition());
                    }
                }
                if (EventType.TABLE_MAP.equals(eventType)) {
                    TableMapEventData data = event.getData();
                    this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable_$eq((BoxesRunTime.unboxToBoolean(this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern().map(new BinLogSocketServerInExecutor$$anon$3$$anonfun$onEvent$3(this, data)).getOrElse(new BinLogSocketServerInExecutor$$anon$3$$anonfun$onEvent$1(this))) && BoxesRunTime.unboxToBoolean(this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern().map(new BinLogSocketServerInExecutor$$anon$3$$anonfun$onEvent$4(this, data)).getOrElse(new BinLogSocketServerInExecutor$$anon$3$$anonfun$onEvent$2(this)))) ? false : true);
                    if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable()) {
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable_$eq(null);
                        boxedUnit8 = BoxedUnit.UNIT;
                    } else {
                        TableInfoCacheKey tableInfoCacheKey = new TableInfoCacheKey(data.getDatabase(), data.getTable(), data.getTableId());
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable_$eq(this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache().get(tableInfoCacheKey));
                        if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable() == null) {
                            TableInfo tableInfo = new TableInfo(tableInfoCacheKey.databaseName(), tableInfoCacheKey.tableName(), Predef$.MODULE$.long2Long(tableInfoCacheKey.tableId()), this.$outer.loadSchemaInfo(this.connect$1, tableInfoCacheKey).json());
                            this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache().put(tableInfoCacheKey, tableInfo);
                            this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable_$eq(tableInfo);
                            boxedUnit8 = BoxedUnit.UNIT;
                        } else {
                            boxedUnit8 = BoxedUnit.UNIT;
                        }
                    }
                    return;
                }
                if (EventType.PRE_GA_WRITE_ROWS.equals(eventType)) {
                    BoxedUnit boxedUnit9 = BoxedUnit.UNIT;
                    return;
                }
                if (EventType.WRITE_ROWS.equals(eventType)) {
                    BoxedUnit boxedUnit10 = BoxedUnit.UNIT;
                    return;
                }
                if (EventType.EXT_WRITE_ROWS.equals(eventType)) {
                    if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable()) {
                        boxedUnit7 = BoxedUnit.UNIT;
                    } else {
                        this.$outer.addRecord(event, this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().getBinlogFilename(), EventInfo.INSERT_EVENT);
                        boxedUnit7 = BoxedUnit.UNIT;
                    }
                    return;
                }
                if (EventType.PRE_GA_UPDATE_ROWS.equals(eventType)) {
                    BoxedUnit boxedUnit11 = BoxedUnit.UNIT;
                    return;
                }
                if (EventType.UPDATE_ROWS.equals(eventType)) {
                    BoxedUnit boxedUnit12 = BoxedUnit.UNIT;
                    return;
                }
                if (EventType.EXT_UPDATE_ROWS.equals(eventType)) {
                    if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable()) {
                        boxedUnit6 = BoxedUnit.UNIT;
                    } else {
                        this.$outer.addRecord(event, this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().getBinlogFilename(), EventInfo.UPDATE_EVENT);
                        boxedUnit6 = BoxedUnit.UNIT;
                    }
                    return;
                }
                if (EventType.PRE_GA_DELETE_ROWS.equals(eventType)) {
                    BoxedUnit boxedUnit13 = BoxedUnit.UNIT;
                    return;
                }
                if (EventType.DELETE_ROWS.equals(eventType)) {
                    BoxedUnit boxedUnit14 = BoxedUnit.UNIT;
                    return;
                }
                if (EventType.EXT_DELETE_ROWS.equals(eventType)) {
                    if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable()) {
                        boxedUnit5 = BoxedUnit.UNIT;
                    } else {
                        this.$outer.addRecord(event, this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().getBinlogFilename(), EventInfo.DELETE_EVENT);
                        boxedUnit5 = BoxedUnit.UNIT;
                    }
                    return;
                }
                if (!EventType.ROTATE.equals(eventType)) {
                    BoxedUnit boxedUnit15 = BoxedUnit.UNIT;
                    return;
                }
                RotateEventData data2 = event.getData();
                this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile_$eq(data2.getBinlogFilename());
                this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition_$eq(data2.getBinlogPosition());
                BoxedUnit boxedUnit16 = BoxedUnit.UNIT;
            }

            /* JADX WARN: Multi-variable type inference failed */
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.connect$1 = mySQLConnectionInfo;
            }
        });
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().connect();
    }

    public StructType loadSchemaInfo(MySQLConnectionInfo mySQLConnectionInfo, TableInfoCacheKey tableInfoCacheKey) {
        return JDBCRDD$.MODULE$.resolveTable(new JDBCOptions(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("url"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"jdbc:mysql://", ":", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{mySQLConnectionInfo.host(), BoxesRunTime.boxToInteger(mySQLConnectionInfo.port())}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("user"), mySQLConnectionInfo.userName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("password"), mySQLConnectionInfo.password()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("dbtable"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ".", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{tableInfoCacheKey.databaseName(), tableInfoCacheKey.tableName()})))}))));
    }

    public void connectMySQL(MySQLConnectionInfo mySQLConnectionInfo, boolean z) {
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect_$eq(mySQLConnectionInfo);
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern_$eq(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().databaseNamePattern().map(new BinLogSocketServerInExecutor$$anonfun$connectMySQL$1(this)));
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern_$eq(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().tableNamePattern().map(new BinLogSocketServerInExecutor$$anonfun$connectMySQL$2(this)));
        if (!z) {
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$_connectMySQL(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect());
        } else {
            connectThread_$eq(new Thread(this) { // from class: org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anon$1
                private final /* synthetic */ BinLogSocketServerInExecutor $outer;

                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$_connectMySQL(this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect());
                    } catch (Exception e) {
                        throw e;
                    }
                }

                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"connect mysql(", ", ", ") "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().host(), BoxesRunTime.boxToInteger(this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().port())})));
                    if (this == 0) {
                        throw null;
                    }
                    this.$outer = this;
                    setDaemon(true);
                }
            });
            connectThread().start();
        }
    }

    public boolean connectMySQL$default$2() {
        return true;
    }

    public long org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$toOffset(RawBinlogEvent rawBinlogEvent) {
        return BinlogOffset$.MODULE$.fromFileAndPos(rawBinlogEvent.getBinlogFilename(), Predef$.MODULE$.Long2long(rawBinlogEvent.getPos())).offset();
    }

    public List<String> convertRawBinlogEventRecord(RawBinlogEvent rawBinlogEvent) {
        List<String> arrayList;
        AbstractEventWriter deleteRowsWriter;
        String eventType = rawBinlogEvent.getEventType();
        try {
            if ("insert".equals(eventType)) {
                deleteRowsWriter = insertRowsWriter();
            } else if ("update".equals(eventType)) {
                deleteRowsWriter = updateRowsWriter();
            } else {
                if (!"delete".equals(eventType)) {
                    throw new MatchError(eventType);
                }
                deleteRowsWriter = deleteRowsWriter();
            }
            arrayList = deleteRowsWriter.writeEvent(rawBinlogEvent);
        } catch (Exception e) {
            logError(new BinLogSocketServerInExecutor$$anonfun$1(this), e);
            arrayList = new ArrayList<>();
        }
        return arrayList;
    }

    public void tryWithoutException(Function0<BoxedUnit> function0) {
        try {
            function0.apply$mcV$sp();
        } catch (Exception e) {
        }
    }

    public void pause() {
        if (!markPause().compareAndSet(false, true) || org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient() == null) {
            return;
        }
        tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$pause$1(this));
    }

    public void resume() {
        if (markPause().compareAndSet(true, false)) {
            connectThread_$eq(new Thread(this) { // from class: org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anon$2
                private final /* synthetic */ BinLogSocketServerInExecutor $outer;

                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().connect();
                    } catch (Exception e) {
                        throw e;
                    }
                }

                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"connect mysql(", ", ", ") "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().host(), BoxesRunTime.boxToInteger(this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().port())})));
                    if (this == 0) {
                        throw null;
                    }
                    this.$outer = this;
                    setDaemon(true);
                }
            });
            connectThread().start();
        }
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor
    public void close() {
        if (markClose().compareAndSet(false, true)) {
            logInfo(new BinLogSocketServerInExecutor$$anonfun$close$5(this));
            connections().foreach(new BinLogSocketServerInExecutor$$anonfun$close$6(this));
            connections().clear();
            if (org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient() != null) {
                tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$close$1(this));
            }
            if (server() != null) {
                tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$close$2(this));
            }
            tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$close$3(this));
            tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$close$4(this));
        }
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor
    public void handleConnection(Socket socket) {
        connections().$plus$eq(socket);
        socket.setKeepAlive(true);
        DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
        DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
        while (true) {
            Request readRequest = readRequest(dataInputStream);
            if (readRequest instanceof ShutdownBinlogServer) {
                close();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else if (readRequest instanceof RequestQueueSize) {
                sendResponse(dataOutputStream, new QueueSizeResponse(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue().size()));
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            } else if (readRequest instanceof RequestOffset) {
                sendResponse(dataOutputStream, new OffsetResponse(BinlogOffset$.MODULE$.fromFileAndPos(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile(), org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition() + 1).offset()));
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else {
                if (!(readRequest instanceof RequestData)) {
                    throw new MatchError(readRequest);
                }
                RequestData requestData = (RequestData) readRequest;
                long startOffset = requestData.startOffset();
                long endOffset = requestData.endOffset();
                ArrayBuffer arrayBuffer = (ArrayBuffer) ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
                try {
                    if (this.isWriteAheadStorage) {
                        flushAheadLog();
                        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog().read(new BinLogSocketServerInExecutor$$anonfun$handleConnection$1(this, startOffset, endOffset, arrayBuffer));
                    } else {
                        RawBinlogEvent poll = org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue().poll();
                        while (poll != null && org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$toOffset(poll) >= startOffset && org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$toOffset(poll) < endOffset) {
                            arrayBuffer.$plus$plus$eq((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(convertRawBinlogEventRecord(poll)).asScala());
                            poll = org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue().poll();
                            currentQueueSize().decrementAndGet();
                        }
                    }
                } catch (Exception e) {
                    logError(new BinLogSocketServerInExecutor$$anonfun$handleConnection$2(this), e);
                }
                sendResponse(dataOutputStream, new DataResponse(arrayBuffer.toList()));
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                logError(new BinLogSocketServerInExecutor$$anonfun$handleConnection$2(this), e);
                sendResponse(dataOutputStream, new DataResponse(arrayBuffer.toList()));
                BoxedUnit boxedUnit42 = BoxedUnit.UNIT;
            }
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public BinLogSocketServerInExecutor(AtomicReference<T> atomicReference, String str, Configuration configuration, boolean z) {
        super(atomicReference, "binlog-socket-server-in-executor");
        this.isWriteAheadStorage = z;
        BinLogSocketServerSerDer.Cclass.$init$(this);
        Logging.class.$init$(this);
        this.connectThread = null;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient = null;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile = null;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition = 4L;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue = new ArrayDeque<>();
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        BinlogWriteAheadLog binlogWriteAheadLog = new BinlogWriteAheadLog(UUID.randomUUID().toString(), sparkEnv.serializerManager(), sparkEnv.conf(), configuration, str, BinlogWriteAheadLog$.MODULE$.$lessinit$greater$default$6());
        binlogWriteAheadLog.cleanupOldBlocks(System.currentTimeMillis(), true);
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog = binlogWriteAheadLog;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern = None$.MODULE$;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern = None$.MODULE$;
        this.maxBinlogQueueSize = 0L;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect = null;
        this.aheadLogBuffer = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable = false;
        this.markClose = new AtomicBoolean(false);
        this.markPause = new AtomicBoolean(false);
        this.connections = new ArrayBuffer<>();
        this.currentQueueSize = new AtomicLong(0L);
        this.updateRowsWriter = new UpdateRowsWriter();
        this.deleteRowsWriter = new DeleteRowsWriter();
        this.insertRowsWriter = new InsertRowsWriter();
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache = new ConcurrentHashMap<>();
    }
}
