package org.walkmod.nsq;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.walkmod.nsq.exceptions.NSQException;
import org.walkmod.nsq.lookupd.AbstractLookupd;
import org.walkmod.nsq.lookupd.BasicLookupdJob;
import org.walkmod.nsq.util.ConnectionUtils;

/* loaded from: input_file:org/walkmod/nsq/NSQReader.class */
/**
 * Base class for an NSQ consumer bound to one topic/channel pair.
 *
 * <p>A reader keeps a map of nsqd {@link Connection}s and a map of lookupd
 * endpoints, and hands every received {@link Message} to a single-threaded
 * executor. Subclasses decide how a message is processed by implementing
 * {@link #makeRunnableFromMessage(Message)}.
 *
 * <p>{@link #init(String, String)} must be called before any other method;
 * all collaborators (executors, maps) are created there, not in a constructor.
 */
public abstract class NSQReader {
    /** Default base requeue delay assigned by {@link #init(String, String)}. */
    private static final int DEFAULT_REQUEUE_DELAY = 50;
    /** Default number of attempts after which a message is finished instead of requeued. */
    private static final int DEFAULT_MAX_RETRIES = 2;
    /** Default total in-flight budget shared by all connections. */
    private static final int DEFAULT_MAX_IN_FLIGHT = 1;
    /** Initial delay and period, in seconds, of the lookupd polling job. */
    private static final long LOOKUPD_POLL_SECONDS = 30L;

    /**
     * Base requeue delay; multiplied by the message's attempt count when requeueing.
     * NOTE(review): unit is whatever {@code ConnectionUtils.requeue} expects
     * (presumably milliseconds) - confirm.
     */
    protected int requeueDelay;
    /** Messages whose attempt count exceeds this value are finished rather than requeued. */
    protected int maxRetries;
    /** Total in-flight budget, split across connections in {@link #connectToNsqd(String, int)}. */
    protected int maxInFlight;
    protected String topic;
    protected String channel;
    /** The part of {@link #hostname} before the first dot. */
    protected String shortHostname;
    /** Local host name, or {@code "unknown.host"} when it cannot be resolved. */
    protected String hostname;
    /** Runs the runnables produced by {@link #makeRunnableFromMessage(Message)}. */
    protected ExecutorService executor;
    /** Connection implementation instantiated reflectively for every nsqd address. */
    protected Class<? extends Connection> connClass;
    /** Known nsqd connections, keyed by the connection's {@code toString()}. */
    protected ConcurrentHashMap<String, Connection> connections;
    /** Registered lookupd endpoints, keyed by {@code AbstractLookupd.getAddr()}. */
    protected ConcurrentHashMap<String, AbstractLookupd> lookupdConnections;
    private ScheduledExecutorService lookupdScheduler;
    private static final Logger log = LoggerFactory.getLogger(NSQReader.class);
    /** Every initialised reader, keyed by its {@link #toString()} value. */
    public static final ConcurrentHashMap<String, NSQReader> readerIndex = new ConcurrentHashMap<>();

    /**
     * Initialises the reader: applies default settings, creates the executors and
     * connection maps, resolves the local host name, registers a JVM shutdown hook
     * that calls {@link #shutdown()}, and publishes the reader in {@link #readerIndex}.
     *
     * @param str  the topic to consume
     * @param str2 the channel to consume on
     */
    public void init(String str, String str2) {
        this.requeueDelay = DEFAULT_REQUEUE_DELAY;
        this.maxRetries = DEFAULT_MAX_RETRIES;
        this.maxInFlight = DEFAULT_MAX_IN_FLIGHT;
        this.executor = Executors.newSingleThreadExecutor();
        this.connections = new ConcurrentHashMap<>();
        this.topic = str;
        this.channel = str2;
        try {
            this.hostname = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException ignored) {
            // Resolution failure is not fatal; fall back to a fixed placeholder name.
            this.hostname = "unknown.host";
        }
        // Cut at the first dot instead of split("\\.")[0]: split() returns an empty
        // array for a dots-only name, which would throw on index 0.
        int firstDot = this.hostname.indexOf('.');
        this.shortHostname = firstDot < 0 ? this.hostname : this.hostname.substring(0, firstDot);
        this.connClass = BasicConnection.class;
        this.lookupdConnections = new ConcurrentHashMap<>();
        this.lookupdScheduler = Executors.newScheduledThreadPool(1);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                NSQReader.this.shutdown();
            }
        });
        readerIndex.put(toString(), this);
    }

    /**
     * Closes every known connection and shuts down the message executor and the
     * lookupd scheduler. A connection that fails to close is logged and skipped so
     * that the remaining connections and both executors are still shut down.
     */
    public void shutdown() {
        log.info("NSQReader received shutdown signal, shutting down connections");
        for (Connection conn : this.connections.values()) {
            try {
                conn.close();
            } catch (RuntimeException e) {
                log.warn("Error closing connection {} during shutdown", conn, e);
            }
        }
        this.executor.shutdown();
        this.lookupdScheduler.shutdown();
    }

    /**
     * Builds the task that processes a received message.
     *
     * @param message the message to process
     * @return the runnable that {@link #addMessageForProcessing(Message)} submits to the executor
     */
    protected abstract Runnable makeRunnableFromMessage(Message message);

    /**
     * Submits a message to the executor for processing.
     *
     * @param message the message to process
     */
    public void addMessageForProcessing(Message message) {
        this.executor.execute(makeRunnableFromMessage(message));
    }

    /**
     * Requeues a message on the connection it came from, or finishes it once its
     * attempt count exceeds {@link #maxRetries}. If sending the command fails, the
     * error is logged and the message's connection is closed.
     *
     * @param message the message to requeue
     * @param z       when {@code true} the message is requeued with a delay of 0;
     *                otherwise the delay is {@code requeueDelay * attempts}
     */
    public void requeueMessage(Message message, boolean z) {
        if (message.getAttempts() > this.maxRetries) {
            finishMessage(message);
            return;
        }
        try {
            message.getConn().send(ConnectionUtils.requeue(message.getId(), z ? 0 : this.requeueDelay * message.getAttempts()));
        } catch (NSQException e) {
            // Trailing exception argument makes SLF4J log the stack trace as well.
            log.error("Error requeueing message to {}, will close the connection", message.getConn(), e);
            message.getConn().close();
        }
    }

    /**
     * Sends the finish command for a message on the connection it came from. If
     * sending fails, the error is logged and that connection is closed.
     *
     * @param message the message to finish
     */
    public void finishMessage(Message message) {
        try {
            message.getConn().send(ConnectionUtils.finish(message.getId()));
        } catch (NSQException e) {
            log.error("Error finishing message {} (from {}). Will close connection.", message, message.getConn(), e);
            message.getConn().close();
        }
    }

    /**
     * Creates a connection of type {@link #connClass} for the given nsqd address,
     * registers it, connects, redistributes the in-flight budget over all registered
     * connections, subscribes to the topic/channel, sends the ready count and starts
     * reading. Does nothing if a connection with the same {@code toString()} key is
     * already registered.
     *
     * @param str the nsqd host
     * @param i   the nsqd port
     * @throws NSQException if the connection class cannot be instantiated, or if
     *                      connecting, subscribing or starting the read fails
     */
    public void connectToNsqd(String str, int i) throws NSQException {
        Connection conn;
        try {
            conn = this.connClass.newInstance();
        } catch (IllegalAccessException e) {
            // Only NSQException(String) is known here, so keep the cause in the log.
            log.debug("Could not access default constructor of {}", this.connClass, e);
            throw new NSQException("Connection implementation's default constructor must be visible");
        } catch (InstantiationException e) {
            log.debug("Could not instantiate {}", this.connClass, e);
            throw new NSQException("Connection implementation must have a default constructor");
        }
        conn.init(str, i, this);
        String connId = conn.toString();
        if (this.connections.putIfAbsent(connId, conn) != null) {
            return;
        }
        boolean connected = false;
        try {
            conn.connect();
            connected = true;
        } finally {
            if (!connected) {
                // A connection that never connected must not stay registered:
                // putIfAbsent above would block every later attempt for this address.
                this.connections.remove(connId, conn);
            }
        }
        rebalanceMaxInFlight();
        conn.send(ConnectionUtils.subscribe(this.topic, this.channel, this.shortHostname, this.hostname));
        conn.send(ConnectionUtils.ready(conn.maxInFlight));
        conn.readForever();
    }

    /** Splits {@link #maxInFlight} evenly, rounding up, over the registered connections. */
    private void rebalanceMaxInFlight() {
        // Guard against a concurrent removal emptying the map between put and size().
        int connectionCount = Math.max(1, this.connections.size());
        // Divide in floating point; integer division would truncate before ceil()
        // and yield 0 whenever there are more connections than maxInFlight.
        int perConnection = (int) Math.ceil((double) this.maxInFlight / connectionCount);
        for (Connection conn : this.connections.values()) {
            conn.maxInFlight = perConnection;
        }
    }

    /**
     * Registers a lookupd endpoint and schedules a {@link BasicLookupdJob} for its
     * address, first run after 30 seconds and every 30 seconds thereafter. Does
     * nothing if an endpoint with the same address is already registered.
     *
     * @param abstractLookupd the lookupd endpoint to add
     */
    public void addLookupd(AbstractLookupd abstractLookupd) {
        String addr = abstractLookupd.getAddr();
        if (this.lookupdConnections.putIfAbsent(addr, abstractLookupd) != null) {
            return;
        }
        this.lookupdScheduler.scheduleAtFixedRate(
                new BasicLookupdJob(addr, this), LOOKUPD_POLL_SECONDS, LOOKUPD_POLL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Returns {@code "Reader<topic, channel>"}; this string is also the key under
     * which the reader is stored in {@link #readerIndex}.
     */
    @Override
    public String toString() {
        return "Reader<" + this.topic + ", " + this.channel + ">";
    }

    /**
     * @return the topic this reader consumes
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * @return the live map of registered lookupd endpoints (not a copy), keyed by address
     */
    public ConcurrentHashMap<String, AbstractLookupd> getLookupdConnections() {
        return this.lookupdConnections;
    }
}
