package io.nosqlbench.activitytype.tcpserver;

import io.nosqlbench.activitytype.stdout.StdoutActivity;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.util.SSLKsFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/nosqlbench/activitytype/tcpserver/TCPServerActivity.class */
public class TCPServerActivity extends StdoutActivity {
    private static final Logger logger = LoggerFactory.getLogger(TCPServerActivity.class);
    // Factory for the listening socket; the constructor picks an SSL factory or the JVM default.
    private final ServerSocketFactory socketFactory;
    // Bounded hand-off queue: filled by write(...) and the QueueWriterAdapter, drained by SocketWriter threads.
    private LinkedBlockingQueue<String> queue;
    // Listening socket; created by createPrintWriter() when it is null or closed.
    private ServerSocket listenerSocket;
    // Workers (acceptor and per-connection writers) that shutdownActivity() signals to stop.
    private List<Shutdown> managedShutdown;
    // Maximum number of queued strings; read from the "capacity" activity parameter (default 10).
    private int capacity;

    /* loaded from: input_file:io/nosqlbench/activitytype/tcpserver/TCPServerActivity$QueueWriterAdapter.class */
    /**
     * A {@link Writer} that forwards each written character range to a
     * {@link BlockingQueue} as a single {@code String} element.
     *
     * <p>All methods are synchronized on the adapter instance. {@link #write(char[], int, int)}
     * blocks while the queue is full.
     */
    public static class QueueWriterAdapter extends Writer {
        private BlockingQueue<String> queue;

        /**
         * @param blockingQueue the queue that receives every written chunk
         */
        public QueueWriterAdapter(BlockingQueue<String> blockingQueue) {
            this.queue = blockingQueue;
        }

        /**
         * Enqueues {@code new String(cArr, i, i2)}, waiting for space if the queue is full.
         *
         * <p>An interrupt received while waiting does not abort the write: the put is
         * retried until it succeeds, and the thread's interrupt status is restored before
         * returning so that callers can still observe it.
         *
         * @param cArr source characters
         * @param i    offset of the first character to enqueue
         * @param i2   number of characters to enqueue
         * @throws IllegalStateException if the adapter has no queue, e.g. after {@link #close()}
         */
        @Override // java.io.Writer
        public synchronized void write(char[] cArr, int i, int i2) {
            BlockingQueue<String> target = this.queue;
            if (target == null) {
                throw new IllegalStateException("QueueWriterAdapter has no queue to write to (already closed?)");
            }
            String chunk = new String(cArr, i, i2);
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        target.put(chunk);
                        return;
                    } catch (InterruptedException e) {
                        // Keep retrying, but remember the interrupt so it is not lost.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** No-op: every write is handed to the queue immediately, so nothing is buffered here. */
        @Override // java.io.Writer, java.io.Flushable
        public synchronized void flush() throws IOException {
        }

        /** Detaches the adapter from its queue; later writes are rejected. */
        @Override // java.io.Writer, java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() throws IOException {
            flush();
            this.queue = null;
        }
    }

    /* loaded from: input_file:io/nosqlbench/activitytype/tcpserver/TCPServerActivity$Shutdown.class */
    /**
     * Stop hook for the background workers of this activity. Implemented by
     * {@code SocketAcceptor} and {@code SocketWriter}; {@code shutdownActivity()}
     * invokes it on every registered worker.
     */
    private interface Shutdown {
        /** Signals the worker to stop. */
        void shutdown();
    }

    /* loaded from: input_file:io/nosqlbench/activitytype/tcpserver/TCPServerActivity$SocketAcceptor.class */
    /**
     * Accept loop for the activity's listening socket.
     *
     * <p>For every accepted connection a {@link SocketWriter} is created on the shared
     * queue, registered in the enclosing activity's {@code managedShutdown} list and
     * started on its own daemon thread. The loop ends after {@link #shutdown()} has been
     * called, at the latest one accept timeout later, and the server socket is closed
     * on every exit path.
     */
    public class SocketAcceptor implements Runnable, Shutdown {
        /** Upper bound for a single blocking accept() before the running flag is re-checked. */
        private static final int ACCEPT_TIMEOUT_MILLIS = 1000;

        private final BlockingQueue<String> queue;
        private final ServerSocket serverSocket;
        // volatile: set by the thread calling shutdown(), read by the acceptor thread.
        private volatile boolean running = true;

        /**
         * @param blockingQueue queue handed to every {@link SocketWriter} this acceptor creates
         * @param serverSocket  bound server socket to accept connections on
         */
        public SocketAcceptor(BlockingQueue<String> blockingQueue, ServerSocket serverSocket) {
            this.queue = blockingQueue;
            this.serverSocket = serverSocket;
        }

        /** Requests the accept loop to stop; takes effect after the current accept returns or times out. */
        @Override // io.nosqlbench.activitytype.tcpserver.TCPServerActivity.Shutdown
        public void shutdown() {
            this.running = false;
        }

        /**
         * Runs the accept loop until {@link #shutdown()} is called.
         *
         * @throws RuntimeException wrapping any {@link IOException} other than an accept timeout
         */
        @Override // java.lang.Runnable
        public void run() {
            // try-with-resources closes the listener on normal exit and on failure alike.
            try (ServerSocket listener = this.serverSocket) {
                // Socket options do not change between iterations, so apply them once.
                listener.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
                listener.setReuseAddress(true);
                while (this.running) {
                    final Socket client;
                    try {
                        client = listener.accept();
                    } catch (SocketTimeoutException ignored) {
                        // Expected whenever no client connects within the timeout; re-check running.
                        continue;
                    }
                    SocketWriter socketWriter = new SocketWriter(this.queue, client);
                    TCPServerActivity.this.managedShutdown.add(socketWriter);
                    Thread thread = new Thread(socketWriter);
                    thread.setName("SocketWriter/" + client);
                    thread.setDaemon(true);
                    thread.start();
                    TCPServerActivity.logger.info("Started writer thread for {}", client);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:io/nosqlbench/activitytype/tcpserver/TCPServerActivity$SocketWriter.class */
    /**
     * Drains strings from a shared queue and writes them to one connected socket.
     *
     * <p>The writer keeps running until {@link #shutdown()} has been called and the queue
     * is empty; it then closes the socket's writer and returns. The queue is polled with
     * a timeout rather than blocked on indefinitely, so a shutdown request is noticed even
     * when no data arrives.
     */
    public static class SocketWriter implements Runnable, Shutdown {
        /** Maximum wait for a queue element before the running flag is re-checked. */
        private static final long POLL_TIMEOUT_MILLIS = 100L;

        private final BlockingQueue<String> sourceQueue;
        private final OutputStream outputStream;
        private final OutputStreamWriter writer;
        // volatile: set by the thread calling shutdown(), read by the writer thread.
        private volatile boolean running = true;

        /**
         * @param blockingQueue queue to take outgoing strings from
         * @param socket        connected socket whose output stream receives the data
         * @throws RuntimeException if the socket's output stream cannot be obtained
         */
        public SocketWriter(BlockingQueue<String> blockingQueue, Socket socket) {
            this.sourceQueue = blockingQueue;
            try {
                this.outputStream = socket.getOutputStream();
                this.writer = new OutputStreamWriter(this.outputStream);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Requests the writer to stop once the queue has been drained. */
        @Override // io.nosqlbench.activitytype.tcpserver.TCPServerActivity.Shutdown
        public void shutdown() {
            this.running = false;
        }

        /**
         * Writes and flushes queued strings until shut down and drained, then closes the writer.
         * An interrupt ends the loop early and leaves the thread's interrupt status set.
         *
         * @throws RuntimeException wrapping any failure while writing to or closing the socket
         */
        @Override // java.lang.Runnable
        public void run() {
            // try-with-resources closes the writer (and with it the socket stream) on every exit path.
            try (OutputStreamWriter out = this.writer) {
                while (this.running || !this.sourceQueue.isEmpty()) {
                    // Timed poll: never blocks forever, and tolerates another consumer
                    // emptying the queue between the isEmpty() check and this call.
                    String data = this.sourceQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                    if (data != null) {
                        out.write(data);
                        out.flush();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Creates the activity and its bounded output queue.
     *
     * <p>Activity parameters read here: {@code ssl} (boolean, default {@code false}) selects
     * an SSL server socket factory instead of the JVM default; {@code capacity} (integer,
     * default {@code 10}) bounds the number of queued strings.
     *
     * @param activityDef the activity definition supplying the parameters
     */
    public TCPServerActivity(ActivityDef activityDef) {
        super(activityDef);
        // The acceptor thread appends writers to this list while shutdownActivity()
        // may be iterating it, so it must tolerate concurrent modification.
        this.managedShutdown = new CopyOnWriteArrayList<>();
        boolean sslEnabled = ((Boolean) activityDef.getParams().getOptionalBoolean("ssl").orElse(false)).booleanValue();
        this.capacity = ((Integer) activityDef.getParams().getOptionalInteger("capacity").orElse(10)).intValue();
        this.queue = new LinkedBlockingQueue<>(this.capacity);
        if (sslEnabled) {
            this.socketFactory = SSLKsFactory.get().createSSLServerSocketFactory(activityDef);
        } else {
            this.socketFactory = ServerSocketFactory.getDefault();
        }
    }

    /**
     * Forwards an activity-definition update to the superclass; this class adds no
     * handling of its own.
     *
     * @param activityDef the updated activity definition
     */
    public void onActivityDefUpdate(final ActivityDef activityDef) {
        super.onActivityDefUpdate(activityDef);
    }

    /**
     * Shuts the superclass down first, then signals every worker registered in
     * {@code managedShutdown} to stop.
     */
    public void shutdownActivity() {
        super.shutdownActivity();
        for (Shutdown worker : this.managedShutdown) {
            worker.shutdown();
        }
    }

    /**
     * Puts {@code str} on the output queue, waiting for space if the queue is full.
     *
     * <p>An interrupt received while waiting does not abort the write: the put is retried
     * until it succeeds, and the thread's interrupt status is restored before returning
     * so that callers can still observe it.
     *
     * @param str the text to enqueue
     */
    public synchronized void write(String str) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    this.queue.put(str);
                    return;
                } catch (InterruptedException e) {
                    // Keep retrying, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns a writer that feeds the shared output queue, starting the TCP listener
     * first if there is no open listening socket yet.
     *
     * <p>Activity parameters read here: {@code host} (default {@code localhost}) and
     * {@code port} (default {@code 12345}).
     *
     * @return a new {@link QueueWriterAdapter} on the shared queue
     * @throws RuntimeException if the listening socket cannot be created
     */
    protected synchronized Writer createPrintWriter() {
        final String bindHost = (String) getActivityDef().getParams().getOptionalString(new String[]{"host"}).orElse("localhost");
        final int bindPort = ((Integer) getActivityDef().getParams().getOptionalInteger("port").orElse(12345)).intValue();

        final boolean listenerMissing = this.listenerSocket == null || this.listenerSocket.isClosed();
        if (listenerMissing) {
            startListenerThread(bindHost, bindPort);
        }

        final QueueWriterAdapter adapter = new QueueWriterAdapter(this.queue);
        logger.info("initialized queue writer:" + adapter);
        return adapter;
    }

    /** Binds the listening socket and launches a daemon acceptor thread for it. */
    private void startListenerThread(String bindHost, int bindPort) {
        try {
            final InetAddress bindAddress = InetAddress.getByName(bindHost);
            this.listenerSocket = this.socketFactory.createServerSocket(bindPort, 10, bindAddress);
            if (this.socketFactory instanceof SSLServerSocketFactory) {
                logger.info("SSL enabled on server socket " + this.listenerSocket);
            }

            final SocketAcceptor acceptor = new SocketAcceptor(this.queue, this.listenerSocket);
            this.managedShutdown.add(acceptor);

            final Thread listenerThread = new Thread(acceptor);
            listenerThread.setDaemon(true);
            listenerThread.setName("Listener/" + this.listenerSocket);
            listenerThread.start();
        } catch (IOException e) {
            throw new RuntimeException("Error listening on listenerSocket:" + e, e);
        }
    }
}
