package com.nokia.dempsy.container;

import com.nokia.dempsy.Dispatcher;
import com.nokia.dempsy.annotations.MessageHandler;
import com.nokia.dempsy.annotations.MessageProcessor;
import com.nokia.dempsy.annotations.Output;
import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.container.mocks.MockInputMessage;
import com.nokia.dempsy.container.mocks.MockOutputMessage;
import com.nokia.dempsy.monitoring.coda.MetricGetters;
import com.nokia.dempsy.monitoring.coda.StatsCollectorCoda;
import com.nokia.dempsy.serialization.SerializationException;
import com.nokia.dempsy.serialization.java.JavaSerializer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nokia/dempsy/container/TestMpContainerLoadHandling.class */
public class TestMpContainerLoadHandling {
    /** Container under test; assigned in {@code setUp()}. */
    private MpContainer container;

    /** Metric view of the stats collector that {@code setUp()} installs on the container. */
    private MetricGetters stats;

    /** Dispatcher stub installed on the container; records everything dispatched to it. */
    private MockDispatcher dispatcher;

    /** Awaited by {@code TestMessageProcessor.handleMessage}; tests count it down to release blocked handlers. */
    private CountDownLatch startLatch;

    /** Counted down by {@code TestMessageProcessor.handleMessage} just before it returns. */
    private CountDownLatch finishLatch;

    /** Counted down on entry to {@code handleMessage} and inside {@code doOutput}. */
    private CountDownLatch imIn;

    /** Counted down by {@code doOutput} when non-null. */
    private CountDownLatch finishOutputLatch;

    /** When true, {@code doOutput} throws a RuntimeException instead of returning its message. */
    private volatile boolean forceOutputException = false;

    /** Suffix appended to "test" to build the cluster name in {@code setUp()}. */
    private int sequence = 0;

    /** Multiplier used to size every message batch and latch in the tests. */
    private static final int NTHREADS = 2;

    private static final Logger logger = LoggerFactory.getLogger(TestMpContainerLoadHandling.class);

    /**
     * Set to true by {@code SendMessageThread} when serializing a message fails.
     * Volatile because it is written from sender threads, matching
     * {@link #forceOutputException}.
     */
    public static volatile boolean failed = false;

    /* loaded from: input_file:com/nokia/dempsy/container/TestMpContainerLoadHandling$MockDispatcher.class */
    /**
     * Dispatcher stub that records every dispatched object and, when a latch is
     * installed, counts it down once per dispatch so tests can wait for an
     * expected number of outgoing messages.
     */
    class MockDispatcher implements Dispatcher {
        /**
         * Optional latch counted down once per dispatched message. Tests replace
         * it between phases, so it is volatile to make the swap visible to
         * whichever thread calls {@link #dispatch(Object)}.
         */
        public volatile CountDownLatch latch = null;

        /** Every object passed to {@link #dispatch(Object)}, in arrival order. */
        public List<Object> messages = Collections.synchronizedList(new ArrayList<Object>());

        MockDispatcher() {
        }

        /**
         * Records the message and signals the current latch, if any.
         *
         * @param obj the dispatched message; must not be null
         */
        public void dispatch(Object obj) {
            Assert.assertNotNull(obj);
            this.messages.add(obj);
            // Read the field once so a concurrent swap cannot slip in between
            // the null check and the countDown.
            final CountDownLatch current = this.latch;
            if (current != null) {
                current.countDown();
            }
        }
    }

    /* loaded from: input_file:com/nokia/dempsy/container/TestMpContainerLoadHandling$SendMessageThread.class */
    /**
     * Runnable that serializes one message with a {@link JavaSerializer} and
     * hands it to the container's {@code onMessage}. The shared static latch is
     * counted down exactly once per run, whether delivery succeeds or throws.
     */
    public static class SendMessageThread implements Runnable {
        /** Shared completion latch; tests replace it before starting a batch of senders. */
        static CountDownLatch latch = new CountDownLatch(0);

        MpContainer mpc;
        Object message;
        boolean block;

        /**
         * @param mpContainer container that receives the message
         * @param obj         message to serialize and send
         * @param z           stored as {@code block}; its negation is passed as the
         *                    second argument of {@code onMessage}
         */
        public SendMessageThread(MpContainer mpContainer, Object obj, boolean z) {
            mpc = mpContainer;
            message = obj;
            block = z;
        }

        @Override
        public void run() {
            try {
                mpc.onMessage(new JavaSerializer().serialize(message), !block);
            } catch (SerializationException e) {
                // Record the failure for the enclosing test class, then surface it.
                TestMpContainerLoadHandling.failed = true;
                System.out.println("FAILED!");
                e.printStackTrace();
                throw new RuntimeException(e);
            } finally {
                // Always signal completion so waiting tests are not stalled by a failed send.
                latch.countDown();
            }
        }
    }

    /**
     * Message processor prototype used by the tests. Its handlers coordinate with
     * the enclosing test through the outer instance's latches, which lets a test
     * hold handlers in place and observe how the container behaves meanwhile.
     */
    @MessageProcessor
    public class TestMessageProcessor implements Cloneable {
        /** Number of messages this instance has handled. */
        int messageCount = 0;

        /** Key of the most recently handled message; null until the first message. */
        String key;

        public TestMessageProcessor() {
        }

        /**
         * Public covariant clone. The decompiled source had renamed this method
         * (it was merged with its bridge method), which left the class
         * {@link Cloneable} but without a public {@code clone()}.
         *
         * @return a shallow copy of this processor
         * @throws CloneNotSupportedException declared for compatibility with {@link Object#clone()}
         */
        @Override
        public TestMessageProcessor clone() throws CloneNotSupportedException {
            return (TestMessageProcessor) super.clone();
        }

        /**
         * Signals arrival on {@code imIn}, blocks until the test releases
         * {@code startLatch}, marks the input processed and signals
         * {@code finishLatch} before returning.
         *
         * @param mockInputMessage the incoming message
         * @return an output message carrying the same key as the input
         * @throws InterruptedException if interrupted while waiting on {@code startLatch}
         */
        @MessageHandler
        public MockOutputMessage handleMessage(MockInputMessage mockInputMessage) throws InterruptedException {
            TestMpContainerLoadHandling.this.imIn.countDown();
            // Park here until the test decides the handlers may proceed.
            TestMpContainerLoadHandling.this.startLatch.await();
            TestMpContainerLoadHandling.logger.trace("handling key " + mockInputMessage.getKey() + " count is " + this.messageCount);
            this.key = mockInputMessage.getKey();
            this.messageCount++;
            mockInputMessage.setProcessed(true);
            MockOutputMessage mockOutputMessage = new MockOutputMessage(mockInputMessage.getKey());
            TestMpContainerLoadHandling.this.finishLatch.countDown();
            return mockOutputMessage;
        }

        /**
         * Output handler. Signals {@code imIn} and, if set, {@code finishOutputLatch};
         * throws when the test has raised {@code forceOutputException}.
         *
         * @return an output message of type "output" for this processor's key
         * @throws InterruptedException declared by the original signature; nothing here blocks
         */
        @Output
        public MockOutputMessage doOutput() throws InterruptedException {
            TestMpContainerLoadHandling.logger.trace("handling output message for mp with key " + this.key);
            TestMpContainerLoadHandling.this.imIn.countDown();
            MockOutputMessage mockOutputMessage = new MockOutputMessage(this.key, "output");
            if (TestMpContainerLoadHandling.this.finishOutputLatch != null) {
                TestMpContainerLoadHandling.this.finishOutputLatch.countDown();
            }
            // The latches are signalled before the forced failure, so waiting tests
            // still see that the output call happened.
            if (TestMpContainerLoadHandling.this.forceOutputException) {
                throw new RuntimeException("Forced Exception!");
            }
            return mockOutputMessage;
        }
    }

    /**
     * Asserts that every dispatched message is accounted for: the dispatched
     * count must equal failed + processed + discarded + in-flight.
     *
     * @param metricGetters the metrics to check
     */
    private void checkStat(MetricGetters metricGetters) {
        final long accountedFor = metricGetters.getMessageFailedCount()
                + metricGetters.getProcessedMessageCount()
                + metricGetters.getDiscardedMessageCount()
                + metricGetters.getInFlightMessageCount();
        Assert.assertEquals(metricGetters.getDispatchedMessageCount(), accountedFor);
    }

    /**
     * Builds a fresh container wired to a recording dispatcher, a Coda stats
     * collector, a Java serializer and a {@link TestMessageProcessor} prototype.
     *
     * @throws Exception if any collaborator fails to initialise
     */
    @Before
    public void setUp() throws Exception {
        // Cluster name is "test" followed by the current sequence value.
        final ClusterId clusterId = new ClusterId("TestMpContainerLoadHandling", "test" + (this.sequence++));

        this.dispatcher = new MockDispatcher();

        final StatsCollectorCoda collector = new StatsCollectorCoda(clusterId);
        this.stats = collector;

        final JavaSerializer serializer = new JavaSerializer();

        final MpContainer mpContainer = new MpContainer(clusterId);
        this.container = mpContainer;
        mpContainer.setDispatcher(this.dispatcher);
        mpContainer.setStatCollector(collector);
        mpContainer.setSerializer(serializer);
        mpContainer.setPrototype(new TestMessageProcessor());

        this.forceOutputException = false;
    }

    /**
     * Stops the stats collector created in {@code setUp()}.
     *
     * @throws Exception if stopping the collector fails
     */
    @After
    public void tearDown() throws Exception {
        // JUnit runs @After even when @Before threw, in which case the collector
        // may never have been assigned; skip it rather than mask the real failure
        // with a NullPointerException.
        final MetricGetters collector = this.stats;
        if (collector != null) {
            collector.stop();
        }
    }

    /**
     * Delivers a message to the container from a newly started thread, so the
     * calling test is never blocked by the message handler.
     *
     * @param mpContainer container that receives the message
     * @param obj         message to send
     * @param z           forwarded to {@link SendMessageThread} as its {@code block} flag
     * @throws Exception declared by the original signature
     */
    private void sendMessage(MpContainer mpContainer, Object obj, boolean z) throws Exception {
        final Runnable sender = new SendMessageThread(mpContainer, obj, z);
        final Thread senderThread = new Thread(sender);
        senderThread.start();
    }

    /**
     * Sends NTHREADS messages per round for three rounds, releasing the handlers
     * immediately, and checks that every message is processed and produces one
     * dispatched output.
     */
    @Test
    public void testSimpleProcessingWithoutQueuing() throws Exception {
        for (int i = 0; i < 3; i++) {
            this.startLatch = new CountDownLatch(1);
            this.finishLatch = new CountDownLatch(NTHREADS);
            this.imIn = new CountDownLatch(NTHREADS);
            this.dispatcher.latch = new CountDownLatch(NTHREADS);

            final List<MockInputMessage> batch = new ArrayList<MockInputMessage>();
            for (int k = 0; k < NTHREADS; k++) {
                batch.add(new MockInputMessage("key" + k));
            }
            SendMessageThread.latch = new CountDownLatch(batch.size());
            for (MockInputMessage message : batch) {
                sendMessage(this.container, message, false);
            }

            logger.trace("releasing workers");
            this.startLatch.countDown();
            logger.trace("should be running");
            Assert.assertTrue("Timeout waiting on message to be sent", SendMessageThread.latch.await(2L, TimeUnit.SECONDS));
            Assert.assertTrue("Timeout waiting for MPs", this.finishLatch.await(2L, TimeUnit.SECONDS));

            // Bounded wait: an unbounded spin would hang the whole run if the
            // in-flight count never drained.
            final long deadline = System.currentTimeMillis() + 10000L;
            while (this.stats.getInFlightMessageCount() > 0 && System.currentTimeMillis() < deadline) {
                Thread.yield();
            }
            Assert.assertEquals("Timeout waiting for in-flight messages to drain", 0L, this.stats.getInFlightMessageCount());

            Assert.assertTrue("Timeout waiting for MP sends", this.dispatcher.latch.await(2L, TimeUnit.SECONDS));
            // Counts are cumulative across rounds.
            Assert.assertEquals(NTHREADS * (i + 1), this.dispatcher.messages.size());
            Assert.assertEquals(NTHREADS * (i + 1), this.stats.getProcessedMessageCount());
        }
        checkStat(this.stats);
    }

    /**
     * Sends 3 * NTHREADS messages per round while the handlers are held on
     * {@code startLatch}, checking after each send that nothing is discarded,
     * then releases the handlers and verifies that every message is processed.
     */
    @Test
    public void testMessagesCanQueueWithinLimits() throws Exception {
        for (int i = 0; i < 3; i++) {
            logger.trace("starting loop " + i);
            this.startLatch = new CountDownLatch(1);
            this.finishLatch = new CountDownLatch(3 * NTHREADS);
            this.imIn = new CountDownLatch(3 * NTHREADS);
            this.dispatcher.latch = new CountDownLatch(3 * NTHREADS);

            final List<MockInputMessage> batch = new ArrayList<MockInputMessage>();
            for (int k = 0; k < 3 * NTHREADS; k++) {
                batch.add(new MockInputMessage("key" + k));
            }
            SendMessageThread.latch = new CountDownLatch(batch.size());
            for (MockInputMessage message : batch) {
                sendMessage(this.container, message, false);
                // Give the sender thread time to reach the container before checking.
                Thread.sleep(50L);
                Assert.assertEquals(0L, this.stats.getDiscardedMessageCount());
                Thread.yield();
            }

            this.startLatch.countDown();
            Assert.assertTrue("Timeout waiting on message to be sent", SendMessageThread.latch.await(2L, TimeUnit.SECONDS));
            Assert.assertTrue("Timeout waiting for MPs", this.finishLatch.await(2L, TimeUnit.SECONDS));

            // Bounded wait: an unbounded spin would hang the whole run if the
            // in-flight count never drained.
            final long deadline = System.currentTimeMillis() + 10000L;
            while (this.stats.getInFlightMessageCount() > 0 && System.currentTimeMillis() < deadline) {
                Thread.yield();
            }
            Assert.assertEquals("Timeout waiting for in-flight messages to drain", 0L, this.stats.getInFlightMessageCount());

            Assert.assertTrue("Timeout waiting for MP sends", this.dispatcher.latch.await(2L, TimeUnit.SECONDS));
            // Counts are cumulative across rounds.
            Assert.assertEquals(3 * NTHREADS * (i + 1), this.dispatcher.messages.size());
            Assert.assertEquals(3 * NTHREADS * (i + 1), this.stats.getProcessedMessageCount());
            checkStat(this.stats);
        }
    }

    /**
     * Holds 3 * NTHREADS messages inside their handlers, then dispatches two
     * more directly and expects each to be counted as discarded. After the
     * handlers are released, a second batch must be processed with no further
     * discards.
     */
    @Test
    public void testExcessLoadIsDiscarded() throws Exception {
        this.startLatch = new CountDownLatch(1);
        this.finishLatch = new CountDownLatch(3 * NTHREADS);
        this.imIn = new CountDownLatch(3 * NTHREADS);
        this.dispatcher.latch = new CountDownLatch(3 * NTHREADS);

        final List<MockInputMessage> batch = new ArrayList<MockInputMessage>();
        for (int i = 0; i < 3 * NTHREADS; i++) {
            batch.add(new MockInputMessage("key" + i));
        }
        SendMessageThread.latch = new CountDownLatch(batch.size());
        for (MockInputMessage message : batch) {
            sendMessage(this.container, message, false);
            Assert.assertEquals(0L, this.stats.getDiscardedMessageCount());
        }

        // Wait until every handler has been entered (and is parked on startLatch).
        Assert.assertTrue(this.imIn.await(2L, TimeUnit.SECONDS));

        // Each extra message sent while the handlers are held must be discarded.
        this.container.dispatch(new MockInputMessage("key0"), false);
        Assert.assertEquals(1L, this.stats.getDiscardedMessageCount());
        this.container.dispatch(new MockInputMessage("key1"), false);
        Assert.assertEquals(2L, this.stats.getDiscardedMessageCount());
        checkStat(this.stats);

        this.startLatch.countDown();
        checkStat(this.stats);
        Assert.assertTrue("Timeout waiting on message to be sent", SendMessageThread.latch.await(2L, TimeUnit.SECONDS));
        Assert.assertTrue("Timeout waiting for MPs", this.finishLatch.await(2L, TimeUnit.SECONDS));

        // Bounded wait: an unbounded spin would hang the whole run if the
        // in-flight count never drained.
        final long firstDeadline = System.currentTimeMillis() + 10000L;
        while (this.stats.getInFlightMessageCount() > 0 && System.currentTimeMillis() < firstDeadline) {
            Thread.yield();
        }
        Assert.assertEquals("Timeout waiting for in-flight messages to drain", 0L, this.stats.getInFlightMessageCount());

        Assert.assertTrue("Timeout waiting for MP sends", this.dispatcher.latch.await(2L, TimeUnit.SECONDS));
        Assert.assertEquals(3 * NTHREADS, this.dispatcher.messages.size());
        Assert.assertEquals(3 * NTHREADS, this.stats.getProcessedMessageCount());

        // Second batch: startLatch is already open, so handlers run straight
        // through and the discard count must stay at two.
        this.dispatcher.latch = new CountDownLatch(3 * NTHREADS);
        this.finishLatch = new CountDownLatch(3 * NTHREADS);
        SendMessageThread.latch = new CountDownLatch(batch.size());
        for (MockInputMessage message : batch) {
            sendMessage(this.container, message, false);
            Thread.sleep(50L);
            Assert.assertEquals(2L, this.stats.getDiscardedMessageCount());
        }
        Assert.assertTrue("Timeout waiting on message to be sent", SendMessageThread.latch.await(2L, TimeUnit.SECONDS));
        Assert.assertTrue("Timeout waiting for MPs", this.finishLatch.await(2L, TimeUnit.SECONDS));

        final long secondDeadline = System.currentTimeMillis() + 10000L;
        while (this.stats.getInFlightMessageCount() > 0 && System.currentTimeMillis() < secondDeadline) {
            Thread.yield();
        }
        Assert.assertEquals("Timeout waiting for in-flight messages to drain", 0L, this.stats.getInFlightMessageCount());

        Assert.assertTrue("Timeout waiting for MPs", this.dispatcher.latch.await(2L, TimeUnit.SECONDS));
        Assert.assertEquals(6 * NTHREADS, this.dispatcher.messages.size());
        Assert.assertEquals(6 * NTHREADS, this.stats.getProcessedMessageCount());
        checkStat(this.stats);
    }

    /**
     * Checks that an output pass is not discarded while message handlers are
     * held in flight: the output pass must complete, bump the processed count
     * and leave the discard count at zero.
     */
    @Test
    public void testOutputOperationsNotDiscarded() throws Exception {
        // Phase 1: startLatch is already open, so 4 * NTHREADS messages run
        // straight through their handlers.
        this.startLatch = new CountDownLatch(0);
        this.finishLatch = new CountDownLatch(NTHREADS * 4);
        this.imIn = new CountDownLatch(4 * NTHREADS);
        this.dispatcher.latch = new CountDownLatch(4 * NTHREADS);
        SendMessageThread.latch = new CountDownLatch(4 * NTHREADS);
        for (int i = 0; i < NTHREADS * 4; i++) {
            sendMessage(this.container, new MockInputMessage("key" + i), false);
        }
        Assert.assertTrue("Timeout waiting on message to be sent", SendMessageThread.latch.await(2L, TimeUnit.SECONDS));
        Assert.assertTrue("Timeout on initial messages", this.finishLatch.await(4L, TimeUnit.SECONDS));
        Thread.yield();
        long drainDeadline = System.currentTimeMillis() + 10000;
        while (this.stats.getInFlightMessageCount() > 0 && drainDeadline > System.currentTimeMillis()) {
            Thread.sleep(1L);
        }
        Assert.assertEquals(0L, this.stats.getInFlightMessageCount());
        Assert.assertEquals(4 * NTHREADS, this.stats.getProcessedMessageCount());

        // Phase 2: close startLatch and park 2 * NTHREADS messages in their handlers.
        this.startLatch = new CountDownLatch(1);
        this.finishLatch = new CountDownLatch(2 * NTHREADS);
        this.imIn = new CountDownLatch(2 * NTHREADS);
        for (int i = 0; i < 2 * NTHREADS; i++) {
            sendMessage(this.container, new MockInputMessage("key" + i), false);
            Assert.assertEquals(0L, this.stats.getDiscardedMessageCount());
        }
        Assert.assertTrue("Timeout on initial messages", this.imIn.await(4L, TimeUnit.SECONDS));
        checkStat(this.stats);
        Assert.assertEquals(2 * NTHREADS, this.stats.getInFlightMessageCount());

        // Phase 3: run an output pass on its own thread while the handlers stay parked.
        this.finishOutputLatch = new CountDownLatch(2 * NTHREADS);
        long processedBeforeOutput = this.stats.getProcessedMessageCount();
        this.imIn = new CountDownLatch(2 * NTHREADS);
        new Thread(new Runnable() {
            @Override
            public void run() {
                TestMpContainerLoadHandling.this.container.outputPass();
            }
        }).start();
        Assert.assertTrue("Timeout on initial messages", this.imIn.await(4L, TimeUnit.SECONDS));
        Assert.assertTrue("Timeout on initial messages", this.finishOutputLatch.await(4 * NTHREADS, TimeUnit.SECONDS));
        long outputDeadline = System.currentTimeMillis() + (NTHREADS * 4000);
        while (outputDeadline > System.currentTimeMillis() && (2 * NTHREADS) + processedBeforeOutput != this.stats.getProcessedMessageCount()) {
            Thread.sleep(1L);
        }
        Assert.assertEquals((2 * NTHREADS) + processedBeforeOutput, this.stats.getProcessedMessageCount());
        Assert.assertEquals(0L, this.stats.getDiscardedMessageCount());
        checkStat(this.stats);

        // Phase 4: release the parked handlers and wait for everything to settle.
        this.imIn = new CountDownLatch(2 * NTHREADS);
        this.startLatch.countDown();
        Assert.assertTrue("Timeout waiting for MPs", this.finishLatch.await(6L, TimeUnit.SECONDS));
        Assert.assertTrue("Timeout waiting for MP outputs to finish", this.imIn.await(6L, TimeUnit.SECONDS));

        // Bounded wait, like the drain loop in phase 1: an unbounded spin would
        // hang the whole run if the in-flight count never drained.
        final long finalDeadline = System.currentTimeMillis() + 10000L;
        while (this.stats.getInFlightMessageCount() > 0 && System.currentTimeMillis() < finalDeadline) {
            Thread.yield();
        }
        Assert.assertEquals("Timeout waiting for in-flight messages to drain", 0L, this.stats.getInFlightMessageCount());

        Assert.assertTrue("Timeout waiting for MP sends", this.dispatcher.latch.await(2L, TimeUnit.SECONDS));

        // Count the dispatched messages that came from the output handler.
        int outputCount = 0;
        for (Object dispatched : this.dispatcher.messages) {
            MockOutputMessage outputMessage = (MockOutputMessage) dispatched;
            if (outputMessage == null) {
                Assert.fail("wtf!?");
            }
            if ("output".equals(outputMessage.getType())) {
                outputCount++;
            }
        }
        Assert.assertEquals(4 * NTHREADS, outputCount);
        checkStat(this.stats);
    }

    /**
     * Runs output passes (one forced to fail, one successful) with no handlers
     * in flight, then sends further messages and checks that the discard count
     * stays at zero throughout.
     */
    @Test
    public void testOutputOperationsAloneDoNotCauseDiscard() throws Exception {
        // Phase 1: startLatch is already open, so 4 * NTHREADS messages run
        // straight through their handlers.
        this.startLatch = new CountDownLatch(0);
        this.finishLatch = new CountDownLatch(NTHREADS * 4);
        this.imIn = new CountDownLatch(0);
        // Sized for the total number of dispatches asserted at the end of the test.
        this.dispatcher.latch = new CountDownLatch(10 * NTHREADS);
        SendMessageThread.latch = new CountDownLatch(NTHREADS * 4);
        for (int i = 0; i < NTHREADS * 4; i++) {
            sendMessage(this.container, new MockInputMessage("key" + i), false);
        }
        Assert.assertTrue("Timeout on initial messages", this.finishLatch.await(4L, TimeUnit.SECONDS));
        Thread.yield();
        Assert.assertTrue("Timeout waiting on message to be sent", SendMessageThread.latch.await(2L, TimeUnit.SECONDS));
        checkStat(this.stats);
        Assert.assertEquals(NTHREADS * 4, this.stats.getProcessedMessageCount());
        // Close startLatch so handlers for messages sent later will park.
        this.startLatch = new CountDownLatch(1);
        // NOTE(review): this latch is replaced below before anything awaits it,
        // and doOutput never touches finishLatch — looks like a dead assignment.
        this.finishLatch = new CountDownLatch(6 * NTHREADS);
        // Phase 2: output pass with doOutput forced to throw; expected to be
        // counted as failures, not discards.
        this.forceOutputException = true;
        this.container.outputPass();
        this.forceOutputException = false;
        Assert.assertEquals(0L, this.stats.getDiscardedMessageCount());
        Assert.assertEquals(NTHREADS * 4, this.stats.getMessageFailedCount());
        // Phase 3: a normal output pass; the failed count is unchanged and the
        // processed count doubles.
        this.finishOutputLatch = new CountDownLatch(NTHREADS * 4);
        this.container.outputPass();
        Assert.assertEquals(0L, this.stats.getDiscardedMessageCount());
        Assert.assertEquals(NTHREADS * 4, this.stats.getMessageFailedCount());
        Assert.assertEquals(2 * NTHREADS * 4, this.stats.getProcessedMessageCount());
        checkStat(this.stats);
        // Phase 4: send 2 * NTHREADS messages; their handlers park on startLatch.
        this.imIn = new CountDownLatch(2 * NTHREADS);
        this.finishLatch = new CountDownLatch(2 * NTHREADS);
        SendMessageThread.latch = new CountDownLatch(NTHREADS * 2);
        for (int i2 = 0; i2 < 2 * NTHREADS; i2++) {
            sendMessage(this.container, new MockInputMessage("key" + i2), false);
            Assert.assertEquals(0L, this.stats.getDiscardedMessageCount());
        }
        // Wait until every handler has been entered before checking discards.
        Assert.assertTrue("Timeout on initial messages", this.imIn.await(4L, TimeUnit.SECONDS));
        Assert.assertEquals(0L, this.stats.getDiscardedMessageCount());
        checkStat(this.stats);
        // Release the parked handlers and wait for them and their senders to finish.
        this.startLatch.countDown();
        Assert.assertTrue("Timeout waiting on MPs", this.finishLatch.await(6L, TimeUnit.SECONDS));
        Assert.assertTrue("Timeout waiting on message to be sent", SendMessageThread.latch.await(2L, TimeUnit.SECONDS));
        Assert.assertTrue("Timeout waiting for MP sends", this.dispatcher.latch.await(2L, TimeUnit.SECONDS));
        Assert.assertEquals(0L, this.stats.getDiscardedMessageCount());
        // Presumably 4N handler results + 4N from the successful output pass +
        // 2N from the final handlers; the forced-failure pass contributes none.
        Assert.assertEquals(10 * NTHREADS, this.dispatcher.messages.size());
    }
}
