package io.questdb.cutlass.line.udp;

import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.CairoSecurityContext;
import io.questdb.cutlass.line.CairoLineProtoParser;
import io.questdb.cutlass.line.LineProtoLexer;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.Job;
import io.questdb.network.Net;
import io.questdb.network.NetworkFacade;
import io.questdb.std.Misc;
import java.io.Closeable;

/* loaded from: input_file:io/questdb/cutlass/line/udp/LinuxLineProtoReceiver.class */
public class LinuxLineProtoReceiver implements Closeable, Job {
    private static final Log LOG = LogFactory.getLog(LinuxLineProtoReceiver.class);
    private final int msgCount;
    private final LineProtoLexer lexer;
    private final CairoLineProtoParser parser;
    private final NetworkFacade nf;
    private long fd;
    private long msgVec;
    private int commitRate;
    private long totalCount = 0;

    public LinuxLineProtoReceiver(LineUdpReceiverConfiguration lineUdpReceiverConfiguration, CairoEngine cairoEngine, CairoSecurityContext cairoSecurityContext) {
        this.nf = lineUdpReceiverConfiguration.getNetworkFacade();
        this.fd = this.nf.socketUdp();
        if (this.fd < 0) {
            int errno = this.nf.errno();
            LOG.error().$((CharSequence) "cannot open UDP socket [errno=").$(errno).$(']').$();
            throw CairoException.instance(errno).put("Cannot open UDP socket");
        }
        try {
            if (!this.nf.bindUdp(this.fd, 0, lineUdpReceiverConfiguration.getPort())) {
                LOG.error().$((CharSequence) "cannot bind socket [errno=").$(this.nf.errno()).$((CharSequence) ", fd=").$(this.fd).$((CharSequence) ", bind=").$(lineUdpReceiverConfiguration.getBindIPv4Address()).$((CharSequence) ", port=").$(lineUdpReceiverConfiguration.getPort()).$(']').$();
                throw CairoException.instance(this.nf.errno()).put("Cannot bind to ").put(lineUdpReceiverConfiguration.getBindIPv4Address()).put(':').put(lineUdpReceiverConfiguration.getPort());
            }
            if (!this.nf.join(this.fd, lineUdpReceiverConfiguration.getBindIPv4Address(), lineUdpReceiverConfiguration.getGroupIPv4Address())) {
                LOG.error().$((CharSequence) "cannot join group [errno=").$(this.nf.errno()).$((CharSequence) ", fd=").$(this.fd).$((CharSequence) ", bind=").$(lineUdpReceiverConfiguration.getBindIPv4Address()).$((CharSequence) ", group=").$(lineUdpReceiverConfiguration.getGroupIPv4Address()).$(']').$();
                throw CairoException.instance(this.nf.errno()).put("Cannot join group ").put(lineUdpReceiverConfiguration.getGroupIPv4Address()).put(" [bindTo=").put(lineUdpReceiverConfiguration.getBindIPv4Address()).put(']');
            }
            this.commitRate = lineUdpReceiverConfiguration.getCommitRate();
            this.msgCount = lineUdpReceiverConfiguration.getMsgCount();
            if (lineUdpReceiverConfiguration.getReceiveBufferSize() != -1 && this.nf.setRcvBuf(this.fd, lineUdpReceiverConfiguration.getReceiveBufferSize()) != 0) {
                LOG.error().$((CharSequence) "cannot set receive buffer size [fd=").$(this.fd).$((CharSequence) ", size=").$(lineUdpReceiverConfiguration.getReceiveBufferSize()).$(']').$();
            }
            this.msgVec = this.nf.msgHeaders(lineUdpReceiverConfiguration.getMsgBufferSize(), this.msgCount);
            this.lexer = new LineProtoLexer(lineUdpReceiverConfiguration.getMsgBufferSize());
            this.parser = new CairoLineProtoParser(cairoEngine, cairoSecurityContext);
            this.lexer.withParser(this.parser);
            LOG.info().$((CharSequence) "started [fd=").$(this.fd).$((CharSequence) ", bind=").$(lineUdpReceiverConfiguration.getBindIPv4Address()).$((CharSequence) ", group=").$(lineUdpReceiverConfiguration.getGroupIPv4Address()).$((CharSequence) ", port=").$(lineUdpReceiverConfiguration.getPort()).$((CharSequence) ", batch=").$(this.msgCount).$((CharSequence) ", commitRate=").$(this.commitRate).$(']').$();
        } catch (CairoException e) {
            close();
            throw e;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.fd > -1) {
            if (this.nf.close(this.fd) != 0) {
                LOG.error().$((CharSequence) "failed to close [fd=").$(this.fd).$((CharSequence) ", errno=").$(this.nf.errno()).$(']').$();
            } else {
                LOG.info().$((CharSequence) "closed [fd=").$(this.fd).$(']').$();
            }
            if (this.msgVec != 0) {
                this.nf.freeMsgHeaders(this.msgVec);
            }
            if (this.parser != null) {
                this.parser.commitAll();
                this.parser.close();
            }
            Misc.free(this.lexer);
            LOG.info().$((CharSequence) "closed [fd=").$(this.fd).$(']').$();
            this.fd = -1L;
        }
    }

    @Override // io.questdb.mp.Job
    public boolean run() {
        boolean z = false;
        while (true) {
            int recvmmsg = this.nf.recvmmsg(this.fd, this.msgVec, this.msgCount);
            if (recvmmsg <= 0) {
                this.parser.commitAll();
                return z;
            }
            long j = this.msgVec;
            for (int i = 0; i < recvmmsg; i++) {
                long mMsgBuf = this.nf.getMMsgBuf(j);
                this.lexer.parse(mMsgBuf, mMsgBuf + this.nf.getMMsgBufLen(j));
                this.lexer.parseLast();
                j += Net.MMSGHDR_SIZE;
            }
            this.totalCount += recvmmsg;
            if (this.totalCount > this.commitRate) {
                this.totalCount = 0L;
                this.parser.commitAll();
            }
            if (!z) {
                z = true;
            }
        }
    }
}
