package com.nokia.dempsy.messagetransport.tcp;

import com.nokia.dempsy.TestUtils;
import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.executor.DefaultDempsyExecutor;
import com.nokia.dempsy.executor.DempsyExecutor;
import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.messagetransport.Listener;
import com.nokia.dempsy.messagetransport.MessageTransportException;
import com.nokia.dempsy.messagetransport.Sender;
import com.nokia.dempsy.messagetransport.SenderFactory;
import com.nokia.dempsy.messagetransport.tcp.TcpReceiver;
import com.nokia.dempsy.monitoring.StatsCollector;
import com.nokia.dempsy.monitoring.basic.BasicStatsCollector;
import com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda;
import com.yammer.metrics.core.Histogram;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest.class */
public class TcpTransportTest {
    // NOTE(review): presumably the number of concurrent sender threads used by tests
    // further down the file — confirm against its users.
    private static final int numThreads = 4;
    // Added to System.currentTimeMillis() to build the deadline of the polling loops in the tests.
    // NOTE(review): 9999999999999 ms is roughly 317 years, so those loops effectively never
    // time out and a lost message hangs the test instead of failing it. This looks like a
    // debugging value — confirm and consider a bounded timeout.
    private static final long baseTimeoutMillis = 9999999999999L;
    // Number of bytes the tests add to each payload length when computing the byte counts
    // they expect to see flushed to the socket.
    private static final int tcpTransportHeaderSize = 2;
    // Alphabet from which the tests draw random bytes when building large payloads.
    private static final String charSet = "ABCDEFGHIJKLMNOPabcdefghijklmnop0123456789";
    // NOTE(review): the next two fields are not used by any code visible in this part of the
    // file; presumably they belong to a large-message test further down — confirm.
    private byte[] receivedByteArrayMessage;
    private CountDownLatch receiveLargeMessageLatch;
    static Logger logger = LoggerFactory.getLogger(TcpTransportTest.class);
    // Incremented on every getFailFast() call; its low bit decides the returned value.
    private static int failFastCalls = 0;
    // Copied into each DisruptibleOutputStream at construction: the number of bytes that may be
    // written before the stream closes its socket (a negative value disables the disruption).
    private static int disruptionCount = 5;
    // Read by the TcpSender subclass built in AnonymousClass13: when true, only the first socket
    // that sender creates is wrapped by the OutputStreamFactory.
    private static boolean onlyOnce = true;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest$13, reason: invalid class name */
    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$13.class */
    /**
     * Sender factory whose senders hand out sockets with a substituted output stream.
     *
     * <p>Each {@link TcpSender} produced by {@link #makeTcpSender} overrides {@code makeSocket} so
     * that the returned {@link Socket} obtains its output stream from the captured
     * {@link OutputStreamFactory}. This is a decompiler-promoted anonymous class; the name and
     * constructor shape are dictated by the code that instantiates it (outside this view).
     */
    public class AnonymousClass13 extends TcpSenderFactory {
        // Captured OutputStreamFactory (the 'osfactory' local of the enclosing method).
        final /* synthetic */ OutputStreamFactory val$osfactory;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        // The first four arguments are forwarded unchanged to the TcpSenderFactory constructor;
        // the last one is the factory used to wrap socket output streams.
        AnonymousClass13(StatsCollector statsCollector, long j, long j2, long j3, OutputStreamFactory outputStreamFactory) {
            super(statsCollector, j, j2, j3);
            this.val$osfactory = outputStreamFactory;
        }

        /**
         * Builds a TcpSender (constructed with a final argument of 1500) whose sockets may be
         * wrapped by the captured OutputStreamFactory.
         */
        protected TcpSender makeTcpSender(TcpDestination tcpDestination) throws MessageTransportException {
            // The configuration values are read from this factory (presumably fields inherited
            // from TcpSenderFactory — they are not declared in this class).
            return new TcpSender(tcpDestination, this.statsCollector, this.maxNumberOfQueuedOutbound, this.socketWriteTimeoutMillis, this.batchOutgoingMessagesDelayMillis, 1500) { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.13.1
                // Snapshot of TcpTransportTest.onlyOnce taken when this sender is created.
                boolean onceOnly = TcpTransportTest.onlyOnce;
                // Becomes true once the first wrapped socket has been handed out.
                boolean didItOnceAlready = false;

                protected synchronized Socket makeSocket(TcpDestination tcpDestination2) throws IOException {
                    // After the first wrapped socket, and only when onceOnly is set, fall back to
                    // plain sockets.
                    if (this.didItOnceAlready && this.onceOnly) {
                        return new Socket(tcpDestination2.inetAddress, tcpDestination2.port);
                    }
                    this.didItOnceAlready = true;
                    return new Socket(tcpDestination2.inetAddress, tcpDestination2.port) { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.13.1.1
                        final OutputStreamFactory factory;

                        {
                            this.factory = AnonymousClass13.this.val$osfactory;
                        }

                        // Hands the real socket stream and this socket to the factory and returns
                        // whatever stream the factory builds.
                        @Override // java.net.Socket
                        public OutputStream getOutputStream() throws IOException {
                            return this.factory.makeOutputStream(super.getOutputStream(), this);
                        }
                    };
                }
            };
        }
    }

    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$BatchingOutputStreamWatcher.class */
    /**
     * {@link OutputStreamFactory} whose streams record how many bytes are written between
     * consecutive {@code flush()} calls.
     *
     * <p>Slot {@code k} of {@link #flushByteCounts} accumulates the bytes written before the
     * {@code (k + 1)}-th flush. The array is supplied by the test and must be large enough for
     * every flush interval that receives a write; writing past it fails with
     * {@link ArrayIndexOutOfBoundsException}.
     */
    public static class BatchingOutputStreamWatcher implements OutputStreamFactory {
        final long[] flushByteCounts;

        /**
         * @param flushByteCounts caller-owned array that receives the per-flush byte counts
         */
        public BatchingOutputStreamWatcher(long[] flushByteCounts) {
            this.flushByteCounts = flushByteCounts;
        }

        /**
         * Wraps {@code outputStream} in a counting proxy. Every write, flush and close is
         * forwarded to the wrapped stream.
         *
         * @param outputStream the real stream to forward to
         * @param socket       the socket owning the stream (not used by this implementation)
         * @return the counting proxy
         */
        @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.OutputStreamFactory
        public OutputStream makeOutputStream(final OutputStream outputStream, Socket socket) throws IOException {
            return new OutputStream() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.BatchingOutputStreamWatcher.1
                final OutputStream proxy = outputStream;
                // Index of the flush interval currently being counted.
                int curCount = 0;

                @Override // java.io.OutputStream
                public void write(int b) throws IOException {
                    flushByteCounts[curCount]++;
                    // Forward the byte: counting without writing would silently drop it and
                    // make the recorded count disagree with what reaches the socket.
                    proxy.write(b);
                }

                @Override // java.io.OutputStream
                public void write(byte[] buffer, int offset, int length) throws IOException {
                    flushByteCounts[curCount] += length;
                    proxy.write(buffer, offset, length);
                }

                @Override // java.io.OutputStream, java.io.Flushable
                public void flush() throws IOException {
                    // A flush ends the current interval; later writes are counted in the next slot.
                    curCount++;
                    proxy.flush();
                }

                @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
                public void close() throws IOException {
                    proxy.close();
                }
            };
        }
    }

    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$Checker.class */
    /**
     * One test scenario that runAllCombinations executes once per port / localhost combination.
     */
    public interface Checker {
        /**
         * Runs the scenario for one combination.
         *
         * @param i port for the receiver; the visible implementations call setPort only when it is
         *          positive, and runAllCombinations passes -1 for "ephemeral port"
         * @param z when true the visible implementations call setUseLocalhost(true) on the receiver
         * @param j value supplied by runAllCombinations (500 or -1); the visible implementations
         *          either treat a non-negative value as "batching enabled", pass it on to
         *          makeSenderFactory, or ignore it
         * @throws Throwable any failure raised by the scenario, including assertion errors
         */
        void check(int i, boolean z, long j) throws Throwable;
    }

    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$DisruptibleOutputStream.class */
    /**
     * Output stream proxy that closes its owning socket once a configured number of bytes has
     * been written through {@link #write(int)}, then carries on forwarding to the wrapped stream.
     * The threshold is taken from {@code TcpTransportTest.disruptionCount} at construction time;
     * a negative threshold disables the disruption.
     */
    private static class DisruptibleOutputStream extends OutputStream {
        OutputStream proxied;
        Socket socket;
        // Byte count at which the socket is closed; overwritten in the constructor.
        int dc = -1;
        int numBytesWritten = 0;

        public DisruptibleOutputStream(OutputStream target, Socket owner) {
            proxied = target;
            socket = owner;
            dc = TcpTransportTest.disruptionCount;
        }

        @Override // java.io.OutputStream
        public void write(int value) throws IOException {
            final boolean thresholdReached = dc >= 0 && dc == numBytesWritten;
            if (thresholdReached) {
                // Best effort: a failure to close is only reported, the write below still happens.
                try {
                    socket.close();
                } catch (Throwable failure) {
                    failure.printStackTrace();
                }
            }
            proxied.write(value);
            numBytesWritten += 1;
        }

        @Override // java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
            proxied.flush();
        }

        @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            proxied.close();
        }
    }

    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$OutputStreamFactory.class */
    /**
     * Builds the output stream that a test socket returns from getOutputStream() in place of
     * its real one.
     */
    public interface OutputStreamFactory {
        /**
         * @param outputStream the socket's real output stream
         * @param socket       the socket the stream belongs to
         * @return the stream callers of the socket should write to
         * @throws IOException if the replacement stream cannot be created
         */
        OutputStream makeOutputStream(OutputStream outputStream, Socket socket) throws IOException;
    }

    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$SenderRunnable.class */
    /**
     * Background task that keeps sending {@code "Hello from <threadInstance>"} to a destination
     * until it is told to stop, publishing its progress and failures through atomic flags.
     */
    static class SenderRunnable implements Runnable {
        int threadInstance;
        Destination destination;
        SenderFactory senderFactory;
        // When set, the loop ends after the first MessageTransportException.
        AtomicBoolean stopOnNormalFailure = new AtomicBoolean(false);
        // Set when anything other than a MessageTransportException escapes a send.
        AtomicBoolean totalFailure = new AtomicBoolean(false);
        // Set once run() has left its loop.
        AtomicBoolean isStopped = new AtomicBoolean(false);
        // Set when a send raised a MessageTransportException.
        AtomicBoolean sendFailed = new AtomicBoolean(false);
        // Cleared by the owner to end the loop.
        AtomicBoolean keepGoing = new AtomicBoolean(true);
        AtomicLong sentMessageCount = new AtomicLong(0);

        SenderRunnable(Destination target, int instanceNumber, SenderFactory factory) {
            destination = target;
            threadInstance = instanceNumber;
            senderFactory = factory;
        }

        /** Fluent setter for {@link #stopOnNormalFailure}; returns this instance. */
        SenderRunnable stopOnNormalFailure(boolean stop) {
            stopOnNormalFailure.set(stop);
            return this;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean abortAfterFailure = false;
            while (!abortAfterFailure && keepGoing.get()) {
                try {
                    final byte[] payload = ("Hello from " + threadInstance).getBytes();
                    senderFactory.getSender(destination).send(payload);
                    sentMessageCount.incrementAndGet();
                } catch (MessageTransportException transportFailure) {
                    sendFailed.set(true);
                    // Only leave the loop when the owner asked for that behaviour.
                    abortAfterFailure = stopOnNormalFailure.get();
                } catch (Throwable unexpected) {
                    totalFailure.set(true);
                }
                Thread.yield();
            }
            isStopped.set(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/nokia/dempsy/messagetransport/tcp/TcpTransportTest$StringListener.class */
    /**
     * Listener that decodes every incoming message as a String and records it.
     *
     * <p>Distinct payloads are collected in {@link #receivedStringMessages}, the total number of
     * handled messages in {@link #numMessages}, and the number of calls currently inside
     * {@link #onMessage} in {@link #numIn}. If a latch object is supplied, each call blocks in
     * {@code latch.wait()} before recording its message.
     */
    public static class StringListener implements Listener {
        Set<String> receivedStringMessages = new HashSet();
        // When non-null, thrown (and cleared) by onMessage after the message was recorded.
        AtomicReference<RuntimeException> throwThisOnce = new AtomicReference<>();
        AtomicLong numMessages = new AtomicLong(0L);
        Object latch = null;
        AtomicLong numIn = new AtomicLong(0L);

        public StringListener() {
        }

        public StringListener(Object monitor) {
            latch = monitor;
        }

        public boolean onMessage(byte[] payload, boolean failFast) throws MessageTransportException {
            try {
                numIn.incrementAndGet();
                if (latch != null) {
                    // Park until somebody notifies the latch object.
                    synchronized (latch) {
                        try {
                            latch.wait();
                        } catch (InterruptedException interrupted) {
                            throw new RuntimeException(interrupted);
                        }
                    }
                }
                final String text = new String(payload);
                synchronized (this) {
                    receivedStringMessages.add(text);
                }
                numMessages.incrementAndGet();
                if (throwThisOnce.get() != null) {
                    throw throwThisOnce.getAndSet(null);
                }
                return true;
            } finally {
                numIn.decrementAndGet();
            }
        }

        public void shuttingDown() {
        }
    }

    /**
     * Alternates the fail-fast setting between calls: bumps the shared call counter and
     * returns {@code true} when the new count is odd, {@code false} when it is even.
     */
    private static boolean getFailFast() {
        final int callNumber = ++failFastCalls;
        return callNumber % 2 != 0;
    }

    /**
     * Runs {@code checker} against the four port / localhost combinations, in order:
     * ephemeral port without localhost (delay 500), port 8765 without localhost, ephemeral port
     * with localhost, and port 8765 with localhost (delay -1 for the last three). The first
     * failure aborts the remaining combinations.
     *
     * @param checker the scenario to execute
     * @throws Throwable whatever the scenario throws
     */
    private void runAllCombinations(Checker checker) throws Throwable {
        final int[] ports = {-1, 8765, -1, 8765};
        final boolean[] useLocalhost = {false, false, true, true};
        final long[] delays = {500L, -1L, -1L, -1L};
        final String[] descriptions = {
            " with ephemeral port and not using \"localhost.\"",
            " with port 8765 and not using \"localhost.\"",
            " with ephemeral port using \"localhost.\"",
            " with port 8765 using \"localhost.\""
        };
        for (int run = 0; run < descriptions.length; run++) {
            logger.debug("checking " + checker + descriptions[run]);
            checker.check(ports[run], useLocalhost[run], delays[run]);
        }
    }

    /**
     * Builds receiver and sender through {@code TcpTransport}, sends two identical messages and
     * verifies delivery, the batching configuration exposed by the sender and, when batching is
     * on, the batching histogram. Also verifies that at least one combination ran with batching.
     */
    @Test
    public void testTransportInstantiation() throws Throwable {
        final AtomicBoolean batchedAtLeastOnce = new AtomicBoolean(false);
        runAllCombinations(new Checker() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.1
            @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.Checker
            public void check(int port, boolean localhost, long batchOutgoingMessagesDelayMillis) throws Throwable {
                StatsCollector statsCollector = new StatsCollectorFactoryCoda().createStatsCollector(new ClusterId("test", "test-cluster"), new Destination() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.1.1
                });
                SenderFactory factory = null;
                TcpReceiver adaptor = null;
                // The whole scenario sits inside the try so that a failed assertion still stops the
                // receiver and sender; otherwise a fixed port stays bound for the next combination.
                try {
                    // A non-negative delay means this combination runs with batching enabled.
                    final boolean shouldBatch = batchOutgoingMessagesDelayMillis >= 0;
                    if (shouldBatch) {
                        batchedAtLeastOnce.set(true);
                    }

                    TcpTransport transport = new TcpTransport();
                    transport.setFailFast(TcpTransportTest.getFailFast());
                    // Batching is expected to be on by default and off once explicitly disabled.
                    Assert.assertFalse(transport.isBatchingDisabled());
                    if (!shouldBatch) {
                        transport.setDisableBatching(true);
                        Assert.assertTrue(transport.isBatchingDisabled());
                    }
                    Assert.assertEquals(Boolean.valueOf(!shouldBatch), Boolean.valueOf(transport.isBatchingDisabled()));

                    adaptor = (TcpReceiver) transport.createInbound((DempsyExecutor) null);
                    adaptor.setStatsCollector(statsCollector);
                    StringListener receiver = new StringListener();
                    adaptor.setListener(receiver);
                    factory = transport.createOutbound((DempsyExecutor) null, statsCollector);
                    if (port > 0) {
                        adaptor.setPort(port);
                    }
                    if (localhost) {
                        adaptor.setUseLocalhost(localhost);
                    }
                    adaptor.start();

                    Destination destination = adaptor.getDestination();
                    byte[] messageBytes = "Hello".getBytes();
                    TcpSender sender = (TcpSender) factory.getSender(destination);
                    Assert.assertEquals(shouldBatch ? 175L : -1L, sender.getBatchOutgoingMessagesDelayMillis());
                    sender.send(messageBytes);
                    sender.send(messageBytes);

                    // Poll until both messages arrived or the deadline passed.
                    long deadline = System.currentTimeMillis() + TcpTransportTest.baseTimeoutMillis;
                    while (deadline > System.currentTimeMillis() && receiver.numMessages.get() < 2) {
                        Thread.sleep(1L);
                    }
                    Assert.assertEquals(2L, receiver.numMessages.get());
                    // Both messages carry the same text, so the set holds a single entry.
                    Assert.assertEquals(1L, receiver.receivedStringMessages.size());
                    Assert.assertEquals("Hello", receiver.receivedStringMessages.iterator().next());

                    if (shouldBatch) {
                        // Both messages are expected to have gone out as one batch of two.
                        Histogram histogram = sender.getBatchingHistogram();
                        Assert.assertEquals(TcpTransportTest.calcMean(2), histogram.mean(), 1.0E-7d);
                        Assert.assertEquals(1L, histogram.count());
                    }
                } finally {
                    if (factory != null) {
                        factory.stop();
                    }
                    if (adaptor != null) {
                        adaptor.stop();
                    }
                }
            }

            @Override
            public String toString() {
                return "testTransportInstantiation";
            }
        });
        Assert.assertTrue(batchedAtLeastOnce.get());
    }

    /**
     * Sends a single "Hello" message through a directly constructed receiver and a sender
     * factory from {@code makeSenderFactory}, and verifies that exactly that message arrives.
     */
    @Test
    public void transportSimpleMessage() throws Throwable {
        runAllCombinations(new Checker() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.2
            @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.Checker
            public void check(int port, boolean localhost, long batchOutgoingMessagesDelayMillis) throws Throwable {
                SenderFactory factory = null;
                TcpReceiver adaptor = null;
                BasicStatsCollector statsCollector = new BasicStatsCollector();
                try {
                    adaptor = new TcpReceiver((DempsyExecutor) null, TcpTransportTest.getFailFast());
                    adaptor.setStatsCollector(statsCollector);
                    StringListener receiver = new StringListener();
                    adaptor.setListener(receiver);
                    factory = TcpTransportTest.this.makeSenderFactory(false, (StatsCollector) statsCollector, batchOutgoingMessagesDelayMillis);
                    if (port > 0) {
                        adaptor.setPort(port);
                    }
                    if (localhost) {
                        adaptor.setUseLocalhost(localhost);
                    }
                    adaptor.start();

                    factory.getSender(adaptor.getDestination()).send("Hello".getBytes());

                    // Poll until the message arrived or the deadline passed.
                    long deadline = System.currentTimeMillis() + TcpTransportTest.baseTimeoutMillis;
                    while (deadline > System.currentTimeMillis() && receiver.numMessages.get() == 0) {
                        Thread.sleep(1L);
                    }
                    Assert.assertEquals(1L, receiver.numMessages.get());
                    Assert.assertEquals(1L, receiver.receivedStringMessages.size());
                    Assert.assertEquals("Hello", receiver.receivedStringMessages.iterator().next());

                    // Explicit stop inside the scenario; the finally block stops the receiver a
                    // second time, exactly as the original flow did.
                    adaptor.stop();
                } finally {
                    if (factory != null) {
                        factory.stop();
                    }
                    if (adaptor != null) {
                        adaptor.stop();
                    }
                }
            }

            @Override
            public String toString() {
                return "testTransportSimpleMessage";
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Computes the arithmetic mean of the given values using double accumulation.
     *
     * @param jArr the values to average
     * @return the sum of the values divided by their count; {@code NaN} when no value is given
     */
    public static final double calcMean(long... jArr) {
        double total = 0.0d;
        for (int index = 0; index < jArr.length; index++) {
            total += jArr[index];
        }
        return total / jArr.length;
    }

    /**
     * Verifies that batched messages are flushed when the batching delay elapses: one message,
     * a pause, two messages, a pause, one message must reach the socket as three flushes of
     * one, two and one message respectively.
     */
    @Test
    public void transportBatchingTimeout() throws Throwable {
        runAllCombinations(new Checker() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.3
            @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.Checker
            public void check(int port, boolean localhost, long unusedBatchDelayMillis) throws Throwable {
                StatsCollector statsCollector = new StatsCollectorFactoryCoda().createStatsCollector(new ClusterId("test", "test-cluster"), new Destination() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.3.1
                });
                SenderFactory factory = null;
                TcpReceiver adaptor = null;
                try {
                    adaptor = new TcpReceiver((DempsyExecutor) null, TcpTransportTest.getFailFast());
                    adaptor.setStatsCollector(statsCollector);
                    StringListener receiver = new StringListener();
                    adaptor.setListener(receiver);

                    // This scenario always uses its own batching delay, not the one handed in.
                    final long batchDelayMillis = 175L;
                    // One slot per expected flush plus a spare that must stay untouched.
                    long[] flushByteCounts = new long[4];
                    factory = TcpTransportTest.this.makeSenderFactory(new BatchingOutputStreamWatcher(flushByteCounts), statsCollector, batchDelayMillis);
                    if (port > 0) {
                        adaptor.setPort(port);
                    }
                    if (localhost) {
                        adaptor.setUseLocalhost(localhost);
                    }
                    adaptor.start();

                    TcpSender sender = (TcpSender) factory.getSender(adaptor.getDestination());
                    byte[] messageBytes = "Hello".getBytes();
                    int expectedLength = messageBytes.length + TcpTransportTest.tcpTransportHeaderSize;

                    // Pause for twice the batching delay between groups so each group is flushed
                    // on its own.
                    sender.send(messageBytes);
                    Thread.sleep(batchDelayMillis * 2);
                    sender.send(messageBytes);
                    sender.send(messageBytes);
                    Thread.sleep(batchDelayMillis * 2);
                    sender.send(messageBytes);

                    long deadline = System.currentTimeMillis() + TcpTransportTest.baseTimeoutMillis;
                    while (deadline > System.currentTimeMillis() && receiver.numMessages.get() < 4) {
                        Thread.sleep(1L);
                    }
                    Thread.sleep(10L);

                    Assert.assertEquals(4L, receiver.numMessages.get());
                    Assert.assertEquals(1L, receiver.receivedStringMessages.size());
                    Assert.assertEquals("Hello", receiver.receivedStringMessages.iterator().next());

                    // Flush sizes: one message, two messages, one message, then nothing.
                    Assert.assertEquals(expectedLength, flushByteCounts[0]);
                    Assert.assertEquals(expectedLength * 2, flushByteCounts[1]);
                    Assert.assertEquals(expectedLength, flushByteCounts[2]);
                    Assert.assertEquals(0L, flushByteCounts[3]);

                    Histogram histogram = sender.getBatchingHistogram();
                    Assert.assertEquals(TcpTransportTest.calcMean(1, 2, 1), histogram.mean(), 1.0E-7d);
                    Assert.assertEquals(3L, histogram.count());
                } finally {
                    if (factory != null) {
                        factory.stop();
                    }
                    if (adaptor != null) {
                        adaptor.stop();
                    }
                    statsCollector.stop();
                }
            }

            @Override
            public String toString() {
                return "transportBatchingTimeout";
            }
        });
    }

    /**
     * Verifies size-triggered batching: with a long batching delay, messages are sent until
     * their accumulated size reaches the sender's MTU; all but the last message must go out in
     * the first flush and the last message in a second flush.
     */
    @Test
    public void transportBatching() throws Throwable {
        runAllCombinations(new Checker() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.4
            @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.Checker
            public void check(int port, boolean localhost, long unusedBatchDelayMillis) throws Throwable {
                StatsCollector statsCollector = new StatsCollectorFactoryCoda().createStatsCollector(new ClusterId("test", "test-cluster"), new Destination() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.4.1
                });
                SenderFactory factory = null;
                TcpReceiver adaptor = null;
                try {
                    adaptor = new TcpReceiver((DempsyExecutor) null, TcpTransportTest.getFailFast());
                    adaptor.setStatsCollector(statsCollector);
                    StringListener receiver = new StringListener();
                    adaptor.setListener(receiver);

                    // Exactly two flushes are expected.
                    long[] flushByteCounts = new long[2];
                    // 10 s delay so that the time-based flush does not interfere.
                    factory = TcpTransportTest.this.makeSenderFactory(new BatchingOutputStreamWatcher(flushByteCounts), statsCollector, 10000L);
                    if (port > 0) {
                        adaptor.setPort(port);
                    }
                    if (localhost) {
                        adaptor.setUseLocalhost(localhost);
                    }
                    adaptor.start();

                    TcpSender sender = (TcpSender) factory.getSender(adaptor.getDestination());
                    int mtu = sender.getMtu();
                    byte[] messageBytes = "Hello".getBytes();

                    int numMessagesSent = 0;
                    int numBytesSent = 0;
                    // Bytes sent before the final message, i.e. the expected size of the first flush.
                    int numBytesLastFlush = 0;
                    while (numBytesSent < mtu) {
                        sender.send(messageBytes);
                        numBytesLastFlush = numBytesSent;
                        numBytesSent += messageBytes.length + TcpTransportTest.tcpTransportHeaderSize;
                        numMessagesSent++;
                    }

                    long deadline = System.currentTimeMillis() + TcpTransportTest.baseTimeoutMillis;
                    while (deadline > System.currentTimeMillis() && receiver.numMessages.get() < numMessagesSent) {
                        Thread.sleep(1L);
                    }
                    Thread.sleep(10L);

                    Assert.assertEquals(numMessagesSent, receiver.numMessages.get());
                    Assert.assertEquals(1L, receiver.receivedStringMessages.size());
                    Assert.assertEquals("Hello", receiver.receivedStringMessages.iterator().next());
                    Assert.assertEquals(numBytesLastFlush, flushByteCounts[0]);
                    Assert.assertEquals(numBytesSent - numBytesLastFlush, flushByteCounts[1]);

                    Histogram histogram = sender.getBatchingHistogram();
                    Assert.assertEquals(TcpTransportTest.calcMean(numMessagesSent - 1, 1), histogram.mean(), 1.0E-7d);
                    Assert.assertEquals(2L, histogram.count());
                } finally {
                    if (factory != null) {
                        factory.stop();
                    }
                    if (adaptor != null) {
                        adaptor.stop();
                    }
                    statsCollector.stop();
                }
            }

            @Override
            public String toString() {
                return "transportBatching";
            }
        });
    }

    /**
     * Verifies that a single message about 10% larger than the sender's MTU is delivered and
     * written in exactly one flush, and that the batching histogram records one batch of one.
     */
    @Test
    public void transportBatchingMessageLargerThanMtu() throws Throwable {
        runAllCombinations(new Checker() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.5
            @Override // com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.Checker
            public void check(int port, boolean localhost, long unusedBatchDelayMillis) throws Throwable {
                StatsCollector statsCollector = new StatsCollectorFactoryCoda().createStatsCollector(new ClusterId("test", "test-cluster"), new Destination() { // from class: com.nokia.dempsy.messagetransport.tcp.TcpTransportTest.5.1
                });
                SenderFactory factory = null;
                TcpReceiver adaptor = null;
                TcpReceiver adaptor2 = null;
                try {
                    adaptor = new TcpReceiver((DempsyExecutor) null, TcpTransportTest.getFailFast());
                    // The second receiver is never configured or started; only its destination is used.
                    adaptor2 = new TcpReceiver((DempsyExecutor) null, TcpTransportTest.getFailFast());
                    adaptor.setStatsCollector(statsCollector);
                    StringListener receiver = new StringListener();
                    adaptor.setListener(receiver);

                    // One slot for the expected flush plus a spare that must stay untouched.
                    long[] flushByteCounts = new long[2];
                    factory = TcpTransportTest.this.makeSenderFactory(new BatchingOutputStreamWatcher(flushByteCounts), statsCollector, 10000L);
                    if (port > 0) {
                        adaptor.setPort(port);
                    }
                    if (localhost) {
                        adaptor.setUseLocalhost(localhost);
                    }
                    adaptor.start();

                    Destination destination = adaptor.getDestination();
                    Destination destination2 = adaptor2.getDestination();
                    TcpSender sender = (TcpSender) factory.getSender(destination);
                    // NOTE(review): the second sender is created and discarded, presumably to
                    // check that it does not disturb the first sender's histogram — confirm.
                    factory.getSender(destination2);

                    int mtu = sender.getMtu();
                    Random random = new Random();
                    byte[] messageBytes = new byte[(int) (mtu * 1.1d)];
                    for (int pos = 0; pos < messageBytes.length; pos++) {
                        messageBytes[pos] = (byte) TcpTransportTest.charSet.charAt(random.nextInt(TcpTransportTest.charSet.length()));
                    }
                    sender.send(messageBytes);

                    long deadline = System.currentTimeMillis() + TcpTransportTest.baseTimeoutMillis;
                    while (deadline > System.currentTimeMillis() && receiver.numMessages.get() == 0) {
                        Thread.sleep(1L);
                    }
                    Thread.sleep(10L);

                    Assert.assertEquals(1L, receiver.numMessages.get());
                    Assert.assertEquals(1L, receiver.receivedStringMessages.size());
                    // The whole oversized message plus its header goes out in the first flush.
                    Assert.assertEquals(messageBytes.length + TcpTransportTest.tcpTransportHeaderSize, flushByteCounts[0]);
                    Assert.assertEquals(0L, flushByteCounts[1]);

                    Histogram histogram = sender.getBatchingHistogram();
                    Assert.assertEquals(1.0d, histogram.mean(), 1.0E-7d);
                    Assert.assertEquals(1L, histogram.count());
                } finally {
                    if (factory != null) {
                        factory.stop();
                    }
                    if (adaptor != null) {
                        adaptor.stop();
                    }
                    if (adaptor2 != null) {
                        adaptor2.stop();
                    }
                    statsCollector.stop();
                }
            }

            @Override
            public String toString() {
                return "transportBatchingMessageLargerThanMtu";
            }
        });
    }

    /**
     * Sends a large, two small, and another large message through one sender and checks how they
     * were batched.
     *
     * <p>The large payload is 1.1 times the sender's reported MTU; the small payload is
     * {@code "Hello"}. The test asserts that four messages arrive (two distinct strings), that the
     * byte counts captured by the {@code BatchingOutputStreamWatcher} are large / two-smalls /
     * large / nothing, and that the batching histogram holds three entries whose mean is
     * {@code calcMean(1, 2, 1)}.
     */
    @Test
    public void transportBatchingMessageLargerAndSmallerMixed() throws Throwable {
        runAllCombinations(new Checker() {
            @Override
            public void check(int port, boolean useLocalhost, long senderFactoryArg) throws Throwable {
                StatsCollector statsCollector = new StatsCollectorFactoryCoda().createStatsCollector(new ClusterId("test", "test-cluster"), new Destination() {
                });
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                try {
                    tcpReceiver = new TcpReceiver((DempsyExecutor) null, TcpTransportTest.getFailFast());
                    tcpReceiver.setStatsCollector(statsCollector);
                    StringListener stringListener = new StringListener();
                    tcpReceiver.setListener(stringListener);

                    // Slots 0..3 are asserted on below; presumably the watcher records one byte
                    // count per flush -- confirm against BatchingOutputStreamWatcher.
                    long[] watchedByteCounts = new long[4];
                    senderFactory = TcpTransportTest.this.makeSenderFactory(new BatchingOutputStreamWatcher(watchedByteCounts), statsCollector, 10000L);
                    if (port > 0) {
                        tcpReceiver.setPort(port);
                    }
                    if (useLocalhost) {
                        tcpReceiver.setUseLocalhost(useLocalhost);
                    }
                    tcpReceiver.start();

                    TcpSender sender = senderFactory.getSender(tcpReceiver.getDestination());
                    int mtu = sender.getMtu();

                    // Build a random payload that is 10% larger than the MTU.
                    Random random = new Random();
                    byte[] largeMessage = new byte[(int) (mtu * 1.1d)];
                    for (int idx = 0; idx < largeMessage.length; idx++) {
                        largeMessage[idx] = (byte) TcpTransportTest.charSet.charAt(random.nextInt(TcpTransportTest.charSet.length()));
                    }
                    byte[] smallMessage = "Hello".getBytes();

                    sender.send(largeMessage);
                    sender.send(smallMessage);
                    sender.send(smallMessage);
                    sender.send(largeMessage);

                    // Wait for all four messages, then give the transport a moment to settle.
                    long deadline = System.currentTimeMillis() + TcpTransportTest.baseTimeoutMillis;
                    while (deadline > System.currentTimeMillis() && stringListener.numMessages.get() < 4) {
                        Thread.sleep(1L);
                    }
                    Thread.sleep(10L);

                    Assert.assertEquals(4L, stringListener.numMessages.get());
                    Assert.assertEquals(2L, stringListener.receivedStringMessages.size());

                    long largeWireSize = largeMessage.length + TcpTransportTest.tcpTransportHeaderSize;
                    // The two small messages are expected to share a single slot.
                    long twoSmallWireSize = (smallMessage.length + TcpTransportTest.tcpTransportHeaderSize) * 2;
                    Assert.assertEquals(largeWireSize, watchedByteCounts[0]);
                    Assert.assertEquals(twoSmallWireSize, watchedByteCounts[1]);
                    Assert.assertEquals(largeWireSize, watchedByteCounts[2]);
                    Assert.assertEquals(0L, watchedByteCounts[3]);

                    Histogram batchingHistogram = sender.getBatchingHistogram();
                    Assert.assertEquals(TcpTransportTest.calcMean(1, 2, 1), batchingHistogram.mean(), 1.0E-7d);
                    Assert.assertEquals(3L, batchingHistogram.count());
                } finally {
                    // Single cleanup path: runs exactly once whether the body passed or threw.
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                    statsCollector.stop();
                }
            }

            @Override
            public String toString() {
                return "transportBatchingMessageLargerAndSmallerMixed";
            }
        });
    }

    /**
     * Creates a {@link TcpReceiver} whose first client thread never processes anything.
     *
     * <p>The first call to {@code makeNewClientThread} returns a thread that only sleeps in 1 ms
     * steps until its {@code stopClient} flag is set; every later call returns a regular
     * {@code TcpReceiver.ClientThread}.
     *
     * @return the receiver, built with a {@code null} first argument and {@code getFailFast()}
     */
    static TcpReceiver makeHangingReceiver() {
        return new TcpReceiver(null, getFailFast()) {
            int clientThreadsCreated = 0;

            protected TcpReceiver.ClientThread makeNewClientThread(Socket socket) throws IOException {
                this.clientThreadsCreated++;
                if (this.clientThreadsCreated != 1) {
                    // Every connection after the first behaves normally.
                    return new TcpReceiver.ClientThread(this, socket);
                }
                // First connection: idle until told to stop.
                return new TcpReceiver.ClientThread(socket) {
                    public void run() {
                        while (true) {
                            if (this.stopClient.get()) {
                                return;
                            }
                            try {
                                Thread.sleep(1L);
                            } catch (Throwable ignored) {
                                // Deliberately ignored so the thread keeps idling until stopClient is set.
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Checks that a sender recovers after talking to a receiver whose first client thread hangs.
     *
     * <p>With a 100 ms sender timeout, 8 KiB messages are sent repeatedly until the stats collector
     * reports at least one message as not sent. After a short pause, {@code "Hello"} is sent
     * repeatedly until the listener's message count rises above its value at that point.
     */
    @Test
    public void transportSimpleMessageHungReciever() throws Throwable {
        runAllCombinations(new Checker() {
            @Override
            public void check(int port, boolean useLocalhost, long senderFactoryArg) throws Throwable {
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                final byte[] fillerMessage = new byte[8192];
                BasicStatsCollector statsCollector = new BasicStatsCollector();
                try {
                    tcpReceiver = TcpTransportTest.makeHangingReceiver();
                    tcpReceiver.setStatsCollector(statsCollector);
                    final StringListener stringListener = new StringListener();
                    tcpReceiver.setListener(stringListener);
                    senderFactory = TcpTransportTest.this.makeSenderFactory(false, (StatsCollector) statsCollector, senderFactoryArg);
                    if (port > 0) {
                        tcpReceiver.setPort(port);
                    }
                    if (useLocalhost) {
                        tcpReceiver.setUseLocalhost(useLocalhost);
                    }
                    tcpReceiver.start();

                    final TcpSender sender = senderFactory.getSender(tcpReceiver.getDestination());
                    sender.setTimeoutMillis(100L);

                    // Keep sending until at least one message is counted as not sent.
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, statsCollector, new TestUtils.Condition<BasicStatsCollector>() {
                        @Override
                        public boolean conditionMet(BasicStatsCollector collector) throws Throwable {
                            sender.send(fillerMessage);
                            return collector.getMessagesNotSentCount() > 0;
                        }
                    }));
                    Thread.sleep(100L);

                    // Now a message must get through: the received count has to exceed the snapshot.
                    Long receivedBefore = Long.valueOf(stringListener.numMessages.get());
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, receivedBefore, new TestUtils.Condition<Long>() {
                        @Override
                        public boolean conditionMet(Long baseline) throws Throwable {
                            sender.send("Hello".getBytes());
                            return stringListener.numMessages.get() > baseline.longValue();
                        }
                    }));
                } finally {
                    // Single cleanup path: runs exactly once whether the body passed or threw.
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                }
            }

            @Override
            public String toString() {
                return "transportSimpleMessageHungReciever";
            }
        });
    }

    /**
     * Sends a single 10 MiB message over an ephemeral port and checks that it arrives intact.
     *
     * <p>The listener stores the received bytes in {@code receivedByteArrayMessage} and counts down
     * {@code receiveLargeMessageLatch}; the test waits up to one minute for the latch and then
     * compares the arrays. The field holding the received bytes is cleared on exit.
     */
    @Test
    public void transportLargeMessage() throws Throwable {
        final int largeMessageSize = 10485760; // 10 * 1024 * 1024
        SenderFactory unusedFactory = null;
        TcpReceiver tcpReceiver = null;
        TcpSenderFactory tcpSenderFactory = null;
        try {
            tcpReceiver = new TcpReceiver((DempsyExecutor) null, getFailFast());
            // NOTE(review): this first factory is created and stopped but never used to send.
            unusedFactory = makeSenderFactory(false, (StatsCollector) null, 500L);
            this.receiveLargeMessageLatch = new CountDownLatch(1);
            tcpReceiver.setListener(new Listener() {
                public boolean onMessage(byte[] messageBytes, boolean failFast) throws MessageTransportException {
                    TcpTransportTest.this.receivedByteArrayMessage = messageBytes;
                    TcpTransportTest.this.receiveLargeMessageLatch.countDown();
                    return true;
                }

                public void shuttingDown() {
                }
            });
            // No fixed port and no localhost override are used here, so only the ephemeral-port
            // setting applies.
            tcpReceiver.setUseEphemeralPort(true);

            // start() is invoked twice, as in the original; presumably to exercise a repeated
            // start -- confirm before removing.
            tcpReceiver.start();
            tcpReceiver.start();
            TcpDestination destination = tcpReceiver.getDestination();

            tcpSenderFactory = makeSenderFactory(false, (StatsCollector) null, 500L);
            byte[] payload = new byte[largeMessageSize];
            for (int idx = 0; idx < largeMessageSize; idx++) {
                payload[idx] = (byte) idx;
            }
            TcpSender sender = tcpSenderFactory.getSender(destination);
            sender.setTimeoutMillis(100000L);
            sender.send(payload);

            Assert.assertTrue(this.receiveLargeMessageLatch.await(1L, TimeUnit.MINUTES));
            Assert.assertArrayEquals(payload, this.receivedByteArrayMessage);
        } finally {
            // Single cleanup path: runs exactly once whether the body passed or threw.
            if (unusedFactory != null) {
                unusedFactory.stop();
            }
            if (tcpSenderFactory != null) {
                tcpSenderFactory.stop();
            }
            if (tcpReceiver != null) {
                tcpReceiver.stop();
            }
            this.receivedByteArrayMessage = null;
        }
    }

    /**
     * Runs {@code numThreads} concurrent senders through a sender factory created with the
     * disruptible-stream flag set, and checks that all of them keep sending after one failure.
     *
     * <p>The test waits until the stats collector reports a message as not sent, asserts that
     * exactly one was lost, snapshots each sender's sent count, and then requires every sender's
     * count to grow beyond its snapshot. Finally the senders are told to stop and each must report
     * {@code isStopped}.
     */
    @Test
    public void transportMultipleConnectionsFailedClient() throws Throwable {
        runAllCombinations(new Checker() {
            @Override
            public void check(int port, boolean useLocalhost, long senderFactoryArg) throws Throwable {
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                BasicStatsCollector statsCollector = new BasicStatsCollector();
                try {
                    tcpReceiver = new TcpReceiver((DempsyExecutor) null, TcpTransportTest.getFailFast());
                    tcpReceiver.setStatsCollector(statsCollector);
                    senderFactory = TcpTransportTest.this.makeSenderFactory(true, (StatsCollector) statsCollector, senderFactoryArg);
                    if (port > 0) {
                        tcpReceiver.setPort(port);
                    }
                    if (useLocalhost) {
                        tcpReceiver.setUseLocalhost(useLocalhost);
                    }
                    tcpReceiver.start();
                    TcpDestination destination = tcpReceiver.getDestination();

                    // One runnable and one thread per sender.
                    ArrayList<Thread> threads = new ArrayList<Thread>();
                    SenderRunnable[] runnables = new SenderRunnable[TcpTransportTest.numThreads];
                    for (int n = 0; n < TcpTransportTest.numThreads; n++) {
                        SenderRunnable runnable = new SenderRunnable(destination, n, senderFactory);
                        runnables[n] = runnable;
                        threads.add(new Thread(runnable, "Test Sender for " + n));
                    }
                    for (Thread thread : threads) {
                        thread.start();
                    }

                    // Wait for the first lost message, then make sure it was the only one.
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, statsCollector, new TestUtils.Condition<BasicStatsCollector>() {
                        @Override
                        public boolean conditionMet(BasicStatsCollector collector) throws Throwable {
                            return collector.getMessagesNotSentCount() > 0;
                        }
                    }));
                    Thread.sleep(10L);
                    Assert.assertEquals(1L, statsCollector.getMessagesNotSentCount());

                    // Snapshot the per-sender counts, then require every one of them to advance.
                    long[] sentAtSnapshot = new long[TcpTransportTest.numThreads];
                    for (int n = 0; n < runnables.length; n++) {
                        sentAtSnapshot[n] = runnables[n].sentMessageCount.get();
                    }
                    boolean allAdvanced = false;
                    long deadline = System.currentTimeMillis() + TcpTransportTest.numThreads * TcpTransportTest.baseTimeoutMillis;
                    while (deadline > System.currentTimeMillis() && !allAdvanced) {
                        allAdvanced = true;
                        Thread.sleep(1L);
                        for (int n = 0; n < runnables.length; n++) {
                            if (sentAtSnapshot[n] >= runnables[n].sentMessageCount.get()) {
                                allAdvanced = false;
                            }
                        }
                    }
                    Assert.assertTrue(allAdvanced);

                    // Shut the senders down and verify each one noticed.
                    for (SenderRunnable runnable : runnables) {
                        runnable.keepGoing.set(false);
                    }
                    for (Thread thread : threads) {
                        thread.join(5000L);
                    }
                    for (SenderRunnable runnable : runnables) {
                        Assert.assertTrue(runnable.isStopped.get());
                    }
                } finally {
                    // Single cleanup path: runs exactly once whether the body passed or threw.
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                }
            }

            @Override
            public String toString() {
                return "transportMultipleConnectionsFailedClient";
            }
        });
    }

    /**
     * Runs {@code numThreads} concurrent senders against a receiver whose listener throws once,
     * then stops the receiver and checks that the senders start failing.
     *
     * <p>Steps: wait for every sender to have sent something and for the listener to have seen
     * {@code numThreads} distinct strings; arm the listener's {@code throwThisOnce}; require the
     * received count to increase twice more with no message counted as not sent; stop the
     * receiver and wait for more than 40 not-sent messages; stop the senders and verify that each
     * stopped and that exactly the strings {@code "Hello from 0..numThreads-1"} were received.
     */
    @Test
    public void transportMultipleConnectionsFailedReceiver() throws Throwable {
        runAllCombinations(new Checker() {
            @Override
            public void check(int port, boolean useLocalhost, long senderFactoryArg) throws Throwable {
                // Overall wait budget for each of the polling loops below.
                final long waitBudgetMillis = TcpTransportTest.numThreads * TcpTransportTest.baseTimeoutMillis;
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                BasicStatsCollector statsCollector = new BasicStatsCollector();
                try {
                    tcpReceiver = new TcpReceiver((DempsyExecutor) null, TcpTransportTest.getFailFast());
                    tcpReceiver.setStatsCollector(statsCollector);
                    StringListener stringListener = new StringListener();
                    tcpReceiver.setListener(stringListener);
                    senderFactory = TcpTransportTest.this.makeSenderFactory(false, (StatsCollector) statsCollector, senderFactoryArg);
                    if (port > 0) {
                        tcpReceiver.setPort(port);
                    }
                    if (useLocalhost) {
                        tcpReceiver.setUseLocalhost(useLocalhost);
                    }
                    tcpReceiver.start();
                    TcpDestination destination = tcpReceiver.getDestination();

                    // One runnable and one thread per sender.
                    ArrayList<Thread> threads = new ArrayList<Thread>();
                    SenderRunnable[] runnables = new SenderRunnable[TcpTransportTest.numThreads];
                    for (int n = 0; n < TcpTransportTest.numThreads; n++) {
                        SenderRunnable runnable = new SenderRunnable(destination, n, senderFactory).stopOnNormalFailure(true);
                        runnables[n] = runnable;
                        threads.add(new Thread(runnable, "Test Sender for " + n));
                    }
                    for (Thread thread : threads) {
                        thread.start();
                    }

                    // Every sender must have sent at least one message.
                    for (SenderRunnable runnable : runnables) {
                        long deadline = System.currentTimeMillis() + waitBudgetMillis;
                        while (deadline > System.currentTimeMillis() && runnable.sentMessageCount.get() == 0) {
                            Thread.sleep(1L);
                        }
                    }
                    for (SenderRunnable runnable : runnables) {
                        Assert.assertTrue(runnable.sentMessageCount.get() > 0);
                    }

                    // Wait until the listener has seen one distinct string per sender.
                    long distinctDeadline = System.currentTimeMillis() + waitBudgetMillis;
                    while (distinctDeadline > System.currentTimeMillis() && stringListener.receivedStringMessages.size() != TcpTransportTest.numThreads) {
                        Thread.sleep(1L);
                    }

                    // Arm a one-shot listener failure and require delivery to continue past it.
                    long receivedBeforeFailure = stringListener.numMessages.get();
                    stringListener.throwThisOnce.set(new RuntimeException("Yo!"));
                    long firstDeadline = System.currentTimeMillis() + waitBudgetMillis;
                    while (firstDeadline > System.currentTimeMillis() && stringListener.numMessages.get() <= receivedBeforeFailure) {
                        Thread.sleep(1L);
                    }
                    Assert.assertTrue(stringListener.numMessages.get() > receivedBeforeFailure);

                    long receivedAfterFailure = stringListener.numMessages.get();
                    long secondDeadline = System.currentTimeMillis() + waitBudgetMillis;
                    while (secondDeadline > System.currentTimeMillis() && stringListener.numMessages.get() <= receivedAfterFailure) {
                        Thread.sleep(1L);
                    }
                    Assert.assertTrue(stringListener.numMessages.get() > receivedAfterFailure);
                    Assert.assertEquals(0L, statsCollector.getMessagesNotSentCount());

                    // Stop the receiver mid-test; the senders should now accumulate failures.
                    tcpReceiver.stop();
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, statsCollector, new TestUtils.Condition<BasicStatsCollector>() {
                        @Override
                        public boolean conditionMet(BasicStatsCollector collector) throws Throwable {
                            return collector.getMessagesNotSentCount() > 40;
                        }
                    }));

                    // Shut the senders down and verify each one noticed.
                    for (SenderRunnable runnable : runnables) {
                        runnable.keepGoing.set(false);
                    }
                    for (Thread thread : threads) {
                        thread.join(waitBudgetMillis);
                    }
                    for (SenderRunnable runnable : runnables) {
                        Assert.assertTrue(runnable.isStopped.get());
                    }

                    Assert.assertEquals(TcpTransportTest.numThreads, stringListener.receivedStringMessages.size());
                    for (int n = 0; n < TcpTransportTest.numThreads; n++) {
                        Assert.assertTrue(stringListener.receivedStringMessages.contains("Hello from " + n));
                    }
                } finally {
                    // Single cleanup path: runs exactly once whether the body passed or threw.
                    // The receiver may already have been stopped above; stop() is called again
                    // here just as the original did.
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                }
            }

            @Override
            public String toString() {
                // Label matches the test method name, like every other checker in this class.
                return "transportMultipleConnectionsFailedReceiver";
            }
        });
    }

    /**
     * Exercises the sender's bound on queued outgoing messages against a hanging receiver.
     *
     * <p>The sender's maximum number of queued messages is set to 100. 8 KiB messages are sent
     * until the sending queue holds more than 200 entries, the sender's {@code socketTimeout} is
     * disrupted, and the test waits for more than 100 messages to be counted as not sent. It then
     * sends {@code "Hello"} repeatedly until the listener's count rises above its value at that
     * point.
     */
    @Test
    public void testBoundedOutputQueue() throws Throwable {
        runAllCombinations(new Checker() {
            @Override
            public void check(int port, boolean useLocalhost, long senderFactoryArg) throws Throwable {
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                final byte[] fillerMessage = new byte[8192];
                BasicStatsCollector statsCollector = new BasicStatsCollector();
                try {
                    tcpReceiver = TcpTransportTest.makeHangingReceiver();
                    tcpReceiver.setStatsCollector(statsCollector);
                    final StringListener stringListener = new StringListener();
                    tcpReceiver.setListener(stringListener);
                    senderFactory = TcpTransportTest.this.makeSenderFactory(false, (StatsCollector) statsCollector, senderFactoryArg);
                    if (port > 0) {
                        tcpReceiver.setPort(port);
                    }
                    if (useLocalhost) {
                        tcpReceiver.setUseLocalhost(useLocalhost);
                    }
                    tcpReceiver.start();

                    final TcpSender sender = senderFactory.getSender(tcpReceiver.getDestination());
                    sender.setMaxNumberOfQueuedMessages(100L);

                    // Fill the queue well past the configured maximum.
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, statsCollector, new TestUtils.Condition<BasicStatsCollector>() {
                        @Override
                        public boolean conditionMet(BasicStatsCollector collector) throws Throwable {
                            sender.send(fillerMessage);
                            return sender.sendingQueue.size() > 200;
                        }
                    }));
                    Thread.sleep(100L);

                    int backlogSize = sender.sendingQueue.size();
                    sender.socketTimeout.disrupt();

                    // After the disruption, more than 100 messages must be counted as not sent.
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, statsCollector, new TestUtils.Condition<BasicStatsCollector>() {
                        @Override
                        public boolean conditionMet(BasicStatsCollector collector) throws Throwable {
                            return collector.getMessagesNotSentCount() > 100;
                        }
                    }));
                    TcpTransportTest.logger.info("there are {} message baked up, and {} have been discarded.", backlogSize, statsCollector.getMessagesNotSentCount());

                    // Finally a message must get through: the received count has to exceed the snapshot.
                    Long receivedBefore = Long.valueOf(stringListener.numMessages.get());
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, receivedBefore, new TestUtils.Condition<Long>() {
                        @Override
                        public boolean conditionMet(Long baseline) throws Throwable {
                            sender.send("Hello".getBytes());
                            return stringListener.numMessages.get() > baseline.longValue();
                        }
                    }));
                } finally {
                    // Single cleanup path: runs exactly once whether the body passed or threw.
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                }
            }

            @Override
            public String toString() {
                return "testBoundedOutputQueue";
            }
        });
    }

    /**
     * Restores the shared static test state before each test: {@code onlyOnce} back to
     * {@code true} and {@code disruptionCount} back to 5.
     */
    @Before
    public void resetGlobals() {
        TcpTransportTest.onlyOnce = true;
        TcpTransportTest.disruptionCount = 5;
    }

    /**
     * Builds a sender factory, optionally one that wraps its output streams.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param outputStreamFactory when non-null, handed to the {@code AnonymousClass13} factory
     *                            variant; when null a plain {@link TcpSenderFactory} is returned
     * @param statsCollector      forwarded as the first constructor argument
     * @param j                   forwarded as the fourth constructor argument
     * @return the newly created factory
     */
    public TcpSenderFactory makeSenderFactory(OutputStreamFactory outputStreamFactory, StatsCollector statsCollector, long j) {
        if (outputStreamFactory == null) {
            return new TcpSenderFactory(statsCollector, -1L, 10000L, j);
        }
        return new AnonymousClass13(statsCollector, -1L, 10000L, j, outputStreamFactory);
    }

    /**
     * Builds a sender factory, optionally one whose output streams are
     * {@code DisruptibleOutputStream}s.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param z              when true, the factory wraps every output stream in a
     *                       {@code DisruptibleOutputStream}; when false a plain
     *                       {@link TcpSenderFactory} is returned
     * @param statsCollector forwarded to the factory
     * @param j              forwarded to the factory
     * @return the newly created factory
     */
    public TcpSenderFactory makeSenderFactory(boolean z, StatsCollector statsCollector, long j) {
        if (!z) {
            return new TcpSenderFactory(statsCollector, -1L, 10000L, j);
        }
        OutputStreamFactory disruptibleStreams = new OutputStreamFactory() {
            @Override
            public OutputStream makeOutputStream(OutputStream outputStream, Socket socket) throws IOException {
                return new DisruptibleOutputStream(outputStream, socket);
            }
        };
        return makeSenderFactory(disruptibleStreams, statsCollector, j);
    }

    /**
     * Drives the receiver's executor to its queue limit and checks that messages are discarded
     * only once the limit is exceeded.
     *
     * <p>Steps: send as many messages as the executor's maximum number of queued limited tasks;
     * wait until the listener's {@code numIn} equals the executor's thread count; send one more
     * message per thread; wait until the executor's pending limited count equals its maximum and
     * assert nothing was discarded so far; then keep sending two messages and notifying the lock
     * object once per poll until a discard is counted. Afterwards the listener's latch is cleared,
     * all waiters are notified, and the receiver is stopped.
     */
    @Test
    public void transportMTWorkerMessage() throws Throwable {
        runAllCombinations(new Checker() {
            @Override
            public void check(int port, boolean useLocalhost, long senderFactoryArg) throws Throwable {
                SenderFactory senderFactory = null;
                TcpReceiver tcpReceiver = null;
                try {
                    tcpReceiver = new TcpReceiver((DempsyExecutor) null, TcpTransportTest.getFailFast());
                    // Shared monitor handed to the listener; presumably the listener blocks on
                    // it while handling a message -- confirm against StringListener.
                    final Object lock = new Object();
                    BasicStatsCollector statsCollector = new BasicStatsCollector();
                    tcpReceiver.setStatsCollector(statsCollector);
                    StringListener stringListener = new StringListener(lock);
                    tcpReceiver.setListener(stringListener);
                    senderFactory = TcpTransportTest.this.makeSenderFactory(false, (StatsCollector) statsCollector, senderFactoryArg);
                    if (port > 0) {
                        tcpReceiver.setPort(port);
                    }
                    if (useLocalhost) {
                        tcpReceiver.setUseLocalhost(useLocalhost);
                    }
                    tcpReceiver.start();

                    final Sender sender = senderFactory.getSender(tcpReceiver.getDestination());
                    final int workerThreads = tcpReceiver.executor.getNumThreads();
                    Assert.assertTrue(workerThreads > 1);

                    // First wave: one message per slot in the executor's limited queue.
                    DefaultDempsyExecutor executor = tcpReceiver.executor;
                    for (int sent = 0; sent < executor.getMaxNumberOfQueuedLimitedTasks(); sent++) {
                        sender.send("Hello".getBytes());
                    }
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, stringListener, new TestUtils.Condition<StringListener>() {
                        @Override
                        public boolean conditionMet(StringListener listener) {
                            return listener.numIn.get() == ((long) workerThreads);
                        }
                    }));

                    // Second wave: one more message per worker thread.
                    for (int sent = 0; sent < workerThreads; sent++) {
                        sender.send("Hello".getBytes());
                    }
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, tcpReceiver.executor, new TestUtils.Condition<DefaultDempsyExecutor>() {
                        @Override
                        public boolean conditionMet(DefaultDempsyExecutor polledExecutor) {
                            return polledExecutor.getNumberLimitedPending() == polledExecutor.getMaxNumberOfQueuedLimitedTasks();
                        }
                    }));
                    Assert.assertEquals(0L, statsCollector.getDiscardedMessageCount());

                    // Overfill: two sends per single notify until a discard is recorded.
                    Assert.assertTrue(TestUtils.poll(TcpTransportTest.baseTimeoutMillis, statsCollector, new TestUtils.Condition<BasicStatsCollector>() {
                        @Override
                        public boolean conditionMet(BasicStatsCollector collector) throws Throwable {
                            sender.send("Hello".getBytes());
                            sender.send("Hello".getBytes());
                            synchronized (lock) {
                                lock.notify();
                            }
                            return collector.getDiscardedMessageCount() > 0;
                        }
                    }));

                    // Clear the listener's latch and wake all waiters before and after stopping
                    // the receiver.
                    stringListener.latch = null;
                    synchronized (lock) {
                        lock.notifyAll();
                    }
                    tcpReceiver.stop();
                    synchronized (lock) {
                        lock.notifyAll();
                    }
                } finally {
                    // Single cleanup path: runs exactly once whether the body passed or threw.
                    // The receiver may already have been stopped above; stop() is called again
                    // here just as the original did.
                    if (senderFactory != null) {
                        senderFactory.stop();
                    }
                    if (tcpReceiver != null) {
                        tcpReceiver.stop();
                    }
                }
            }

            @Override
            public String toString() {
                return "transportMTWorkerMessage";
            }
        });
    }

    /**
     * Compiler-generated (synthetic) accessor surfaced by the decompiler; it simply forwards to
     * {@code getFailFast()} so that nested classes can reach it.
     *
     * @return whatever {@code getFailFast()} returns
     */
    static /* synthetic */ boolean access$000() {
        return TcpTransportTest.getFailFast();
    }
}
