package org.apache.spark.sql.mlsql.sources.mysql.binlog;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.AbstractEventWriter;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.DeleteRowsWriter;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.EventInfo;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.InsertRowsWriter;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.io.UpdateRowsWriter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.BinlogWriteAheadLog;
import org.apache.spark.streaming.BinlogWriteAheadLog$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: BinLogSocketServerInExecutor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00115b\u0001B\u0001\u0003\u0001M\u0011ADQ5o\u0019><7k\\2lKR\u001cVM\u001d<fe&sW\t_3dkR|'O\u0003\u0002\u0004\t\u00051!-\u001b8m_\u001eT!!\u0002\u0004\u0002\u000b5L8/\u001d7\u000b\u0005\u001dA\u0011aB:pkJ\u001cWm\u001d\u0006\u0003\u0013)\tQ!\u001c7tc2T!a\u0003\u0007\u0002\u0007M\fHN\u0003\u0002\u000e\u001d\u0005)1\u000f]1sW*\u0011q\u0002E\u0001\u0007CB\f7\r[3\u000b\u0003E\t1a\u001c:h\u0007\u0001)\"\u0001F\u000e\u0014\t\u0001)rE\u000b\t\u0004-]IR\"\u0001\u0002\n\u0005a\u0011!AF*pG.,GoU3sm\u0016\u0014\u0018J\\#yK\u000e,Ho\u001c:\u0011\u0005iYB\u0002\u0001\u0003\u00069\u0001\u0011\r!\b\u0002\u0002)F\u0011a\u0004\n\t\u0003?\tj\u0011\u0001\t\u0006\u0002C\u0005)1oY1mC&\u00111\u0005\t\u0002\b\u001d>$\b.\u001b8h!\tyR%\u0003\u0002'A\t\u0019\u0011I\\=\u0011\u0005YA\u0013BA\u0015\u0003\u0005a\u0011\u0015N\u001c'pON{7m[3u'\u0016\u0014h/\u001a:TKJ$UM\u001d\t\u0003W9j\u0011\u0001\f\u0006\u0003[1\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003_1\u0012q\u0001T8hO&tw\r\u0003\u00052\u0001\t\u0005\t\u0015!\u00033\u00039!\u0018m]6D_:$X\r\u001f;SK\u001a\u00042a\r\u001f\u001a\u001b\u0005!$BA\u001b7\u0003\u0019\tGo\\7jG*\u0011q\u0007O\u0001\u000bG>t7-\u001e:sK:$(BA\u001d;\u0003\u0011)H/\u001b7\u000b\u0003m\nAA[1wC&\u0011Q\b\u000e\u0002\u0010\u0003R|W.[2SK\u001a,'/\u001a8dK\"Aq\b\u0001B\u0001B\u0003%\u0001)A\u0007dQ\u0016\u001c7\u000e]8j]R$\u0015N\u001d\t\u0003\u0003\u0012s!a\b\"\n\u0005\r\u0003\u0013A\u0002)sK\u0012,g-\u0003\u0002F\r\n11\u000b\u001e:j]\u001eT!a\u0011\u0011\t\u0011!\u0003!\u0011!Q\u0001\n\u0001\u000b\u0001\u0002^5nKj|g.\u001a\u0005\t\u0015\u0002\u0011\t\u0011)A\u0005\u0017\u0006Q\u0001.\u00193p_B\u001cuN\u001c4\u0011\u00051\u000bV\"A'\u000b\u00059{\u0015\u0001B2p]\u001aT!\u0001\u0015\b\u0002\r!\fGm\\8q\u0013\t\u0011VJA\u0007D_:4\u0017nZ;sCRLwN\u001c\u0005\t)\u0002\u0011\t\u0011)A\u0005+\u0006\u0019\u0012n],sSR,\u0017\t[3bIN#xN]1hKB\u0011qDV\u0005\u0003/\u0002\u0012qAQ8pY\u0016\fg\u000eC\u0003Z\u0001\u0011\u0005!,\u0001\u0004=S:LGO\u0010\u000b\u00077rkfl\u00181\u0011\u0007Y\u0001\u0011\u0004C\u000321\u0002\u0007!\u0007C\u0003@1\u0002\u0007\u0001\tC\u0003I1\u0002\u0007\u0001\tC\u0003K1\u0002\u00071\nC\u0004U1B\u0005\t\u0019A+\t\u000f\t\u0004\u0001\u0019!C\u0005G\u0006i1m\u001c8oK\u000e$H\u000b\u001b:fC\u0012,\u0012\u0001\u001a\t\u0003K\"l\u0011A\u001a\u0006\u0003Oj\nA\u0001\\1oO&\u0011\u0011N\u001a\u0002\u0007)\"\u0014X-\u00193\t\u000f-\u0004\u0001\u0019!C\u0005Y\u0006\t2m\u001c8oK\u000e$H\u000b\u001b:fC\u0012|F%Z9\u0015\u00055\u0004\bCA\u0010o\u0013\ty\u0007E\u0001\u0003V]&$\bbB9k\u0003\u0003\u0005\r\u0001Z\u0001\u0004q\u0012\n\u0004BB:\u0001A\u0003&A-\u0001\bd_:tWm\u0019;UQJ,\u0017\r\u001a\u0011\t\u000fU\u0004\u0001\u0019!C\u0005m\u0006y!-\u001b8befdunZ\"mS\u0016tG/F\u0001x!\rA\u00181A\u0007\u0002s*\u00111A\u001f\u0006\u0003\u000bmT!\u0001`?\u0002\rMD\u00170[6p\u0015\tqx0\u0001\u0004hSRDWO\u0019\u0006\u0003\u0003\u0003\t1aY8n\u0013\r\t)!\u001f\u0002\u0010\u0005&t\u0017M]=M_\u001e\u001cE.[3oi\"I\u0011\u0011\u0002\u0001A\u0002\u0013%\u00111B\u0001\u0014E&t\u0017M]=M_\u001e\u001cE.[3oi~#S-\u001d\u000b\u0004[\u00065\u0001\u0002C9\u0002\b\u0005\u0005\t\u0019A<\t\u000f\u0005E\u0001\u0001)Q\u0005o\u0006\u0001\"-\u001b8befdunZ\"mS\u0016tG\u000f\t\u0005\n\u0003+\u0001\u0001\u0019!C\u0005\u0003/\t\u0011cY;se\u0016tGOQ5oY><g)\u001b7f+\u0005\u0001\u0005\"CA\u000e\u0001\u0001\u0007I\u0011BA\u000f\u0003U\u0019WO\u001d:f]R\u0014\u0015N\u001c7pO\u001aKG.Z0%KF$2!\\A\u0010\u0011!\t\u0018\u0011DA\u0001\u0002\u0004\u0001\u0005bBA\u0012\u0001\u0001\u0006K\u0001Q\u0001\u0013GV\u0014(/\u001a8u\u0005&tGn\\4GS2,\u0007\u0005C\u0005\u0002(\u0001\u0001\r\u0011\"\u0003\u0002*\u0005)2-\u001e:sK:$()\u001b8m_\u001e\u0004vn]5uS>tWCAA\u0016!\ry\u0012QF\u0005\u0004\u0003_\u0001#\u0001\u0002'p]\u001eD\u0011\"a\r\u0001\u0001\u0004%I!!\u000e\u00023\r,(O]3oi\nKg\u000e\\8h!>\u001c\u0018\u000e^5p]~#S-\u001d\u000b\u0004[\u0006]\u0002\"C9\u00022\u0005\u0005\t\u0019AA\u0016\u0011!\tY\u0004\u0001Q!\n\u0005-\u0012AF2veJ,g\u000e\u001e\"j]2|w\rU8tSRLwN\u001c\u0011\t\u0013\u0005}\u0002\u00011A\u0005\n\u0005\u0005\u0013\u0001I2veJ,g\u000e\u001e\"j]2|w\rU8tSRLwN\\\"p]N,X.\u001a$mC\u001e,\u0012!\u0016\u0005\n\u0003\u000b\u0002\u0001\u0019!C\u0005\u0003\u000f\nAeY;se\u0016tGOQ5oY><\u0007k\\:ji&|gnQ8ogVlWM\u00127bO~#S-\u001d\u000b\u0004[\u0006%\u0003\u0002C9\u0002D\u0005\u0005\t\u0019A+\t\u000f\u00055\u0003\u0001)Q\u0005+\u0006\t3-\u001e:sK:$()\u001b8m_\u001e\u0004vn]5uS>t7i\u001c8tk6,g\t\\1hA!I\u0011\u0011\u000b\u0001A\u0002\u0013%\u0011\u0011F\u0001\u0013]\u0016DHOQ5oY><\u0007k\\:ji&|g\u000eC\u0005\u0002V\u0001\u0001\r\u0011\"\u0003\u0002X\u00051b.\u001a=u\u0005&tGn\\4Q_NLG/[8o?\u0012*\u0017\u000fF\u0002n\u00033B\u0011\"]A*\u0003\u0003\u0005\r!a\u000b\t\u0011\u0005u\u0003\u0001)Q\u0005\u0003W\t1C\\3yi\nKg\u000e\\8h!>\u001c\u0018\u000e^5p]\u0002B\u0011\"!\u0019\u0001\u0005\u0004%I!a\u0019\u0002\u000bE,X-^3\u0016\u0005\u0005\u0015\u0004CBA4\u0003S\ni'D\u00019\u0013\r\tY\u0007\u000f\u0002\u000b\u0003J\u0014\u0018-\u001f#fcV,\u0007c\u0001\f\u0002p%\u0019\u0011\u0011\u000f\u0002\u0003\u001dI\u000bwOQ5oY><WI^3oi\"A\u0011Q\u000f\u0001!\u0002\u0013\t)'\u0001\u0004rk\u0016,X\r\t\u0005\n\u0003s\u0002!\u0019!C\u0005\u0003w\nQb\u001e:ji\u0016\f\u0005.Z1e\u0019><WCAA?!\u0011\ty(!\"\u000e\u0005\u0005\u0005%bAAB\u0019\u0005I1\u000f\u001e:fC6LgnZ\u0005\u0005\u0003\u000f\u000b\tIA\nCS:dwnZ,sSR,\u0017\t[3bI2{w\r\u0003\u0005\u0002\f\u0002\u0001\u000b\u0011BA?\u000399(/\u001b;f\u0003\",\u0017\r\u001a'pO\u0002B\u0011\"a$\u0001\u0001\u0004%I!!%\u0002'\u0011\fG/\u00192bg\u0016t\u0015-\\3QCR$XM\u001d8\u0016\u0005\u0005M\u0005#B\u0010\u0002\u0016\u0006e\u0015bAALA\t1q\n\u001d;j_:\u0004B!a'\u0002\"6\u0011\u0011Q\u0014\u0006\u0004\u0003?C\u0014!\u0002:fO\u0016D\u0018\u0002BAR\u0003;\u0013q\u0001U1ui\u0016\u0014h\u000eC\u0005\u0002(\u0002\u0001\r\u0011\"\u0003\u0002*\u00069B-\u0019;bE\u0006\u001cXMT1nKB\u000bG\u000f^3s]~#S-\u001d\u000b\u0004[\u0006-\u0006\"C9\u0002&\u0006\u0005\t\u0019AAJ\u0011!\ty\u000b\u0001Q!\n\u0005M\u0015\u0001\u00063bi\u0006\u0014\u0017m]3OC6,\u0007+\u0019;uKJt\u0007\u0005C\u0005\u00024\u0002\u0001\r\u0011\"\u0003\u0002\u0012\u0006\u0001B/\u00192mK:\u000bW.\u001a)biR,'O\u001c\u0005\n\u0003o\u0003\u0001\u0019!C\u0005\u0003s\u000bA\u0003^1cY\u0016t\u0015-\\3QCR$XM\u001d8`I\u0015\fHcA7\u0002<\"I\u0011/!.\u0002\u0002\u0003\u0007\u00111\u0013\u0005\t\u0003\u007f\u0003\u0001\u0015)\u0003\u0002\u0014\u0006\tB/\u00192mK:\u000bW.\u001a)biR,'O\u001c\u0011\t\u0013\u0005\r\u0007\u00011A\u0005\n\u0005%\u0012AE7bq\nKg\u000e\\8h#V,W/Z*ju\u0016D\u0011\"a2\u0001\u0001\u0004%I!!3\u0002-5\f\u0007PQ5oY><\u0017+^3vKNK'0Z0%KF$2!\\Af\u0011%\t\u0018QYA\u0001\u0002\u0004\tY\u0003\u0003\u0005\u0002P\u0002\u0001\u000b\u0015BA\u0016\u0003Mi\u0017\r\u001f\"j]2|w-U;fk\u0016\u001c\u0016N_3!\u0011%\t\u0019\u000e\u0001a\u0001\n\u0013\t).A\u0004d_:tWm\u0019;\u0016\u0005\u0005]\u0007c\u0001\f\u0002Z&\u0019\u00111\u001c\u0002\u0003'5K8+\u0015'D_:tWm\u0019;j_:LeNZ8\t\u0013\u0005}\u0007\u00011A\u0005\n\u0005\u0005\u0018aC2p]:,7\r^0%KF$2!\\Ar\u0011%\t\u0018Q\\A\u0001\u0002\u0004\t9\u000e\u0003\u0005\u0002h\u0002\u0001\u000b\u0015BAl\u0003!\u0019wN\u001c8fGR\u0004\u0003\"CAv\u0001\t\u0007I\u0011BAw\u00039\t\u0007.Z1e\u0019><')\u001e4gKJ,\"!a<\u0011\r\u0005E\u00181_A7\u001b\u00051\u0014bAA{m\t)2i\u001c8dkJ\u0014XM\u001c;MS:\\W\r\u001a#fcV,\u0007\u0002CA}\u0001\u0001\u0006I!a<\u0002\u001f\u0005DW-\u00193M_\u001e\u0014UO\u001a4fe\u0002B\u0011\"!@\u0001\u0001\u0004%I!!\u0011\u0002\u0013M\\\u0017\u000e\u001d+bE2,\u0007\"\u0003B\u0001\u0001\u0001\u0007I\u0011\u0002B\u0002\u00035\u00198.\u001b9UC\ndWm\u0018\u0013fcR\u0019QN!\u0002\t\u0011E\fy0!AA\u0002UCqA!\u0003\u0001A\u0003&Q+\u0001\u0006tW&\u0004H+\u00192mK\u0002BCAa\u0002\u0003\u000eA\u0019qDa\u0004\n\u0007\tE\u0001E\u0001\u0005w_2\fG/\u001b7f\u0011-\u0011)\u0002\u0001a\u0001\u0002\u0004%IAa\u0006\u0002\u0019\r,(O]3oiR\u000b'\r\\3\u0016\u0005\te\u0001c\u0001\f\u0003\u001c%\u0019!Q\u0004\u0002\u0003\u0013Q\u000b'\r\\3J]\u001a|\u0007b\u0003B\u0011\u0001\u0001\u0007\t\u0019!C\u0005\u0005G\t\u0001cY;se\u0016tG\u000fV1cY\u0016|F%Z9\u0015\u00075\u0014)\u0003C\u0005r\u0005?\t\t\u00111\u0001\u0003\u001a!A!\u0011\u0006\u0001!B\u0013\u0011I\"A\u0007dkJ\u0014XM\u001c;UC\ndW\r\t\u0015\u0005\u0005O\u0011i\u0001C\u0005\u00030\u0001\u0001\r\u0011\"\u0003\u00032\u0005IQ.\u0019:l\u00072|7/Z\u000b\u0003\u0005g\u00012a\rB\u001b\u0013\r\u00119\u0004\u000e\u0002\u000e\u0003R|W.[2C_>dW-\u00198\t\u0013\tm\u0002\u00011A\u0005\n\tu\u0012!D7be.\u001cEn\\:f?\u0012*\u0017\u000fF\u0002n\u0005\u007fA\u0011\"\u001dB\u001d\u0003\u0003\u0005\rAa\r\t\u0011\t\r\u0003\u0001)Q\u0005\u0005g\t!\"\\1sW\u000ecwn]3!Q\u0011\u0011\tE!\u0004\t\u0013\t%\u0003\u00011A\u0005\n\tE\u0012!C7be.\u0004\u0016-^:f\u0011%\u0011i\u0005\u0001a\u0001\n\u0013\u0011y%A\u0007nCJ\\\u0007+Y;tK~#S-\u001d\u000b\u0004[\nE\u0003\"C9\u0003L\u0005\u0005\t\u0019\u0001B\u001a\u0011!\u0011)\u0006\u0001Q!\n\tM\u0012AC7be.\u0004\u0016-^:fA!\"!1\u000bB\u0007\u0011%\u0011Y\u0006\u0001b\u0001\n\u0013\u0011i&A\u0006d_:tWm\u0019;j_:\u001cXC\u0001B0!\u0019\u0011\tGa\u001b\u0003p5\u0011!1\r\u0006\u0005\u0005K\u00129'A\u0004nkR\f'\r\\3\u000b\u0007\t%\u0004%\u0001\u0006d_2dWm\u0019;j_:LAA!\u001c\u0003d\tY\u0011I\u001d:bs\n+hMZ3s!\u0011\u0011\tHa\u001e\u000e\u0005\tM$b\u0001B;u\u0005\u0019a.\u001a;\n\t\te$1\u000f\u0002\u0007'>\u001c7.\u001a;\t\u0011\tu\u0004\u0001)A\u0005\u0005?\nAbY8o]\u0016\u001cG/[8og\u0002B\u0011B!!\u0001\u0005\u0004%IAa!\u0002!\r,(O]3oiF+X-^3TSj,WC\u0001BC!\r\u0019$qQ\u0005\u0004\u0005\u0013#$AC!u_6L7\rT8oO\"A!Q\u0012\u0001!\u0002\u0013\u0011))A\tdkJ\u0014XM\u001c;Rk\u0016,XmU5{K\u0002B\u0011B!%\u0001\u0005\u0004%\tAa%\u0002!U\u0004H-\u0019;f%><8o\u0016:ji\u0016\u0014XC\u0001BK!\u0011\u00119J!(\u000e\u0005\te%b\u0001BN\u0005\u0005\u0011\u0011n\\\u0005\u0005\u0005?\u0013IJ\u0001\tVa\u0012\fG/\u001a*poN<&/\u001b;fe\"A!1\u0015\u0001!\u0002\u0013\u0011)*A\tva\u0012\fG/\u001a*poN<&/\u001b;fe\u0002B\u0011Ba*\u0001\u0005\u0004%\tA!+\u0002!\u0011,G.\u001a;f%><8o\u0016:ji\u0016\u0014XC\u0001BV!\u0011\u00119J!,\n\t\t=&\u0011\u0014\u0002\u0011\t\u0016dW\r^3S_^\u001cxK]5uKJD\u0001Ba-\u0001A\u0003%!1V\u0001\u0012I\u0016dW\r^3S_^\u001cxK]5uKJ\u0004\u0003\"\u0003B\\\u0001\t\u0007I\u0011\u0001B]\u0003AIgn]3siJ{wo],sSR,'/\u0006\u0002\u0003<B!!q\u0013B_\u0013\u0011\u0011yL!'\u0003!%s7/\u001a:u%><8o\u0016:ji\u0016\u0014\b\u0002\u0003Bb\u0001\u0001\u0006IAa/\u0002#%t7/\u001a:u%><8o\u0016:ji\u0016\u0014\b\u0005C\u0004\u0003H\u0002!\t!!\u0011\u0002\u0011%\u001c8\t\\8tK\u0012DqAa3\u0001\t\u0003\u0011i-A\u000btKRl\u0015\r\u001f\"j]2|w-U;fk\u0016\u001c\u0016N_3\u0015\u00075\u0014y\r\u0003\u0005\u0003R\n%\u0007\u0019AA\u0016\u0003\u00151\u0018\r\\;f\u0011%\u0011)\u000e\u0001b\u0001\n\u0013\u00119.\u0001\buC\ndW-\u00138g_\u000e\u000b7\r[3\u0016\u0005\te\u0007\u0003CAy\u00057\u0014yN!\u0007\n\u0007\tugGA\tD_:\u001cWO\u001d:f]RD\u0015m\u001d5NCB\u00042A\u0006Bq\u0013\r\u0011\u0019O\u0001\u0002\u0012)\u0006\u0014G.Z%oM>\u001c\u0015m\u00195f\u0017\u0016L\b\u0002\u0003Bt\u0001\u0001\u0006IA!7\u0002\u001fQ\f'\r\\3J]\u001a|7)Y2iK\u0002BqAa;\u0001\t\u0003\u0011i/A\u0006bgN,'\u000f\u001e+bE2,W#A7\t\u000f\tE\b\u0001\"\u0001\u0003t\u0006ia\r\\;tQ\u0006CW-\u00193M_\u001e,\u0012\u0001\n\u0005\b\u0005o\u0004A\u0011\u0001B}\u0003%\tG\r\u001a*fG>\u0014H\r\u0006\u0005\u0003|\u000e\u00051qBB\n!\ry\"Q`\u0005\u0004\u0005\u007f\u0004#AB!osZ\u000bG\u000e\u0003\u0005\u0004\u0004\tU\b\u0019AB\u0003\u0003\u0015)g/\u001a8u!\u0011\u00199aa\u0003\u000e\u0005\r%!bAB\u0002s&!1QBB\u0005\u0005\u0015)e/\u001a8u\u0011\u001d\u0019\tB!>A\u0002\u0001\u000baBY5o\u0019><g)\u001b7f]\u0006lW\rC\u0004\u0004\u0016\tU\b\u0019\u0001!\u0002\u0013\u00154XM\u001c;UsB,\u0007\"CB\r\u0001\u0001\u0007I\u0011AB\u000e\u0003myg.T=T#2\u001bu.\\7v]&\u001c\u0017\r^5p]\u001a\u000b\u0017\u000e\\;sKV\u00111Q\u0004\t\u0007?\r}11E7\n\u0007\r\u0005\u0002EA\u0005Gk:\u001cG/[8ocA!1QEB\u001b\u001d\u0011\u00199c!\r\u000f\t\r%2qF\u0007\u0003\u0007WQ1a!\f\u0013\u0003\u0019a$o\\8u}%\t\u0011%C\u0002\u00044\u0001\nq\u0001]1dW\u0006<W-\u0003\u0003\u00048\re\"!C#yG\u0016\u0004H/[8o\u0015\r\u0019\u0019\u0004\t\u0005\n\u0007{\u0001\u0001\u0019!C\u0001\u0007\u007f\tqd\u001c8NsN\u000bFjQ8n[Vt\u0017nY1uS>tg)Y5mkJ,w\fJ3r)\ri7\u0011\t\u0005\nc\u000em\u0012\u0011!a\u0001\u0007;A\u0001b!\u0012\u0001A\u0003&1QD\u0001\u001d_:l\u0015pU)M\u0007>lW.\u001e8jG\u0006$\u0018n\u001c8GC&dWO]3!\u0011%\u0019I\u0005\u0001a\u0001\n\u0003\u0019Y%\u0001\bp]6K8+\u0015'D_:tWm\u0019;\u0016\u0005\r5\u0003\u0003B\u0010\u0004P5L1a!\u0015!\u0005%1UO\\2uS>t\u0007\u0007C\u0005\u0004V\u0001\u0001\r\u0011\"\u0001\u0004X\u0005\u0011rN\\'z'Fc5i\u001c8oK\u000e$x\fJ3r)\ri7\u0011\f\u0005\nc\u000eM\u0013\u0011!a\u0001\u0007\u001bB\u0001b!\u0018\u0001A\u0003&1QJ\u0001\u0010_:l\u0015pU)M\u0007>tg.Z2uA!I1\u0011\r\u0001A\u0002\u0013\u000511J\u0001\u0012_:l\u0015pU)M\t&\u001c8i\u001c8oK\u000e$\b\"CB3\u0001\u0001\u0007I\u0011AB4\u0003Uyg.T=T#2#\u0015n]\"p]:,7\r^0%KF$2!\\B5\u0011%\t81MA\u0001\u0002\u0004\u0019i\u0005\u0003\u0005\u0004n\u0001\u0001\u000b\u0015BB'\u0003Iyg.T=T#2#\u0015n]\"p]:,7\r\u001e\u0011\t\u0013\rE\u0004\u00011A\u0005\u0002\rm\u0011AI8o\u001bf\u001c\u0016\u000bT#wK:$H)Z:fe&\fG.\u001b>bi&|gNR1jYV\u0014X\rC\u0005\u0004v\u0001\u0001\r\u0011\"\u0001\u0004x\u00051sN\\'z'FcUI^3oi\u0012+7/\u001a:jC2L'0\u0019;j_:4\u0015-\u001b7ve\u0016|F%Z9\u0015\u00075\u001cI\bC\u0005r\u0007g\n\t\u00111\u0001\u0004\u001e!A1Q\u0010\u0001!B\u0013\u0019i\"A\u0012p]6K8+\u0015'Fm\u0016tG\u000fR3tKJL\u0017\r\\5{CRLwN\u001c$bS2,(/\u001a\u0011\t\u000f\r\u0005\u0005\u0001\"\u0003\u0004\u0004\u0006iqlY8o]\u0016\u001cG/T=T#2#2!\\BC\u0011!\t\u0019na A\u0002\u0005]\u0007bBBE\u0001\u0011\u000511R\u0001\u000fY>\fGmU2iK6\f\u0017J\u001c4p)\u0019\u0019ii!'\u0004\u001eB!1qRBK\u001b\t\u0019\tJC\u0002\u0004\u0014*\tQ\u0001^=qKNLAaa&\u0004\u0012\nQ1\u000b\u001e:vGR$\u0016\u0010]3\t\u0011\rm5q\u0011a\u0001\u0003/\fabY8o]\u0016\u001cG/[8o\u0013:4w\u000e\u0003\u0005\u0004 \u000e\u001d\u0005\u0019\u0001Bp\u0003\u0015!\u0018M\u00197f\u0011\u001d\u0019\u0019\u000b\u0001C\u0001\u0007K\u000bAbY8o]\u0016\u001cG/T=T#2#R!\\BT\u0007WC\u0001b!+\u0004\"\u0002\u0007\u0011q[\u0001\t?\u000e|gN\\3di\"I1QVBQ!\u0003\u0005\r!V\u0001\u0006CNLhn\u0019\u0005\b\u0007c\u0003A\u0011BBZ\u0003!!xn\u00144gg\u0016$H\u0003BA\u0016\u0007kC\u0001ba.\u00040\u0002\u0007\u0011QN\u0001\u000fe\u0006<()\u001b8m_\u001e,e/\u001a8u\u0011\u001d\u0019Y\f\u0001C\u0001\u0007{\u000b1dY8om\u0016\u0014HOU1x\u0005&tGn\\4Fm\u0016tGOU3d_J$G\u0003BB`\u0007\u0013\u0004b!a\u001a\u0004B\u000e\u0015\u0017bABbq\t!A*[:u!\r)7qY\u0005\u0003\u000b\u001aD\u0001ba.\u0004:\u0002\u0007\u0011Q\u000e\u0005\b\u0007\u001b\u0004A\u0011ABh\u0003M!(/_,ji\"|W\u000f^#yG\u0016\u0004H/[8o)\ri7\u0011\u001b\u0005\t\u0007'\u001cY\r1\u0001\u0004N\u0005\u0019a-\u001e8\t\u000f\r]\u0007\u0001\"\u0001\u0003n\u0006)\u0001/Y;tK\"911\u001c\u0001\u0005\u0002\t5\u0018A\u0002:fgVlW\rC\u0004\u0004`\u0002!\te!9\u0002\u000b\rdwn]3\u0015\u00035Dqa!:\u0001\t\u0003\u00199/\u0001\tiC:$G.Z\"p]:,7\r^5p]R\u0019Qn!;\t\u0011\r-81\u001da\u0001\u0005_\naa]8dW\u0016$\b\"CBx\u0001E\u0005I\u0011ABy\u0003Y\u0019wN\u001c8fGRl\u0015pU)MI\u0011,g-Y;mi\u0012\u0012TCABzU\r)6Q_\u0016\u0003\u0007o\u0004Ba!?\u0005\u00045\u001111 \u0006\u0005\u0007{\u001cy0A\u0005v]\u000eDWmY6fI*\u0019A\u0011\u0001\u0011\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0005\u0006\rm(!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u001e9A\u0011\u0002\u0002\t\u0002\u0011-\u0011\u0001\b\"j]2{wmU8dW\u0016$8+\u001a:wKJLe.\u0012=fGV$xN\u001d\t\u0004-\u00115aAB\u0001\u0003\u0011\u0003!ya\u0005\u0003\u0005\u000e\u0011E\u0001cA\u0010\u0005\u0014%\u0019AQ\u0003\u0011\u0003\r\u0005s\u0017PU3g\u0011\u001dIFQ\u0002C\u0001\t3!\"\u0001b\u0003\t\u0015\u0011uAQ\u0002b\u0001\n\u0003!y\"A\tG\u00132+uLT!N\u000b~su\nV0T\u000bR+\"a!2\t\u0013\u0011\rBQ\u0002Q\u0001\n\r\u0015\u0017A\u0005$J\u0019\u0016{f*Q'F?:{EkX*F)\u0002B!\u0002b\n\u0005\u000eE\u0005I\u0011\u0001C\u0015\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%kU!1\u0011\u001fC\u0016\t\u0019aBQ\u0005b\u0001;\u0001")
/* loaded from: input_file:org/apache/spark/sql/mlsql/sources/mysql/binlog/BinLogSocketServerInExecutor.class */
public class BinLogSocketServerInExecutor<T> extends SocketServerInExecutor<T> implements BinLogSocketServerSerDer, Logging {
    private final boolean isWriteAheadStorage;
    private Thread connectThread;
    private BinaryLogClient org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient;
    private String org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile;
    private long org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition;
    private boolean org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPositionConsumeFlag;
    private long org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$nextBinlogPosition;
    private final ArrayDeque<RawBinlogEvent> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue;
    private final BinlogWriteAheadLog org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog;
    private Option<Pattern> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern;
    private Option<Pattern> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern;
    private long maxBinlogQueueSize;
    private MySQLConnectionInfo org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect;
    private final ConcurrentLinkedDeque<RawBinlogEvent> aheadLogBuffer;
    private volatile boolean org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable;
    private volatile TableInfo org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable;
    private volatile AtomicBoolean markClose;
    private volatile AtomicBoolean markPause;
    private final ArrayBuffer<Socket> connections;
    private final AtomicLong currentQueueSize;
    private final UpdateRowsWriter updateRowsWriter;
    private final DeleteRowsWriter deleteRowsWriter;
    private final InsertRowsWriter insertRowsWriter;
    private final ConcurrentHashMap<TableInfoCacheKey, TableInfo> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache;
    private Function1<Exception, BoxedUnit> onMySQLCommunicationFailure;
    private Function0<BoxedUnit> onMySQLConnect;
    private Function0<BoxedUnit> onMySQLDisConnect;
    private Function1<Exception, BoxedUnit> onMySQLEventDeserializationFailure;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public static String FILE_NAME_NOT_SET() {
        return BinLogSocketServerInExecutor$.MODULE$.FILE_NAME_NOT_SET();
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.class.initializeLogIfNecessary(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2(this);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public Request readRequest(DataInputStream dataInputStream) {
        return BinLogSocketServerSerDer.Cclass.readRequest(this, dataInputStream);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public void sendRequest(DataOutputStream dataOutputStream, Request request) {
        BinLogSocketServerSerDer.Cclass.sendRequest(this, dataOutputStream, request);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public void sendResponse(DataOutputStream dataOutputStream, Response response) {
        BinLogSocketServerSerDer.Cclass.sendResponse(this, dataOutputStream, response);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public Response readResponse(DataInputStream dataInputStream) {
        return BinLogSocketServerSerDer.Cclass.readResponse(this, dataInputStream);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public void sendMark(DataOutputStream dataOutputStream, int i) {
        BinLogSocketServerSerDer.Cclass.sendMark(this, dataOutputStream, i);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public void iterativeSendData(DataOutputStream dataOutputStream, Response response) {
        BinLogSocketServerSerDer.Cclass.iterativeSendData(this, dataOutputStream, response);
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer
    public Iterator<Response> readIteratedResponse(DataInputStream dataInputStream) {
        return BinLogSocketServerSerDer.Cclass.readIteratedResponse(this, dataInputStream);
    }

    private Thread connectThread() {
        return this.connectThread;
    }

    private void connectThread_$eq(Thread thread) {
        this.connectThread = thread;
    }

    public BinaryLogClient org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient;
    }

    private void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient_$eq(BinaryLogClient binaryLogClient) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient = binaryLogClient;
    }

    private String org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile_$eq(String str) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile = str;
    }

    private long org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition_$eq(long j) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition = j;
    }

    private boolean org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPositionConsumeFlag() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPositionConsumeFlag;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPositionConsumeFlag_$eq(boolean z) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPositionConsumeFlag = z;
    }

    private long org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$nextBinlogPosition() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$nextBinlogPosition;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$nextBinlogPosition_$eq(long j) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$nextBinlogPosition = j;
    }

    public ArrayDeque<RawBinlogEvent> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue;
    }

    public BinlogWriteAheadLog org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog;
    }

    public Option<Pattern> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern;
    }

    private void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern_$eq(Option<Pattern> option) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern = option;
    }

    public Option<Pattern> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern;
    }

    private void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern_$eq(Option<Pattern> option) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern = option;
    }

    private long maxBinlogQueueSize() {
        return this.maxBinlogQueueSize;
    }

    private void maxBinlogQueueSize_$eq(long j) {
        this.maxBinlogQueueSize = j;
    }

    public MySQLConnectionInfo org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect;
    }

    private void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect_$eq(MySQLConnectionInfo mySQLConnectionInfo) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect = mySQLConnectionInfo;
    }

    private ConcurrentLinkedDeque<RawBinlogEvent> aheadLogBuffer() {
        return this.aheadLogBuffer;
    }

    public boolean org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable_$eq(boolean z) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable = z;
    }

    public TableInfo org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable_$eq(TableInfo tableInfo) {
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable = tableInfo;
    }

    private AtomicBoolean markClose() {
        return this.markClose;
    }

    private void markClose_$eq(AtomicBoolean atomicBoolean) {
        this.markClose = atomicBoolean;
    }

    private AtomicBoolean markPause() {
        return this.markPause;
    }

    private void markPause_$eq(AtomicBoolean atomicBoolean) {
        this.markPause = atomicBoolean;
    }

    private ArrayBuffer<Socket> connections() {
        return this.connections;
    }

    private AtomicLong currentQueueSize() {
        return this.currentQueueSize;
    }

    public UpdateRowsWriter updateRowsWriter() {
        return this.updateRowsWriter;
    }

    public DeleteRowsWriter deleteRowsWriter() {
        return this.deleteRowsWriter;
    }

    public InsertRowsWriter insertRowsWriter() {
        return this.insertRowsWriter;
    }

    public boolean isClosed() {
        return markClose().get();
    }

    public void setMaxBinlogQueueSize(long j) {
        maxBinlogQueueSize_$eq(j);
    }

    public ConcurrentHashMap<TableInfoCacheKey, TableInfo> org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache() {
        return this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache;
    }

    public void assertTable() {
        if (org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable() == null) {
            throw new RuntimeException("No table information is available for this event, cannot process further.");
        }
    }

    public synchronized Object flushAheadLog() {
        ArrayBuffer arrayBuffer = new ArrayBuffer();
        RawBinlogEvent poll = aheadLogBuffer().poll();
        while (true) {
            RawBinlogEvent rawBinlogEvent = poll;
            if (rawBinlogEvent == null) {
                break;
            }
            arrayBuffer.$plus$eq(rawBinlogEvent);
            poll = aheadLogBuffer().poll();
        }
        return arrayBuffer.isEmpty() ? BoxedUnit.UNIT : org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog().write(arrayBuffer);
    }

    public Object addRecord(Event event, String str, String str2) {
        assertTable();
        RawBinlogEvent rawBinlogEvent = new RawBinlogEvent(event, org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable(), str, str2, org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition());
        if (this.isWriteAheadStorage && aheadLogBuffer().size() >= 1000) {
            flushAheadLog();
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog().cleanupOldBlocks(System.currentTimeMillis() - 3600000, org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog().cleanupOldBlocks$default$2());
        }
        if (this.isWriteAheadStorage) {
            BoxesRunTime.boxToBoolean(aheadLogBuffer().offer(rawBinlogEvent));
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        if (this.isWriteAheadStorage) {
            return BoxedUnit.UNIT;
        }
        if (currentQueueSize().get() > maxBinlogQueueSize() && !markPause().get()) {
            pause();
        } else if (currentQueueSize().get() < maxBinlogQueueSize() / 2 && markPause().get()) {
            resume();
        }
        currentQueueSize().incrementAndGet();
        return BoxesRunTime.boxToBoolean(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue().offer(rawBinlogEvent));
    }

    public Function1<Exception, BoxedUnit> onMySQLCommunicationFailure() {
        return this.onMySQLCommunicationFailure;
    }

    public void onMySQLCommunicationFailure_$eq(Function1<Exception, BoxedUnit> function1) {
        this.onMySQLCommunicationFailure = function1;
    }

    public Function0<BoxedUnit> onMySQLConnect() {
        return this.onMySQLConnect;
    }

    public void onMySQLConnect_$eq(Function0<BoxedUnit> function0) {
        this.onMySQLConnect = function0;
    }

    public Function0<BoxedUnit> onMySQLDisConnect() {
        return this.onMySQLDisConnect;
    }

    public void onMySQLDisConnect_$eq(Function0<BoxedUnit> function0) {
        this.onMySQLDisConnect = function0;
    }

    public Function1<Exception, BoxedUnit> onMySQLEventDeserializationFailure() {
        return this.onMySQLEventDeserializationFailure;
    }

    public void onMySQLEventDeserializationFailure_$eq(Function1<Exception, BoxedUnit> function1) {
        this.onMySQLEventDeserializationFailure = function1;
    }

    public void org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$_connectMySQL(final MySQLConnectionInfo mySQLConnectionInfo) {
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient_$eq(new BinaryLogClient(mySQLConnectionInfo.host(), mySQLConnectionInfo.port(), mySQLConnectionInfo.userName(), mySQLConnectionInfo.password()));
        Some binlogFileName = mySQLConnectionInfo.binlogFileName();
        if (binlogFileName instanceof Some) {
            String str = (String) binlogFileName.x();
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setBinlogFilename(str);
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile_$eq(str);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        Map map = (Map) mySQLConnectionInfo.properties().get();
        if (map.contains("heartbeatInterval")) {
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setHeartbeatInterval(new StringOps(Predef$.MODULE$.augmentString((String) map.apply("heartbeatInterval"))).toLong());
        }
        if (map.contains("blocking")) {
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setBlocking(new StringOps(Predef$.MODULE$.augmentString((String) map.apply("blocking"))).toBoolean());
        }
        if (map.contains("connectTimeout")) {
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setConnectTimeout(new StringOps(Predef$.MODULE$.augmentString((String) map.apply("connectTimeout"))).toLong());
        }
        if (map.contains("keepAlive")) {
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setKeepAlive(new StringOps(Predef$.MODULE$.augmentString((String) map.apply("keepAlive"))).toBoolean());
        }
        Some recordPos = mySQLConnectionInfo.recordPos();
        if (recordPos instanceof Some) {
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setBinlogPosition(BoxesRunTime.unboxToLong(recordPos.x()));
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().getHeartbeatInterval();
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY, new EventDeserializer.CompatibilityMode[0]);
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().setEventDeserializer(eventDeserializer);
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().registerLifecycleListener(new BinaryLogClient.LifecycleListener(this) { // from class: org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anon$3
            private final /* synthetic */ BinLogSocketServerInExecutor $outer;

            public void onCommunicationFailure(BinaryLogClient binaryLogClient, Exception exc) {
                this.$outer.onMySQLCommunicationFailure().apply(exc);
            }

            public void onConnect(BinaryLogClient binaryLogClient) {
                this.$outer.onMySQLConnect().apply$mcV$sp();
            }

            public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Exception exc) {
                this.$outer.onMySQLEventDeserializationFailure().apply(exc);
            }

            public void onDisconnect(BinaryLogClient binaryLogClient) {
                this.$outer.onMySQLDisConnect().apply$mcV$sp();
            }

            {
                if (this == 0) {
                    throw null;
                }
                this.$outer = this;
            }
        });
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().registerEventListener(new BinaryLogClient.EventListener(this, mySQLConnectionInfo) { // from class: org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anon$4
            private final /* synthetic */ BinLogSocketServerInExecutor $outer;
            private final MySQLConnectionInfo connect$1;

            public void onEvent(Event event) {
                BoxedUnit boxedUnit5;
                BoxedUnit boxedUnit6;
                BoxedUnit boxedUnit7;
                BoxedUnit boxedUnit8;
                EventHeaderV4 header = event.getHeader();
                EventType eventType = header.getEventType();
                EventType eventType2 = EventType.ROTATE;
                if (eventType != null ? !eventType.equals(eventType2) : eventType2 != null) {
                    EventType eventType3 = EventType.FORMAT_DESCRIPTION;
                    if (eventType != null ? !eventType.equals(eventType3) : eventType3 != null) {
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition_$eq(header.getPosition());
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPositionConsumeFlag_$eq(true);
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$nextBinlogPosition_$eq(header.getNextPosition());
                    }
                }
                if (EventType.TABLE_MAP.equals(eventType)) {
                    TableMapEventData data = event.getData();
                    this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable_$eq((BoxesRunTime.unboxToBoolean(this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern().map(new BinLogSocketServerInExecutor$$anon$4$$anonfun$onEvent$3(this, data)).getOrElse(new BinLogSocketServerInExecutor$$anon$4$$anonfun$onEvent$1(this))) && BoxesRunTime.unboxToBoolean(this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern().map(new BinLogSocketServerInExecutor$$anon$4$$anonfun$onEvent$4(this, data)).getOrElse(new BinLogSocketServerInExecutor$$anon$4$$anonfun$onEvent$2(this)))) ? false : true);
                    if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable()) {
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable_$eq(null);
                        boxedUnit8 = BoxedUnit.UNIT;
                    } else {
                        TableInfoCacheKey tableInfoCacheKey = new TableInfoCacheKey(data.getDatabase(), data.getTable(), data.getTableId());
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable_$eq(this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache().get(tableInfoCacheKey));
                        if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable() == null) {
                            TableInfo tableInfo = new TableInfo(tableInfoCacheKey.databaseName(), tableInfoCacheKey.tableName(), Predef$.MODULE$.long2Long(tableInfoCacheKey.tableId()), this.$outer.loadSchemaInfo(this.connect$1, tableInfoCacheKey).json());
                            this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache().put(tableInfoCacheKey, tableInfo);
                            this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentTable_$eq(tableInfo);
                            boxedUnit8 = BoxedUnit.UNIT;
                        } else {
                            boxedUnit8 = BoxedUnit.UNIT;
                        }
                    }
                    return;
                }
                if (EventType.isWrite(eventType)) {
                    if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable()) {
                        boxedUnit7 = BoxedUnit.UNIT;
                    } else {
                        this.$outer.addRecord(event, this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().getBinlogFilename(), EventInfo.INSERT_EVENT);
                        boxedUnit7 = BoxedUnit.UNIT;
                    }
                    return;
                }
                if (EventType.isUpdate(eventType)) {
                    if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable()) {
                        boxedUnit6 = BoxedUnit.UNIT;
                    } else {
                        this.$outer.addRecord(event, this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().getBinlogFilename(), EventInfo.UPDATE_EVENT);
                        boxedUnit6 = BoxedUnit.UNIT;
                    }
                    return;
                }
                if (EventType.isDelete(eventType)) {
                    if (this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable()) {
                        boxedUnit5 = BoxedUnit.UNIT;
                    } else {
                        this.$outer.addRecord(event, this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().getBinlogFilename(), EventInfo.DELETE_EVENT);
                        boxedUnit5 = BoxedUnit.UNIT;
                    }
                    return;
                }
                if (!EventType.ROTATE.equals(eventType)) {
                    BoxedUnit boxedUnit9 = BoxedUnit.UNIT;
                    return;
                }
                RotateEventData data2 = event.getData();
                this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile_$eq(data2.getBinlogFilename());
                this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition_$eq(data2.getBinlogPosition());
                BoxedUnit boxedUnit10 = BoxedUnit.UNIT;
            }

            /* JADX WARN: Multi-variable type inference failed */
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.connect$1 = mySQLConnectionInfo;
            }
        });
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().connect();
    }

    public StructType loadSchemaInfo(MySQLConnectionInfo mySQLConnectionInfo, TableInfoCacheKey tableInfoCacheKey) {
        return JDBCRDD$.MODULE$.resolveTable(new JDBCOptions(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("url"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"jdbc:mysql://", ":", "?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&tinyInt1isBit=false"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{mySQLConnectionInfo.host(), BoxesRunTime.boxToInteger(mySQLConnectionInfo.port())}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("user"), mySQLConnectionInfo.userName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("password"), mySQLConnectionInfo.password()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("dbtable"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ".", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{tableInfoCacheKey.databaseName(), tableInfoCacheKey.tableName()}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("driver"), "com.mysql.jdbc.Driver")}))));
    }

    public void connectMySQL(MySQLConnectionInfo mySQLConnectionInfo, boolean z) {
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect_$eq(mySQLConnectionInfo);
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern_$eq(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().databaseNamePattern().map(new BinLogSocketServerInExecutor$$anonfun$connectMySQL$1(this)));
        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern_$eq(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().tableNamePattern().map(new BinLogSocketServerInExecutor$$anonfun$connectMySQL$2(this)));
        if (!z) {
            org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$_connectMySQL(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect());
        } else {
            connectThread_$eq(new Thread(this) { // from class: org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anon$1
                private final /* synthetic */ BinLogSocketServerInExecutor $outer;

                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$_connectMySQL(this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect());
                    } catch (Exception e) {
                        throw e;
                    }
                }

                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"connect mysql(", ", ", ") "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().host(), BoxesRunTime.boxToInteger(this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().port())})));
                    if (this == 0) {
                        throw null;
                    }
                    this.$outer = this;
                    setDaemon(true);
                }
            });
            connectThread().start();
        }
    }

    public boolean connectMySQL$default$2() {
        return true;
    }

    public long org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$toOffset(RawBinlogEvent rawBinlogEvent) {
        return BinlogOffset$.MODULE$.fromFileAndPos(rawBinlogEvent.getBinlogFilename(), Predef$.MODULE$.Long2long(rawBinlogEvent.getPos())).offset();
    }

    public List<String> convertRawBinlogEventRecord(RawBinlogEvent rawBinlogEvent) {
        List<String> arrayList;
        AbstractEventWriter deleteRowsWriter;
        String eventType = rawBinlogEvent.getEventType();
        try {
            if ("insert".equals(eventType)) {
                deleteRowsWriter = insertRowsWriter();
            } else if ("update".equals(eventType)) {
                deleteRowsWriter = updateRowsWriter();
            } else {
                if (!"delete".equals(eventType)) {
                    throw new MatchError(eventType);
                }
                deleteRowsWriter = deleteRowsWriter();
            }
            arrayList = deleteRowsWriter.writeEvent(rawBinlogEvent);
        } catch (Exception e) {
            logError(new BinLogSocketServerInExecutor$$anonfun$5(this), e);
            arrayList = new ArrayList<>();
        }
        return arrayList;
    }

    public void tryWithoutException(Function0<BoxedUnit> function0) {
        try {
            function0.apply$mcV$sp();
        } catch (Exception e) {
        }
    }

    public void pause() {
        if (!markPause().compareAndSet(false, true) || org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient() == null) {
            return;
        }
        tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$pause$1(this));
    }

    public void resume() {
        if (markPause().compareAndSet(true, false)) {
            connectThread_$eq(new Thread(this) { // from class: org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anon$2
                private final /* synthetic */ BinLogSocketServerInExecutor $outer;

                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        this.$outer.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient().connect();
                    } catch (Exception e) {
                        throw e;
                    }
                }

                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"connect mysql(", ", ", ") "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().host(), BoxesRunTime.boxToInteger(this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect().port())})));
                    if (this == 0) {
                        throw null;
                    }
                    this.$outer = this;
                    setDaemon(true);
                }
            });
            connectThread().start();
        }
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor
    public void close() {
        if (markClose().compareAndSet(false, true)) {
            logInfo(new BinLogSocketServerInExecutor$$anonfun$close$5(this));
            connections().foreach(new BinLogSocketServerInExecutor$$anonfun$close$6(this));
            connections().clear();
            if (org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient() != null) {
                tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$close$1(this));
            }
            if (server() != null) {
                tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$close$2(this));
            }
            tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$close$3(this));
            tryWithoutException(new BinLogSocketServerInExecutor$$anonfun$close$4(this));
        }
    }

    @Override // org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor
    public void handleConnection(Socket socket) {
        int i;
        BoxedUnit boxedUnit;
        connections().$plus$eq(socket);
        socket.setKeepAlive(true);
        DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
        DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
        while (true) {
            Request readRequest = readRequest(dataInputStream);
            if (readRequest instanceof ShutdownBinlogServer) {
                close();
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            } else if (readRequest instanceof RequestQueueSize) {
                sendResponse(dataOutputStream, new QueueSizeResponse(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue().size()));
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else if (readRequest instanceof RequestOffset) {
                int i2 = 5;
                while (true) {
                    i = i2;
                    if (org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPositionConsumeFlag() || i <= 0) {
                        break;
                    }
                    Thread.sleep(1000L);
                    i2 = i - 1;
                }
                long org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition = org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$nextBinlogPosition() == 4 ? org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition() : org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$nextBinlogPosition();
                if (i <= 0) {
                    logInfo(new BinLogSocketServerInExecutor$$anonfun$handleConnection$1(this, org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition));
                }
                flushAheadLog();
                sendResponse(dataOutputStream, new OffsetResponse(BinlogOffset$.MODULE$.fromFileAndPos(org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile(), org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition).offset()));
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            } else {
                if (!(readRequest instanceof RequestData)) {
                    throw new MatchError(readRequest);
                }
                RequestData requestData = (RequestData) readRequest;
                long startOffset = requestData.startOffset();
                long endOffset = requestData.endOffset();
                try {
                    if (this.isWriteAheadStorage) {
                        sendMark(dataOutputStream, SocketReplyMark$.MODULE$.HEAD());
                        org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog().read(new BinLogSocketServerInExecutor$$anonfun$handleConnection$2(this, dataOutputStream, startOffset, endOffset));
                        sendMark(dataOutputStream, SocketReplyMark$.MODULE$.END());
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        sendMark(dataOutputStream, SocketReplyMark$.MODULE$.HEAD());
                        RawBinlogEvent poll = org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue().poll();
                        while (poll != null && org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$toOffset(poll) >= startOffset && org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$toOffset(poll) < endOffset) {
                            iterativeSendData(dataOutputStream, new DataResponse(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(convertRawBinlogEventRecord(poll)).asScala()).toList()));
                            poll = org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue().poll();
                            currentQueueSize().decrementAndGet();
                        }
                        sendMark(dataOutputStream, SocketReplyMark$.MODULE$.END());
                        boxedUnit = BoxedUnit.UNIT;
                    }
                } catch (Exception e) {
                    logError(new BinLogSocketServerInExecutor$$anonfun$handleConnection$3(this), e);
                    boxedUnit = BoxedUnit.UNIT;
                }
                logError(new BinLogSocketServerInExecutor$$anonfun$handleConnection$3(this), e);
                boxedUnit = BoxedUnit.UNIT;
            }
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public BinLogSocketServerInExecutor(AtomicReference<T> atomicReference, String str, String str2, Configuration configuration, boolean z) {
        super(atomicReference, "binlog-socket-server-in-executor");
        this.isWriteAheadStorage = z;
        BinLogSocketServerSerDer.Cclass.$init$(this);
        Logging.class.$init$(this);
        this.connectThread = null;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$binaryLogClient = null;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogFile = null;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPosition = 4L;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$currentBinlogPositionConsumeFlag = false;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$nextBinlogPosition = 4L;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$queue = new ArrayDeque<>();
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        BinlogWriteAheadLog binlogWriteAheadLog = new BinlogWriteAheadLog(UUID.randomUUID().toString(), sparkEnv.serializerManager(), sparkEnv.conf(), configuration, str, BinlogWriteAheadLog$.MODULE$.$lessinit$greater$default$6());
        binlogWriteAheadLog.cleanupOldBlocks(System.currentTimeMillis(), true);
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$writeAheadLog = binlogWriteAheadLog;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$databaseNamePattern = None$.MODULE$;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableNamePattern = None$.MODULE$;
        this.maxBinlogQueueSize = 0L;
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$connect = null;
        this.aheadLogBuffer = new ConcurrentLinkedDeque<>();
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$skipTable = false;
        this.markClose = new AtomicBoolean(false);
        this.markPause = new AtomicBoolean(false);
        this.connections = new ArrayBuffer<>();
        this.currentQueueSize = new AtomicLong(0L);
        this.updateRowsWriter = new UpdateRowsWriter(str2, configuration);
        this.deleteRowsWriter = new DeleteRowsWriter(str2, configuration);
        this.insertRowsWriter = new InsertRowsWriter(str2, configuration);
        this.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$tableInfoCache = new ConcurrentHashMap<>();
        this.onMySQLCommunicationFailure = new BinLogSocketServerInExecutor$$anonfun$3(this);
        this.onMySQLConnect = new BinLogSocketServerInExecutor$$anonfun$1(this);
        this.onMySQLDisConnect = new BinLogSocketServerInExecutor$$anonfun$2(this);
        this.onMySQLEventDeserializationFailure = new BinLogSocketServerInExecutor$$anonfun$4(this);
    }
}
