package jp.ad.sinet.stream.api;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.jdeferred2.impl.DefaultDeferredManager;

/* loaded from: input_file:jp/ad/sinet/stream/api/SinetStreamAsyncWrapperMessageReader.class */
/**
 * Adapts a synchronous {@link MessageReader} to the {@link AsyncMessageReader} interface.
 *
 * <p>On construction a task is handed to a {@link DefaultDeferredManager} backed by a
 * single-thread executor. That task repeatedly calls {@code read()} on the wrapped reader and
 * passes each message to every registered message callback. The loop ends when {@code read()}
 * returns {@code null}, or when anything inside the loop throws; in the latter case the
 * throwable is passed to every registered failure callback first.
 *
 * <p>Callbacks are kept in {@link CopyOnWriteArrayList}s, so they may be added or removed while
 * the read loop is iterating over them.
 *
 * @param <T> the payload type of the messages produced by the wrapped reader
 */
public class SinetStreamAsyncWrapperMessageReader<T> implements AsyncMessageReader<T> {
    /** The wrapped synchronous reader; all accessors below delegate to it. */
    private final MessageReader<T> syncReader;

    /** Callbacks invoked, in registration order, for every message that is read. */
    private final List<Consumer<Message<T>>> onMessageCallbacks = new CopyOnWriteArrayList<>();

    /** Callbacks invoked, in registration order, when the read loop terminates with a throwable. */
    private final List<Consumer<Throwable>> onFailureCallbacks = new CopyOnWriteArrayList<>();

    /** Set by the first {@link #close()} call so that later calls become no-ops. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Runs the read loop on its own single-thread executor. */
    private final DefaultDeferredManager manager = new DefaultDeferredManager(Executors.newSingleThreadExecutor());

    /**
     * Wraps the given reader and immediately submits the read loop to the deferred manager.
     *
     * <p>The argument is not validated here: a {@code null} reader makes the first
     * {@code read()} attempt fail inside the loop, which is then reported to whatever failure
     * callbacks are registered at that moment.
     *
     * @param messageReader the synchronous reader to read from
     */
    public SinetStreamAsyncWrapperMessageReader(MessageReader<T> messageReader) {
        this.syncReader = messageReader;
        // Typed as Runnable so the Runnable overload of when(...) is selected unambiguously.
        Runnable readLoop = this::pumpMessages;
        this.manager.when(readLoop);
    }

    /**
     * Reads messages until the wrapped reader returns {@code null} or something throws.
     *
     * <p>Message callbacks run inside the {@code try} block, so a throwable raised by a callback
     * ends the loop and is reported to the failure callbacks exactly like a failed read.
     */
    private void pumpMessages() {
        while (true) {
            try {
                Message<T> message = this.syncReader.read();
                if (message == null) {
                    return;
                }
                for (Consumer<Message<T>> callback : this.onMessageCallbacks) {
                    callback.accept(message);
                }
            } catch (Throwable th) {
                notifyFailure(th);
                return;
            }
        }
    }

    /** Passes {@code failure} to every registered failure callback, in registration order. */
    private void notifyFailure(Throwable failure) {
        for (Consumer<Throwable> callback : this.onFailureCallbacks) {
            callback.accept(failure);
        }
    }

    /**
     * Closes the wrapped reader and then calls {@code shutdown()} on the deferred manager.
     *
     * <p>Only the first call has any effect. The manager is shut down even when closing the
     * wrapped reader throws; otherwise the executor could never be released, because the
     * {@code closed} flag is already set and a retry would return early.
     */
    @Override
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        try {
            this.syncReader.close();
        } finally {
            this.manager.shutdown();
        }
    }

    /**
     * Registers callbacks for read messages and for read-loop failure.
     *
     * @param consumer callback for each message; ignored when {@code null}
     * @param consumer2 callback for the throwable that ends the read loop; ignored when {@code null}
     */
    @Override
    public void addOnMessageCallback(Consumer<Message<T>> consumer, Consumer<Throwable> consumer2) {
        if (consumer != null) {
            this.onMessageCallbacks.add(consumer);
        }
        if (consumer2 != null) {
            this.onFailureCallbacks.add(consumer2);
        }
    }

    /**
     * Unregisters previously added callbacks.
     *
     * @param consumer message callback to remove; ignored when {@code null}
     * @param consumer2 failure callback to remove; ignored when {@code null}
     */
    @Override
    public void removeOnMessageCallback(Consumer<Message<T>> consumer, Consumer<Throwable> consumer2) {
        if (consumer != null) {
            this.onMessageCallbacks.remove(consumer);
        }
        if (consumer2 != null) {
            this.onFailureCallbacks.remove(consumer2);
        }
    }

    /** Removes every registered message callback and every registered failure callback. */
    @Override
    public void clearOnMessageCallback() {
        this.onMessageCallbacks.clear();
        this.onFailureCallbacks.clear();
    }

    /** Delegates to {@code getTopics()} of the wrapped reader. */
    @Override
    public List<String> getTopics() {
        return this.syncReader.getTopics();
    }

    /** Delegates to {@code getDeserializer()} of the wrapped reader. */
    @Override
    public Deserializer<T> getDeserializer() {
        return this.syncReader.getDeserializer();
    }

    /** Delegates to {@code getService()} of the wrapped reader. */
    @Override
    public String getService() {
        return this.syncReader.getService();
    }

    /** Delegates to {@code getTopic()} of the wrapped reader. */
    @Override
    public String getTopic() {
        return this.syncReader.getTopic();
    }

    /** Delegates to {@code getConsistency()} of the wrapped reader. */
    @Override
    public Consistency getConsistency() {
        return this.syncReader.getConsistency();
    }

    /** Delegates to {@code getClientId()} of the wrapped reader. */
    @Override
    public String getClientId() {
        return this.syncReader.getClientId();
    }

    /** Delegates to {@code getConfig()} of the wrapped reader. */
    @Override
    public Map<String, Object> getConfig() {
        return this.syncReader.getConfig();
    }

    /** Delegates to {@code getValueType()} of the wrapped reader. */
    @Override
    public ValueType getValueType() {
        return this.syncReader.getValueType();
    }

    /** Delegates to {@code isDataEncryption()} of the wrapped reader. */
    @Override
    public boolean isDataEncryption() {
        return this.syncReader.isDataEncryption();
    }

    /** Delegates to {@code getMetrics()} of the wrapped reader. */
    @Override
    public Metrics getMetrics() {
        return this.syncReader.getMetrics();
    }

    /**
     * Delegates to {@code resetMetrics(boolean)} of the wrapped reader.
     *
     * @param z passed through unchanged to the wrapped reader
     */
    @Override
    public void resetMetrics(boolean z) {
        this.syncReader.resetMetrics(z);
    }

    /**
     * Delegates to {@code getInfo(String)} of the wrapped reader.
     *
     * @param str passed through unchanged to the wrapped reader
     * @return whatever the wrapped reader returns for {@code str}
     */
    @Override
    public Object getInfo(String str) {
        return this.syncReader.getInfo(str);
    }
}
