package net.scattersphere.server.handler.stream;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.scattersphere.data.DataSerializer;
import net.scattersphere.job.stream.StreamRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.net.NetSocket;

/* loaded from: input_file:net/scattersphere/server/handler/stream/SubscribedStreamWriter.class */
public class SubscribedStreamWriter implements Runnable {
    private final NetSocket endpoint;
    private final String streamId;
    private BlockingQueue<byte[]> stream;
    private final Logger LOG = LoggerFactory.getLogger((Class<?>) SubscribedStreamWriter.class);
    private final AtomicInteger bufferPosition = new AtomicInteger(0);

    public SubscribedStreamWriter(NetSocket netSocket, String str) {
        this.endpoint = netSocket;
        this.streamId = str;
        this.stream = StreamRegistry.instance().getStream(str);
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            byte[] bArr = null;
            try {
                bArr = this.stream.take();
                this.bufferPosition.incrementAndGet();
            } catch (InterruptedException e) {
                this.LOG.info("Stream {} interrupted, closing connection to {}", this.streamId, this.endpoint.toString());
            }
            Buffer buffer = new Buffer(DataSerializer.packetize(bArr));
            this.LOG.info("Writing packet: destination={} length={}", this.endpoint.toString(), Integer.valueOf(bArr.length));
            this.endpoint.write(buffer);
            if (StreamRegistry.instance().isClosed(this.streamId) && this.bufferPosition.get() == StreamRegistry.instance().getSize(this.streamId)) {
                this.endpoint.close();
                this.LOG.info("Stream closed to {}", this.endpoint.toString());
                return;
            }
        }
    }
}
