package jp.ad.sinet.stream.plugins.mqttv5;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.SinetStreamIOException;
import jp.ad.sinet.stream.spi.PluginAsyncMessageReader;
import jp.ad.sinet.stream.spi.PluginMessageWrapper;
import jp.ad.sinet.stream.spi.ReaderParameters;
import lombok.Generated;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.common.MqttException;

/* loaded from: input_file:jp/ad/sinet/stream/plugins/mqttv5/Mqttv5AsyncMessageReader.class */
/**
 * Asynchronous MQTT v5 reader that forwards incoming messages and connection
 * failures to registered callbacks.
 *
 * <p>Callbacks are never invoked on the thread that delivers the MQTT event.
 * Each invocation is queued on a private single-thread executor, so callbacks
 * run one at a time in submission order. The callback lists are
 * {@link CopyOnWriteArrayList}s, so callbacks may be added or removed while
 * events are being dispatched.
 */
public class Mqttv5AsyncMessageReader extends Mqttv5AsyncBaseIO implements PluginAsyncMessageReader, Mqttv5Reader {

    @Generated
    private static final Logger log = Logger.getLogger(Mqttv5AsyncMessageReader.class.getName());

    /** Read-only view of the topics passed in the reader parameters. */
    private final List<String> topics;

    /** Receive timeout taken from the reader parameters; only exposed through its getter here. */
    private final Duration receiveTimeout;

    /** Single worker thread on which every user callback is executed. */
    private final ExecutorService executor;

    /** Callbacks invoked for each arrived message. */
    private final List<Consumer<PluginMessageWrapper>> onMessageCallbacks = new CopyOnWriteArrayList<>();

    /** Callbacks invoked when the connection is reported lost. */
    private final List<Consumer<Throwable>> onFailureCallbacks = new CopyOnWriteArrayList<>();

    /**
     * Creates the reader, installs the MQTT callback on the client and connects.
     *
     * <p>If installing the callback or connecting fails, the callback executor is
     * shut down before the failure is rethrown so that its worker thread is not
     * left behind.
     *
     * @param readerParameters service, consistency, client id, configuration,
     *                         value type, encryption flag, topics and receive
     *                         timeout for this reader
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Mqttv5AsyncMessageReader(ReaderParameters readerParameters) {
        super(readerParameters.getService(), readerParameters.getConsistency(), readerParameters.getClientId(), readerParameters.getConfig(), readerParameters.getValueType(), readerParameters.isDataEncryption());
        this.topics = Collections.unmodifiableList(readerParameters.getTopics());
        this.receiveTimeout = readerParameters.getReceiveTimeout();
        this.executor = Executors.newSingleThreadExecutor();
        try {
            ((MqttAsyncClient) this.client).setCallback(new Mqttv5ReaderCallback(this));
            connect();
        } catch (RuntimeException | Error e) {
            // The instance never escapes on failure, so nobody else could stop the executor.
            this.executor.shutdown();
            throw e;
        }
    }

    /**
     * Runs the superclass close logic and then stops the callback executor.
     *
     * <p>The executor is shut down even when the superclass close throws.
     * Already queued callbacks still run because {@code shutdown()} is used
     * rather than {@code shutdownNow()}.
     */
    @Override // jp.ad.sinet.stream.plugins.mqttv5.Mqttv5AsyncBaseIO, jp.ad.sinet.stream.plugins.mqttv5.AbstractMqttv5IO
    protected void doClose() {
        try {
            super.doClose();
        } finally {
            this.executor.shutdown();
        }
    }

    /**
     * Queues one invocation of every registered message callback for the given message.
     *
     * @param sinetMqttv5Message the message handed to each callback
     */
    @Override // jp.ad.sinet.stream.plugins.mqttv5.Mqttv5Reader
    public void onMessageArrived(SinetMqttv5Message sinetMqttv5Message) {
        for (Consumer<PluginMessageWrapper> callback : this.onMessageCallbacks) {
            executeGuarded("message", () -> callback.accept(sinetMqttv5Message));
        }
    }

    /**
     * Queues one invocation of every registered failure callback for the given cause.
     *
     * @param th the cause handed to each callback
     */
    @Override // jp.ad.sinet.stream.plugins.mqttv5.Mqttv5Reader
    public void onConnectionLost(Throwable th) {
        for (Consumer<Throwable> callback : this.onFailureCallbacks) {
            executeGuarded("failure", () -> callback.accept(th));
        }
    }

    /**
     * Runs a user callback on the executor and logs any runtime exception it throws.
     *
     * <p>Without the guard a failing callback would go unnoticed: nobody observes
     * the outcome of the queued task.
     *
     * @param kind     short label used in the log message
     * @param callback the callback invocation to run
     */
    private void executeGuarded(String kind, Runnable callback) {
        this.executor.execute(() -> {
            try {
                callback.run();
            } catch (RuntimeException e) {
                log.log(Level.WARNING, "MQTT " + kind + " callback failed: " + getClientId(), e);
            }
        });
    }

    /** Connects by delegating to the superclass implementation. */
    @Override // jp.ad.sinet.stream.plugins.mqttv5.AbstractMqttv5IO, jp.ad.sinet.stream.plugins.mqttv5.Mqttv5Reader
    public void connect() {
        super.connect();
    }

    /**
     * Subscribes to all configured topics and blocks until the subscription completes.
     *
     * <p>Every topic is subscribed with the QoS of this reader's consistency setting.
     *
     * @throws SinetStreamIOException if the MQTT client reports an error
     */
    @Override // jp.ad.sinet.stream.plugins.mqttv5.Mqttv5Reader
    public void subscribe() {
        try {
            // One QoS entry per topic, all identical.
            int[] qosPerTopic = new int[this.topics.size()];
            Arrays.fill(qosPerTopic, getConsistency().getQos());
            log.fine(() -> "MQTT subscribe: " + getClientId() + ": " + getTopic());
            ((MqttAsyncClient) this.client).subscribe(this.topics.toArray(new String[0]), qosPerTopic).waitForCompletion();
        } catch (MqttException e) {
            throw new SinetStreamIOException(e);
        }
    }

    /**
     * Disconnects the MQTT client and blocks until the disconnect completes.
     *
     * @throws MqttException if the client fails to disconnect
     */
    @Override // jp.ad.sinet.stream.plugins.mqttv5.Mqttv5Reader
    public void disconnect() throws MqttException {
        ((MqttAsyncClient) this.client).disconnect().waitForCompletion();
    }

    /**
     * Returns the configured topics joined into one string.
     *
     * @return the topic names separated by {@code ","}
     */
    public String getTopic() {
        return String.join(",", this.topics);
    }

    /**
     * Registers callbacks; a {@code null} argument is ignored.
     *
     * @param consumer  callback for arrived messages, or {@code null}
     * @param consumer2 callback for connection failures, or {@code null}
     */
    public void addOnMessageCallback(Consumer<PluginMessageWrapper> consumer, Consumer<Throwable> consumer2) {
        if (consumer != null) {
            this.onMessageCallbacks.add(consumer);
        }
        if (consumer2 != null) {
            this.onFailureCallbacks.add(consumer2);
        }
    }

    /**
     * Unregisters callbacks; a {@code null} argument is ignored.
     *
     * @param consumer  message callback to remove, or {@code null}
     * @param consumer2 failure callback to remove, or {@code null}
     */
    public void removeOnMessageCallback(Consumer<PluginMessageWrapper> consumer, Consumer<Throwable> consumer2) {
        if (consumer != null) {
            this.onMessageCallbacks.remove(consumer);
        }
        if (consumer2 != null) {
            this.onFailureCallbacks.remove(consumer2);
        }
    }

    /** Removes every registered message callback and failure callback. */
    public void clearOnMessageCallback() {
        this.onMessageCallbacks.clear();
        this.onFailureCallbacks.clear();
    }

    /**
     * Returns the configured topics.
     *
     * @return an unmodifiable view of the topic list
     */
    @Generated
    public List<String> getTopics() {
        return this.topics;
    }

    /**
     * Returns the receive timeout supplied in the reader parameters.
     *
     * @return the receive timeout
     */
    @Generated
    public Duration getReceiveTimeout() {
        return this.receiveTimeout;
    }
}
