package com.nokia.dempsy.messagetransport.tcp;

import com.nokia.dempsy.TestUtils;
import com.nokia.dempsy.executor.DefaultDempsyExecutor;
import com.nokia.dempsy.executor.DempsyExecutor;
import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.messagetransport.Listener;
import com.nokia.dempsy.messagetransport.MessageTransportException;
import com.nokia.dempsy.messagetransport.Sender;
import com.nokia.dempsy.messagetransport.SenderFactory;
import com.nokia.dempsy.monitoring.basic.BasicStatsCollector;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest.class */
public class TcpTransportTest {
    Logger logger = LoggerFactory.getLogger(TcpTransportTest.class);
    private static final int numThreads = 4;
    private byte[] receivedByteArrayMessage;
    private CountDownLatch receiveLargeMessageLatch;
    private static long baseTimeoutMillis = 20000;
    private static int disruptionCount = 5;
    private static boolean onlyOnce = true;

    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$Checker.class */
    public interface Checker {
        void check(int i, boolean z) throws Throwable;
    }

    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$DisruptibleOutputStream.class */
    private static class DisruptibleOutputStream extends OutputStream {
        OutputStream proxied;
        Socket socket;
        int dc;
        int numBytesWritten = 0;

        public DisruptibleOutputStream(OutputStream outputStream, Socket socket) {
            this.dc = -1;
            this.proxied = outputStream;
            this.socket = socket;
            this.dc = TcpTransportTest.disruptionCount;
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            if (this.dc >= 0 && this.numBytesWritten == this.dc) {
                try {
                    this.socket.close();
                } catch (Throwable th) {
                    th.printStackTrace();
                }
            }
            this.proxied.write(i);
            this.numBytesWritten++;
        }

        @Override // java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
            this.proxied.flush();
        }

        @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.proxied.close();
        }
    }

    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$SenderRunnable.class */
    static class SenderRunnable implements Runnable {
        int threadInstance;
        Destination destination;
        SenderFactory senderFactory;
        AtomicBoolean stopOnNormalFailure = new AtomicBoolean(false);
        AtomicBoolean totalFailure = new AtomicBoolean(false);
        AtomicBoolean isStopped = new AtomicBoolean(false);
        AtomicBoolean sendFailed = new AtomicBoolean(false);
        AtomicBoolean keepGoing = new AtomicBoolean(true);
        AtomicLong sentMessageCount = new AtomicLong(0);

        SenderRunnable(Destination destination, int i, SenderFactory senderFactory) {
            this.destination = destination;
            this.threadInstance = i;
            this.senderFactory = senderFactory;
        }

        SenderRunnable stopOnNormalFailure(boolean z) {
            this.stopOnNormalFailure.set(z);
            return this;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = false;
            while (this.keepGoing.get() && !z) {
                try {
                    this.senderFactory.getSender(this.destination).send(("Hello from " + this.threadInstance).getBytes());
                    this.sentMessageCount.incrementAndGet();
                } catch (MessageTransportException e) {
                    this.sendFailed.set(true);
                    if (this.stopOnNormalFailure.get()) {
                        z = true;
                    }
                } catch (Throwable th) {
                    this.totalFailure.set(true);
                }
                Thread.yield();
            }
            this.isStopped.set(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$StringListener.class */
    public static class StringListener implements Listener {
        Set<String> receivedStringMessages;
        AtomicReference<RuntimeException> throwThisOnce;
        AtomicLong numMessages;
        Object latch;
        AtomicLong numIn;

        public StringListener() {
            this.receivedStringMessages = new HashSet();
            this.throwThisOnce = new AtomicReference<>();
            this.numMessages = new AtomicLong(0L);
            this.latch = null;
            this.numIn = new AtomicLong(0L);
        }

        public StringListener(Object obj) {
            this.receivedStringMessages = new HashSet();
            this.throwThisOnce = new AtomicReference<>();
            this.numMessages = new AtomicLong(0L);
            this.latch = null;
            this.numIn = new AtomicLong(0L);
            this.latch = obj;
        }

        public boolean onMessage(byte[] bArr, boolean z) throws MessageTransportException {
            try {
                this.numIn.incrementAndGet();
                if (this.latch != null) {
                    synchronized (this.latch) {
                        try {
                            this.latch.wait();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
                synchronized (this) {
                    this.receivedStringMessages.add(new String(bArr));
                }
                this.numMessages.incrementAndGet();
                if (this.throwThisOnce.get() != null) {
                    throw this.throwThisOnce.getAndSet(null);
                }
                return true;
            } finally {
                this.numIn.decrementAndGet();
            }
        }

        public void shuttingDown() {
        }
    }

    private void runAllCombinations(Checker checker) throws Throwable {
        this.logger.debug("checking " + checker + " with ephemeral port and not using \"localhost.\"");
        checker.check(-1, false);
        this.logger.debug("checking " + checker + " with port 8765 and not using \"localhost.\"");
        checker.check(8765, false);
        this.logger.debug("checking " + checker + " with ephemeral port using \"localhost.\"");
        checker.check(-1, true);
        this.logger.debug("checking " + checker + " with port 8765 using \"localhost.\"");
        checker.check(8765, true);
    }

    @Test
    public void transportSimpleMessage() throws Throwable {
        runAllCombinations(new Checker() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.1
            @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.Checker
            public void check(int i, boolean z) throws Throwable {
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                try {
                    tcpReceiver = new TcpReceiver();
                    StringListener stringListener = new StringListener();
                    tcpReceiver.setListener(stringListener);
                    senderFactory = TcpTransportTest.this.makeSenderFactory(false);
                    if (i > 0) {
                        tcpReceiver.setPort(i);
                    }
                    if (z) {
                        tcpReceiver.setUseLocalhost(z);
                    }
                    tcpReceiver.start((DempsyExecutor) null);
                    senderFactory.getSender(tcpReceiver.getDestination()).send("Hello".getBytes());
                    long currentTimeMillis = System.currentTimeMillis() + TcpTransportTest.baseTimeoutMillis;
                    while (currentTimeMillis > System.currentTimeMillis() && stringListener.numMessages.get() == 0) {
                        Thread.sleep(1L);
                    }
                    Assert.assertEquals(1L, stringListener.numMessages.get());
                    Assert.assertEquals(1L, stringListener.receivedStringMessages.size());
                    Assert.assertEquals("Hello", stringListener.receivedStringMessages.iterator().next());
                    tcpReceiver.stop();
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                } catch (Throwable th) {
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                    throw th;
                }
            }

            public String toString() {
                return "testTransportSimpleMessage";
            }
        });
    }

    @Test
    public void transportLargeMessage() throws Throwable {
        SenderFactory senderFactory = null;
        TcpReceiver tcpReceiver = null;
        try {
            tcpReceiver = new TcpReceiver();
            senderFactory = makeSenderFactory(false);
            this.receiveLargeMessageLatch = new CountDownLatch(1);
            tcpReceiver.setListener(new Listener() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.2
                public boolean onMessage(byte[] bArr, boolean z) throws MessageTransportException {
                    TcpTransportTest.this.receivedByteArrayMessage = bArr;
                    TcpTransportTest.this.receiveLargeMessageLatch.countDown();
                    return true;
                }

                public void shuttingDown() {
                }
            });
            if (-1 <= 0) {
                tcpReceiver.setUseEphemeralPort(true);
            }
            if (-1 > 0) {
                tcpReceiver.setPort(-1);
            }
            if (0 != 0) {
                tcpReceiver.setUseLocalhost(false);
            }
            tcpReceiver.start((DempsyExecutor) null);
            tcpReceiver.start((DempsyExecutor) null);
            TcpDestination destination = tcpReceiver.getDestination();
            if (-1 > 0) {
                tcpReceiver.setPort(-1);
            }
            if (0 != 0) {
                tcpReceiver.setUseLocalhost(false);
            }
            TcpSenderFactory makeSenderFactory = makeSenderFactory(false);
            byte[] bArr = new byte[10485760];
            for (int i = 0; i < 10485760; i++) {
                bArr[i] = (byte) i;
            }
            makeSenderFactory.getSender(destination).send(bArr);
            Assert.assertTrue(this.receiveLargeMessageLatch.await(1L, TimeUnit.MINUTES));
            Assert.assertArrayEquals(bArr, this.receivedByteArrayMessage);
            if (senderFactory != null) {
                senderFactory.stop();
            }
            if (tcpReceiver != null) {
                tcpReceiver.stop();
            }
            this.receivedByteArrayMessage = null;
        } catch (Throwable th) {
            if (senderFactory != null) {
                senderFactory.stop();
            }
            if (tcpReceiver != null) {
                tcpReceiver.stop();
            }
            this.receivedByteArrayMessage = null;
            throw th;
        }
    }

    @Test
    public void transportMultipleConnectionsFailedClient() throws Throwable {
        runAllCombinations(new Checker() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.3
            @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.Checker
            public void check(int i, boolean z) throws Throwable {
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                try {
                    tcpReceiver = new TcpReceiver();
                    senderFactory = TcpTransportTest.this.makeSenderFactory(true);
                    if (i > 0) {
                        tcpReceiver.setPort(i);
                    }
                    if (z) {
                        tcpReceiver.setUseLocalhost(z);
                    }
                    tcpReceiver.start((DempsyExecutor) null);
                    TcpDestination destination = tcpReceiver.getDestination();
                    ArrayList arrayList = new ArrayList();
                    SenderRunnable[] senderRunnableArr = new SenderRunnable[TcpTransportTest.numThreads];
                    for (int i2 = 0; i2 < TcpTransportTest.numThreads; i2++) {
                        SenderRunnable senderRunnable = new SenderRunnable(destination, i2, senderFactory);
                        senderRunnableArr[i2] = senderRunnable;
                        arrayList.add(i2, new Thread(senderRunnable, "Test Sender for " + i2));
                    }
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        ((Thread) it.next()).start();
                    }
                    SenderRunnable senderRunnable2 = null;
                    long currentTimeMillis = System.currentTimeMillis() + (4 * TcpTransportTest.baseTimeoutMillis);
                    while (currentTimeMillis > System.currentTimeMillis() && senderRunnable2 == null) {
                        Thread.sleep(1L);
                        for (SenderRunnable senderRunnable3 : senderRunnableArr) {
                            if (senderRunnable3.sendFailed.get()) {
                                senderRunnable2 = senderRunnable3;
                            }
                        }
                    }
                    Assert.assertNotNull(senderRunnable2);
                    ArrayList arrayList2 = new ArrayList();
                    for (SenderRunnable senderRunnable4 : senderRunnableArr) {
                        if (senderRunnable4 != senderRunnable2) {
                            arrayList2.add(senderRunnable4);
                        }
                    }
                    Assert.assertEquals(3L, arrayList2.size());
                    long[] jArr = new long[TcpTransportTest.numThreads];
                    int i3 = 0;
                    for (SenderRunnable senderRunnable5 : senderRunnableArr) {
                        int i4 = i3;
                        i3++;
                        jArr[i4] = senderRunnable5.sentMessageCount.get();
                    }
                    boolean z2 = false;
                    long currentTimeMillis2 = System.currentTimeMillis() + (4 * TcpTransportTest.baseTimeoutMillis);
                    while (currentTimeMillis2 > System.currentTimeMillis() && !z2) {
                        z2 = true;
                        Thread.sleep(1L);
                        int i5 = 0;
                        for (SenderRunnable senderRunnable6 : senderRunnableArr) {
                            int i6 = i5;
                            i5++;
                            if (jArr[i6] >= senderRunnable6.sentMessageCount.get()) {
                                z2 = false;
                            }
                        }
                    }
                    Assert.assertTrue(z2);
                    for (SenderRunnable senderRunnable7 : senderRunnableArr) {
                        senderRunnable7.keepGoing.set(false);
                    }
                    Iterator it2 = arrayList.iterator();
                    while (it2.hasNext()) {
                        ((Thread) it2.next()).join(5000L);
                    }
                    for (SenderRunnable senderRunnable8 : senderRunnableArr) {
                        Assert.assertTrue(senderRunnable8.isStopped.get());
                    }
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                } catch (Throwable th) {
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                    throw th;
                }
            }

            public String toString() {
                return "transportMultipleConnectionsFailedClient";
            }
        });
    }

    @Test
    public void transportMultipleConnectionsFailedReceiver() throws Throwable {
        runAllCombinations(new Checker() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.4
            @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.Checker
            public void check(int i, boolean z) throws Throwable {
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                try {
                    tcpReceiver = new TcpReceiver();
                    StringListener stringListener = new StringListener();
                    tcpReceiver.setListener(stringListener);
                    senderFactory = TcpTransportTest.this.makeSenderFactory(false);
                    if (i > 0) {
                        tcpReceiver.setPort(i);
                    }
                    if (z) {
                        tcpReceiver.setUseLocalhost(z);
                    }
                    tcpReceiver.start((DempsyExecutor) null);
                    TcpDestination destination = tcpReceiver.getDestination();
                    ArrayList arrayList = new ArrayList();
                    SenderRunnable[] senderRunnableArr = new SenderRunnable[TcpTransportTest.numThreads];
                    for (int i2 = 0; i2 < TcpTransportTest.numThreads; i2++) {
                        SenderRunnable stopOnNormalFailure = new SenderRunnable(destination, i2, senderFactory).stopOnNormalFailure(true);
                        senderRunnableArr[i2] = stopOnNormalFailure;
                        arrayList.add(i2, new Thread(stopOnNormalFailure, "Test Sender for " + i2));
                    }
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        ((Thread) it.next()).start();
                    }
                    for (SenderRunnable senderRunnable : senderRunnableArr) {
                        long currentTimeMillis = System.currentTimeMillis() + (4 * TcpTransportTest.baseTimeoutMillis);
                        while (currentTimeMillis > System.currentTimeMillis() && senderRunnable.sentMessageCount.get() == 0) {
                            Thread.sleep(1L);
                        }
                    }
                    for (SenderRunnable senderRunnable2 : senderRunnableArr) {
                        Assert.assertTrue(senderRunnable2.sentMessageCount.get() > 0);
                    }
                    long currentTimeMillis2 = System.currentTimeMillis() + (4 * TcpTransportTest.baseTimeoutMillis);
                    while (currentTimeMillis2 > System.currentTimeMillis() && stringListener.receivedStringMessages.size() != TcpTransportTest.numThreads) {
                        Thread.sleep(1L);
                    }
                    long j = stringListener.numMessages.get();
                    stringListener.throwThisOnce.set(new RuntimeException("Yo!"));
                    long currentTimeMillis3 = System.currentTimeMillis() + (4 * TcpTransportTest.baseTimeoutMillis);
                    while (currentTimeMillis3 > System.currentTimeMillis() && stringListener.numMessages.get() <= j) {
                        Thread.sleep(1L);
                    }
                    Assert.assertTrue(stringListener.numMessages.get() > j);
                    long j2 = stringListener.numMessages.get();
                    long currentTimeMillis4 = System.currentTimeMillis() + (4 * TcpTransportTest.baseTimeoutMillis);
                    while (currentTimeMillis4 > System.currentTimeMillis() && stringListener.numMessages.get() <= j2) {
                        Thread.sleep(1L);
                    }
                    Assert.assertTrue(stringListener.numMessages.get() > j2);
                    tcpReceiver.stop();
                    Iterator it2 = arrayList.iterator();
                    while (it2.hasNext()) {
                        ((Thread) it2.next()).join(4 * TcpTransportTest.baseTimeoutMillis);
                    }
                    for (SenderRunnable senderRunnable3 : senderRunnableArr) {
                        Assert.assertTrue(senderRunnable3.isStopped.get());
                    }
                    Assert.assertEquals(4L, stringListener.receivedStringMessages.size());
                    for (int i3 = 0; i3 < TcpTransportTest.numThreads; i3++) {
                        Assert.assertTrue(stringListener.receivedStringMessages.contains("Hello from " + i3));
                    }
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                } catch (Throwable th) {
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                    throw th;
                }
            }

            public String toString() {
                return "transportMultipleConnectionsFailedWriter";
            }
        });
    }

    @Before
    public void resetGlobals() {
        disruptionCount = 5;
        onlyOnce = true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public TcpSenderFactory makeSenderFactory(boolean z) {
        return z ? new TcpSenderFactory() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.5
            protected TcpSender makeTcpSender(TcpDestination tcpDestination) throws MessageTransportException {
                return new TcpSender(tcpDestination) { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.5.1
                    boolean onceOnly = TcpTransportTest.onlyOnce;
                    boolean didItOnceAlready = false;

                    protected synchronized Socket makeSocket(TcpDestination tcpDestination2) throws IOException {
                        if (this.didItOnceAlready && this.onceOnly) {
                            return new Socket(tcpDestination2.inetAddress, tcpDestination2.port);
                        }
                        this.didItOnceAlready = true;
                        return new Socket(tcpDestination2.inetAddress, tcpDestination2.port) { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.5.1.1
                            @Override // java.net.Socket
                            public OutputStream getOutputStream() throws IOException {
                                return new DisruptibleOutputStream(super.getOutputStream(), this);
                            }
                        };
                    }
                };
            }
        } : new TcpSenderFactory();
    }

    @Test
    public void transportMTWorkerMessage() throws Throwable {
        runAllCombinations(new Checker() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.6
            @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.Checker
            public void check(int i, boolean z) throws Throwable {
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                try {
                    tcpReceiver = new TcpReceiver();
                    final Object obj = new Object();
                    BasicStatsCollector basicStatsCollector = new BasicStatsCollector();
                    tcpReceiver.setStatsCollector(basicStatsCollector);
                    StringListener stringListener = new StringListener(obj);
                    tcpReceiver.setListener(stringListener);
                    senderFactory = TcpTransportTest.this.makeSenderFactory(false);
                    if (i > 0) {
                        tcpReceiver.setPort(i);
                    }
                    if (z) {
                        tcpReceiver.setUseLocalhost(z);
                    }
                    tcpReceiver.start((DempsyExecutor) null);
                    final Sender sender = senderFactory.getSender(tcpReceiver.getDestination());
                    final int numThreads2 = tcpReceiver.executor.getNumThreads();
                    Assert.assertTrue(numThreads2 > 1);
                    for (int i2 = 0; i2 < tcpReceiver.executor.getMaxNumberOfQueuedLimitedTasks() + tcpReceiver.executor.getNumThreads(); i2++) {
                        sender.send("Hello".getBytes());
                    }
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, stringListener, new TestUtils.Condition<StringListener>() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.6.1
                        @Override // com.nokia.dempsy.TestUtils.Condition
                        public boolean conditionMet(StringListener stringListener2) {
                            return stringListener2.numIn.get() == ((long) numThreads2);
                        }
                    }));
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, tcpReceiver.executor, new TestUtils.Condition<DefaultDempsyExecutor>() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.6.2
                        @Override // com.nokia.dempsy.TestUtils.Condition
                        public boolean conditionMet(DefaultDempsyExecutor defaultDempsyExecutor) {
                            return defaultDempsyExecutor.getCurrentQueuedLimitedTasks() == defaultDempsyExecutor.getMaxNumberOfQueuedLimitedTasks();
                        }
                    }));
                    Assert.assertEquals(0L, basicStatsCollector.getDiscardedMessageCount());
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, basicStatsCollector, new TestUtils.Condition<BasicStatsCollector>() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.6.3
                        @Override // com.nokia.dempsy.TestUtils.Condition
                        public boolean conditionMet(BasicStatsCollector basicStatsCollector2) throws Throwable {
                            sender.send("Hello".getBytes());
                            sender.send("Hello".getBytes());
                            synchronized (obj) {
                                obj.notify();
                            }
                            return basicStatsCollector2.getDiscardedMessageCount() > 0;
                        }
                    }));
                    stringListener.latch = null;
                    synchronized (obj) {
                        obj.notifyAll();
                    }
                    tcpReceiver.stop();
                    synchronized (obj) {
                        obj.notifyAll();
                    }
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                } catch (Throwable th) {
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                    throw th;
                }
            }

            public String toString() {
                return "testTransportSimpleMessage";
            }
        });
    }
}
