package io.questdb.cutlass.line.tcp;

import io.questdb.Metrics;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.Job;
import io.questdb.mp.RingQueue;
import io.questdb.mp.Sequence;
import io.questdb.std.Misc;
import io.questdb.std.ObjList;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.Path;
import java.io.Closeable;

/* loaded from: input_file:io/questdb/cutlass/line/tcp/LineTcpWriterJob.class */
/**
 * Worker job that consumes {@link LineTcpMeasurementEvent}s from a ring queue and applies
 * them to the tables assigned to this worker.
 *
 * <p>Each call to {@link #run(int)} first drains the queue. Only when the queue was empty
 * does the job perform housekeeping: committing assigned tables whose commit interval has
 * elapsed and ticking every assigned table.
 *
 * <p>NOTE(review): the job keeps unsynchronized mutable state ({@code assignedTables},
 * {@code nextCommitTime}); it presumably must be driven by a single worker thread — confirm
 * against the worker pool that schedules it.
 */
class LineTcpWriterJob implements Job, Closeable {
    private static final Log LOG = LogFactory.getLog(LineTcpWriterJob.class);

    /**
     * Writer worker id that makes every worker treat the event as a writer release request.
     * NOTE(review): presumably mirrors a constant declared alongside LineTcpMeasurementEvent —
     * confirm and reference that constant directly if it is accessible.
     */
    private static final int RELEASE_WRITER_WORKER_ID = -3;

    private final int workerId;
    private final RingQueue<LineTcpMeasurementEvent> queue;
    private final Sequence sequence;
    private final Path path = new Path();
    private final ObjList<TableUpdateDetails> assignedTables = new ObjList<>();
    private final MillisecondClock millisecondClock;
    private final long commitIntervalDefault;
    private final LineTcpMeasurementScheduler scheduler;
    private final Metrics metrics;
    // Earliest clock reading (millisecond clock ticks) at which commitTables() does any work.
    private long nextCommitTime;

    /**
     * Creates a writer job bound to a single worker.
     *
     * @param i                           id of the worker this job runs on; events addressed to
     *                                    this id are appended by this job
     * @param ringQueue                   queue the measurement events are read from
     * @param sequence                    subscriber sequence used to claim and release queue slots
     * @param millisecondClock            clock used to schedule commits
     * @param j                           default commit interval, added to the clock's ticks when
     *                                    no assigned table reports its own next commit time
     * @param lineTcpMeasurementScheduler scheduler that handles writer release events
     * @param metrics                     metrics sink; unhandled errors are counted on its health check
     */
    public LineTcpWriterJob(int i, RingQueue<LineTcpMeasurementEvent> ringQueue, Sequence sequence, MillisecondClock millisecondClock, long j, LineTcpMeasurementScheduler lineTcpMeasurementScheduler, Metrics metrics) {
        this.workerId = i;
        this.queue = ringQueue;
        this.sequence = sequence;
        this.millisecondClock = millisecondClock;
        this.commitIntervalDefault = j;
        this.nextCommitTime = millisecondClock.getTicks();
        this.scheduler = lineTcpMeasurementScheduler;
        this.metrics = metrics;
    }

    /**
     * Processes events still pending in the queue, then frees the path and every assigned table.
     */
    @Override
    public void close() {
        LOG.info().$("line protocol writer closing [threadId=").$(this.workerId).$(']').$();
        // Keep running the job while it reports work, bounded by the queue's cycle so that
        // shutdown cannot loop forever if events keep arriving.
        for (int n = 0; n < this.queue.getCycle(); n++) {
            if (!run(this.workerId)) {
                break;
            }
        }
        Misc.free(this.path);
        Misc.freeObjList(this.assignedTables);
        this.assignedTables.clear();
    }

    /**
     * Runs one iteration of the job.
     *
     * @param i id of the calling worker; asserted to equal the id this job was created with
     * @return {@code true} if at least one event was taken off the queue, {@code false} if the
     * queue was empty (in which case commits and ticks were performed instead)
     */
    @Override
    public boolean run(int i) {
        assert this.workerId == i;
        final boolean busy = drainQueue();
        if (!busy) {
            // Housekeeping only happens on idle iterations.
            commitTables();
            tickWriters();
        }
        return busy;
    }

    /**
     * Once the scheduled commit time has passed, asks every assigned table to commit if its
     * interval elapsed and reschedules the next check at the earliest time reported by any
     * table, or one default interval from now when no table reported a time.
     */
    private void commitTables() {
        final long now = this.millisecondClock.getTicks();
        if (now <= this.nextCommitTime) {
            return;
        }
        long earliest = Long.MAX_VALUE;
        for (int n = 0, size = this.assignedTables.size(); n < size; n++) {
            try {
                final long tableNextCommitTime = this.assignedTables.getQuick(n).commitIfIntervalElapsed(now);
                if (tableNextCommitTime < earliest) {
                    earliest = tableNextCommitTime;
                }
            } catch (Throwable th) {
                // A failing table must not prevent the remaining tables from committing.
                LOG.critical().$("commit failed [table=").$(this.assignedTables.getQuick(n).getTableNameUtf16()).$(",ex=").$(th).I$();
                this.metrics.healthCheck().incrementUnhandledErrors();
            }
        }
        this.nextCommitTime = earliest != Long.MAX_VALUE ? earliest : now + this.commitIntervalDefault;
    }

    /**
     * Consumes events until the sequence reports that none are available.
     *
     * @return {@code true} if at least one event was consumed
     */
    private boolean drainQueue() {
        boolean busy = false;
        while (true) {
            final long cursor = this.sequence.next();
            if (cursor < 0) {
                if (cursor == -1) {
                    // Nothing left to consume.
                    return busy;
                }
                // Any other negative value: no slot was claimed this time, try again.
                continue;
            }
            busy = true;
            final LineTcpMeasurementEvent event = this.queue.get(cursor);
            try {
                final TableUpdateDetails tableUpdateDetails = event.getTableUpdateDetails();
                boolean releaseWriter = false;
                if (event.getWriterWorkerId() == this.workerId) {
                    try {
                        if (tableUpdateDetails.isWriterInError()) {
                            releaseWriter = true;
                        } else {
                            if (!tableUpdateDetails.isAssignedToJob()) {
                                this.assignedTables.add(tableUpdateDetails);
                                tableUpdateDetails.setAssignedToJob(true);
                                // Make the next idle iteration re-evaluate commit times right away.
                                this.nextCommitTime = this.millisecondClock.getTicks();
                                LOG.info().$("assigned table to writer thread [tableName=").$(tableUpdateDetails.getTableNameUtf16()).$(", threadId=").$(this.workerId).I$();
                            }
                            event.append();
                        }
                    } catch (Throwable th) {
                        tableUpdateDetails.setWriterInError();
                        LOG.critical().$("closing writer because of error [table=").$(tableUpdateDetails.getTableNameUtf16()).$(",ex=").$(th).I$();
                        this.metrics.healthCheck().incrementUnhandledErrors();
                        releaseWriter = true;
                        event.createWriterReleaseEvent(tableUpdateDetails, false);
                    }
                } else if (event.getWriterWorkerId() == RELEASE_WRITER_WORKER_ID) {
                    releaseWriter = true;
                }
                if (releaseWriter && tableUpdateDetails.getWriter() != null) {
                    this.scheduler.processWriterReleaseEvent(event, this.workerId);
                    this.assignedTables.remove(tableUpdateDetails);
                    tableUpdateDetails.setAssignedToJob(false);
                    this.nextCommitTime = this.millisecondClock.getTicks();
                }
            } catch (Throwable th) {
                LOG.error().$("failed to process ILP event because of exception [ex=").$(th).I$();
            } finally {
                // Always hand the slot back, even if the error handling above fails itself,
                // so that a claimed cursor is never left unreleased.
                this.sequence.done(cursor);
            }
        }
    }

    /**
     * Calls {@code tick()} on every table currently assigned to this job.
     */
    private void tickWriters() {
        for (int n = 0, size = this.assignedTables.size(); n < size; n++) {
            this.assignedTables.getQuick(n).tick();
        }
    }
}
