package net.scattersphere.server.handler.stream;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import net.scattersphere.data.DataSerializer;
import net.scattersphere.job.stream.StreamRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.net.NetSocket;

/* loaded from: input_file:net/scattersphere/server/handler/stream/SubscribedStreamWriter.class */
/**
 * Forwards packets from a registered stream queue to a subscribed network endpoint.
 *
 * <p>The queue is looked up once, at construction time, from {@link StreamRegistry} using the
 * stream id. {@link #run()} then repeatedly takes a payload from the queue, wraps it with
 * {@link DataSerializer#packetize}, and writes it to the endpoint until the stream is reported
 * closed and empty, {@link #stop()} is called, or taking from the queue fails.
 */
public class SubscribedStreamWriter implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SubscribedStreamWriter.class);

    private final NetSocket endpoint;
    private final String streamId;
    private final BlockingQueue<byte[]> stream;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);

    /**
     * Creates a writer bound to one endpoint and one stream.
     *
     * @param netSocket the socket that packets are written to; if {@code null}, {@link #run()}
     *                  logs the condition and returns without writing anything
     * @param str       the id of the stream, used to look up the queue in {@link StreamRegistry}
     */
    public SubscribedStreamWriter(NetSocket netSocket, String str) {
        this.endpoint = netSocket;
        this.streamId = str;
        this.stream = StreamRegistry.instance().getStream(str);
    }

    /**
     * Drains the stream queue to the endpoint.
     *
     * <p>The loop ends when the registry reports the stream as both closed and empty (the endpoint
     * is then closed), when the running flag is cleared by {@link #stop()}, when the endpoint is
     * {@code null}, or when the next payload cannot be obtained. In the last case the endpoint is
     * closed and the running flag is cleared, so the final log line reports an abnormal shutdown.
     */
    @Override
    public void run() {
        this.isRunning.set(true);
        while (this.isRunning.get()) {
            if (this.endpoint == null) {
                LOG.info("Stream {} endpoint is null", this.streamId);
                break;
            }

            byte[] payload = takeNextPayload();
            if (payload == null) {
                // The failure was already logged and the endpoint closed. Clear the flag so the
                // shutdown message below reports "disconnect or error", and stop here instead of
                // dereferencing a null payload.
                this.isRunning.set(false);
                break;
            }

            Buffer buffer = new Buffer(DataSerializer.packetize(payload));
            LOG.info("Writing packet: destination={} length={} status={} size={}",
                    this.endpoint.toString(),
                    payload.length,
                    StreamRegistry.instance().isClosed(this.streamId),
                    StreamRegistry.instance().getSize(this.streamId));
            this.endpoint.write(buffer);

            if (StreamRegistry.instance().isClosed(this.streamId)
                    && StreamRegistry.instance().isEmpty(this.streamId)) {
                this.endpoint.close();
                LOG.info("Stream closed to {}", this.endpoint.toString());
                break;
            }
        }

        if (this.isRunning.get()) {
            LOG.info("Thread shutdown normally.");
        } else {
            LOG.info("Thread shutdown due to disconnect or error.");
        }
    }

    /**
     * Clears the running flag. The flag is only checked at the top of each loop iteration in
     * {@link #run()}, so a writer that is currently blocked waiting on the queue will not observe
     * the change until it obtains another payload or is interrupted.
     */
    public void stop() {
        this.isRunning.set(false);
    }

    /**
     * Blocks for the next payload and decrements the registry's size counter for this stream.
     *
     * @return the payload, or {@code null} if it could not be obtained; in that case the reason
     *         has been logged and the endpoint has been closed. A {@code BlockingQueue} never
     *         holds {@code null} elements, so {@code null} always means failure.
     */
    private byte[] takeNextPayload() {
        try {
            byte[] payload = this.stream.take();
            StreamRegistry.instance().decrementStreamSize(this.streamId);
            return payload;
        } catch (InterruptedException e) {
            LOG.info("Stream {} interrupted, closing connection to {}", this.streamId, this.endpoint.toString());
            // Restore the interrupt flag for whoever owns this thread, and actually close the
            // connection as the log message states.
            Thread.currentThread().interrupt();
            this.endpoint.close();
            return null;
        } catch (NullPointerException e) {
            // NOTE(review): kept from the original. It fires when the registry returned no queue
            // for this stream id (this.stream is null); presumably it may also come from the
            // registry call above - confirm, and replace with explicit checks if so.
            LOG.info("Stream {} empty, closing connection to {}", this.streamId, this.endpoint.toString());
            this.endpoint.close();
            return null;
        }
    }
}
