[disruptor] 01/02: New upstream version 3.3.6

Tony Mancill tmancill at moszumanska.debian.org
Mon Oct 10 02:54:39 UTC 2016


This is an automated email from the git hooks/post-receive script.

tmancill pushed a commit to branch master
in repository disruptor.

commit d3e725b50ab91f48dbe8cc88b03fd551e9e1ffb0
Author: tony mancill <tmancill at debian.org>
Date:   Sun Oct 9 19:44:01 2016 -0700

    New upstream version 3.3.6
---
 README.md                                          |  12 ++
 build.gradle                                       |   2 +-
 .../lmax/disruptor/SingleProducerSequencer.java    |   4 +
 .../java/com/lmax/disruptor/dsl/Disruptor.java     |  53 +++++-
 .../com/lmax/disruptor/dsl/EventHandlerGroup.java  |   5 +-
 .../OneToOneTranslatorThroughputTest.java          |  13 +-
 .../com/lmax/disruptor/DisruptorStressTest.java    |  11 +-
 .../disruptor/ShutdownOnFatalExceptionTest.java    |   4 +-
 ...ruptorStressTest.java => WorkerStressTest.java} |  46 ++---
 .../java/com/lmax/disruptor/dsl/DisruptorTest.java | 211 ++++++++++++++++++---
 .../{StubExecutor.java => StubThreadFactory.java}  |  27 ++-
 .../example/MultiProducerWithTranslator.java       |  11 +-
 .../lmax/disruptor/example/WaitForShutdown.java    |  65 +++++++
 13 files changed, 372 insertions(+), 92 deletions(-)

diff --git a/README.md b/README.md
index 330435b..ce0118e 100644
--- a/README.md
+++ b/README.md
@@ -13,6 +13,18 @@ A High Performance Inter-Thread Messaging Library
 
 ## Changelog
 
+### 3.3.6
+
+- Support adding gating sequences before calling Disruptor.start()
+- Fix minor concurrency race when dynamically adding sequences
+- Fix wrapping problem when adding work handlers to the Disruptor
+
+### 3.3.5
+
+- Fix NPE in TimeoutBlockingWaitStrategy when used with WorkProcessor
+- Add LiteTimeoutBlockingWaitStrategy
+- Resignal any waiting threads when trying to publish to a full ring buffer
+
 ### 3.3.4
 
 - Small build fixes and refactorings
diff --git a/build.gradle b/build.gradle
index 0a12475..840b791 100644
--- a/build.gradle
+++ b/build.gradle
@@ -24,7 +24,7 @@ apply plugin: 'idea'
 defaultTasks 'build'
 
 group = 'com.lmax'
-version = new Version(major: 3, minor: 3, revision: 5)
+version = new Version(major: 3, minor: 3, revision: 6)
 
 ext {
     fullName = 'Disruptor Framework'
diff --git a/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java b/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java
index 50d2bea..35b1b15 100644
--- a/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java
+++ b/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java
@@ -79,6 +79,8 @@ public final class SingleProducerSequencer extends SingleProducerSequencerFields
 
         if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
         {
+            cursor.setVolatile(nextValue);  // StoreLoad fence
+
             long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
             this.cachedValue = minSequence;
 
@@ -119,6 +121,8 @@ public final class SingleProducerSequencer extends SingleProducerSequencerFields
 
         if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
         {
+            cursor.setVolatile(nextValue);  // StoreLoad fence
+
             long minSequence;
             while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
             {
diff --git a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
index eaaca72..c9d8f8f 100644
--- a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
+++ b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java
@@ -134,9 +134,9 @@ public class Disruptor<T>
             final ProducerType producerType,
             final WaitStrategy waitStrategy)
     {
-        this(RingBuffer.create(
-                               producerType, eventFactory, ringBufferSize, waitStrategy),
-                new BasicExecutor(threadFactory));
+        this(
+            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
+            new BasicExecutor(threadFactory));
     }
 
     /**
@@ -204,6 +204,15 @@ public class Disruptor<T>
         {
             consumerRepository.add(processor);
         }
+
+        Sequence[] sequences = new Sequence[processors.length];
+        for (int i = 0; i < processors.length; i++)
+        {
+            sequences[i] = processors[i].getSequence();
+        }
+
+        ringBuffer.addGatingSequences(sequences);
+
         return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
     }
 
@@ -349,9 +358,6 @@ public class Disruptor<T>
      */
     public RingBuffer<T> start()
     {
-        final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
-        ringBuffer.addGatingSequences(gatingSequences);
-
         checkOnlyStartedOnce();
         for (final ConsumerInfo consumerInfo : consumerRepository)
         {
@@ -473,6 +479,17 @@ public class Disruptor<T>
     }
 
     /**
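+     * Gets the sequence value for the specified event handler.
+     *
+     * @param b1 the event handler whose sequence value is requested
+     * @return the value read from the sequence that the consumer repository holds for that handler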
+     */
+    public long getSequenceValueFor(EventHandler<T> b1)
+    {
+        return consumerRepository.getSequenceFor(b1).get();
+    }
+
+    /**
      * Confirms if all messages have been consumed by all event processors
      */
     private boolean hasBacklog()
@@ -513,12 +530,22 @@ public class Disruptor<T>
             processorSequences[i] = batchEventProcessor.getSequence();
         }
 
+        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
+
+        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
+    }
+
+    private void updateGatingSequencesForNextInChain(Sequence[] barrierSequences, Sequence[] processorSequences)
+    {
         if (processorSequences.length > 0)
         {
+            ringBuffer.addGatingSequences(processorSequences);
+            for (final Sequence barrierSequence : barrierSequences)
+            {
+                ringBuffer.removeGatingSequence(barrierSequence);
+            }
             consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
         }
-
-        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
     }
 
     EventHandlerGroup<T> createEventProcessors(
@@ -529,6 +556,7 @@ public class Disruptor<T>
         {
             eventProcessors[i] = processorFactories[i].createEventProcessor(ringBuffer, barrierSequences);
         }
+
         return handleEventsWith(eventProcessors);
     }
 
@@ -537,8 +565,15 @@ public class Disruptor<T>
     {
         final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
         final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
+
+
         consumerRepository.add(workerPool, sequenceBarrier);
-        return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences());
+
+        Sequence[] workerSequences = workerPool.getWorkerSequences();
+
+        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
+
+        return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
     }
 
     private void checkNotStarted()
diff --git a/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java b/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
index 0acfc92..c8fe91d 100644
--- a/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
+++ b/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
@@ -54,8 +54,9 @@ public class EventHandlerGroup<T>
     {
         final Sequence[] combinedSequences = new Sequence[this.sequences.length + otherHandlerGroup.sequences.length];
         System.arraycopy(this.sequences, 0, combinedSequences, 0, this.sequences.length);
-        System
-            .arraycopy(otherHandlerGroup.sequences, 0, combinedSequences, this.sequences.length, otherHandlerGroup.sequences.length);
+        System.arraycopy(
+            otherHandlerGroup.sequences, 0,
+            combinedSequences, this.sequences.length, otherHandlerGroup.sequences.length);
         return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
     }
 
diff --git a/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java b/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java
index 94863c8..3c483a8 100644
--- a/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java
+++ b/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java
@@ -15,12 +15,6 @@
  */
 package com.lmax.disruptor.translator;
 
-import static com.lmax.disruptor.support.PerfTestUtil.failIfNot;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 import com.lmax.disruptor.AbstractPerfTestDisruptor;
 import com.lmax.disruptor.EventTranslatorOneArg;
 import com.lmax.disruptor.RingBuffer;
@@ -33,6 +27,10 @@ import com.lmax.disruptor.support.ValueEvent;
 import com.lmax.disruptor.util.DaemonThreadFactory;
 import com.lmax.disruptor.util.MutableLong;
 
+import java.util.concurrent.CountDownLatch;
+
+import static com.lmax.disruptor.support.PerfTestUtil.failIfNot;
+
 /**
  * <pre>
  * UniCast a series of items between 1 publisher and 1 event processor using the EventTranslator API
@@ -66,7 +64,6 @@ public final class OneToOneTranslatorThroughputTest extends AbstractPerfTestDisr
 {
     private static final int BUFFER_SIZE = 1024 * 64;
     private static final long ITERATIONS = 1000L * 1000L * 100L;
-    private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE);
     private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS);
     private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
     private final RingBuffer<ValueEvent> ringBuffer;
@@ -80,7 +77,7 @@ public final class OneToOneTranslatorThroughputTest extends AbstractPerfTestDisr
         Disruptor<ValueEvent> disruptor =
             new Disruptor<ValueEvent>(
                 ValueEvent.EVENT_FACTORY,
-                BUFFER_SIZE, executor,
+                BUFFER_SIZE, DaemonThreadFactory.INSTANCE,
                 ProducerType.SINGLE,
                 new YieldingWaitStrategy());
         disruptor.handleEventsWith(handler);
diff --git a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java b/src/test/java/com/lmax/disruptor/DisruptorStressTest.java
index 0d885fb..a066b3c 100644
--- a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java
+++ b/src/test/java/com/lmax/disruptor/DisruptorStressTest.java
@@ -2,6 +2,7 @@ package com.lmax.disruptor;
 
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
 import org.junit.Test;
 
 import java.util.concurrent.CountDownLatch;
@@ -23,14 +24,14 @@ public class DisruptorStressTest
     public void shouldHandleLotsOfThreads() throws Exception
     {
         Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>(
-            TestEvent.FACTORY, 1 << 16, executor,
-            ProducerType.MULTI, new BusySpinWaitStrategy());
+                TestEvent.FACTORY, 1 << 16, DaemonThreadFactory.INSTANCE,
+                ProducerType.MULTI, new BusySpinWaitStrategy());
         RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
         disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());
 
         int threads = max(1, Runtime.getRuntime().availableProcessors() / 2);
 
-        int iterations = 20000000;
+        int iterations = 200000;
         int publisherCount = threads;
         int handlerCount = threads;
 
@@ -177,9 +178,9 @@ public class DisruptorStressTest
         public static final EventFactory<TestEvent> FACTORY = new EventFactory<DisruptorStressTest.TestEvent>()
         {
             @Override
-            public TestEvent newInstance()
+            public DisruptorStressTest.TestEvent newInstance()
             {
-                return new TestEvent();
+                return new DisruptorStressTest.TestEvent();
             }
         };
     }
diff --git a/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java b/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java
index 327c9e2..7a26e6e 100644
--- a/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java
+++ b/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java
@@ -2,12 +2,12 @@ package com.lmax.disruptor;
 
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Random;
-import java.util.concurrent.Executors;
 
 public class ShutdownOnFatalExceptionTest
 {
@@ -23,7 +23,7 @@ public class ShutdownOnFatalExceptionTest
     public void setUp()
     {
         disruptor = new Disruptor<byte[]>(
-            new ByteArrayFactory(256), 1024, Executors.newCachedThreadPool(), ProducerType.SINGLE,
+            new ByteArrayFactory(256), 1024, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE,
             new BlockingWaitStrategy());
         disruptor.handleEventsWith(eventHandler);
         disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());
diff --git a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java b/src/test/java/com/lmax/disruptor/WorkerStressTest.java
similarity index 76%
copy from src/test/java/com/lmax/disruptor/DisruptorStressTest.java
copy to src/test/java/com/lmax/disruptor/WorkerStressTest.java
index 0d885fb..ca5e4c3 100644
--- a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java
+++ b/src/test/java/com/lmax/disruptor/WorkerStressTest.java
@@ -2,6 +2,7 @@ package com.lmax.disruptor;
 
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
 import org.junit.Test;
 
 import java.util.concurrent.CountDownLatch;
@@ -15,7 +16,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertThat;
 
-public class DisruptorStressTest
+public class WorkerStressTest
 {
     private final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -23,23 +24,25 @@ public class DisruptorStressTest
     public void shouldHandleLotsOfThreads() throws Exception
     {
         Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>(
-            TestEvent.FACTORY, 1 << 16, executor,
-            ProducerType.MULTI, new BusySpinWaitStrategy());
+            TestEvent.FACTORY, 1 << 16, DaemonThreadFactory.INSTANCE,
+            ProducerType.MULTI, new SleepingWaitStrategy());
         RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
         disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());
 
         int threads = max(1, Runtime.getRuntime().availableProcessors() / 2);
 
-        int iterations = 20000000;
+        int iterations = 200000;
         int publisherCount = threads;
         int handlerCount = threads;
 
         CyclicBarrier barrier = new CyclicBarrier(publisherCount);
         CountDownLatch latch = new CountDownLatch(publisherCount);
 
-        TestEventHandler[] handlers = initialise(disruptor, new TestEventHandler[handlerCount]);
+        TestWorkHandler[] handlers = initialise(new TestWorkHandler[handlerCount]);
         Publisher[] publishers = initialise(new Publisher[publisherCount], ringBuffer, iterations, barrier, latch);
 
+        disruptor.handleEventsWithWorkerPool(handlers);
+
         disruptor.start();
 
         for (Publisher publisher : publishers)
@@ -60,10 +63,9 @@ public class DisruptorStressTest
             assertThat(publisher.failed, is(false));
         }
 
-        for (TestEventHandler handler : handlers)
+        for (TestWorkHandler handler : handlers)
         {
-            assertThat(handler.messagesSeen, is(not(0)));
-            assertThat(handler.failureCount, is(0));
+            assertThat(handler.seen, is(not(0)));
         }
     }
 
@@ -80,39 +82,25 @@ public class DisruptorStressTest
     }
 
     @SuppressWarnings("unchecked")
-    private TestEventHandler[] initialise(Disruptor<TestEvent> disruptor, TestEventHandler[] testEventHandlers)
+    private TestWorkHandler[] initialise(TestWorkHandler[] testEventHandlers)
     {
         for (int i = 0; i < testEventHandlers.length; i++)
         {
-            TestEventHandler handler = new TestEventHandler();
-            disruptor.handleEventsWith(handler);
+            TestWorkHandler handler = new TestWorkHandler();
             testEventHandlers[i] = handler;
         }
 
         return testEventHandlers;
     }
 
-    private static class TestEventHandler implements EventHandler<TestEvent>
+    private static class TestWorkHandler implements WorkHandler<TestEvent>
     {
-        public int failureCount = 0;
-        public int messagesSeen = 0;
-
-        public TestEventHandler()
-        {
-        }
+        private int seen;
 
         @Override
-        public void onEvent(TestEvent event, long sequence, boolean endOfBatch) throws Exception
+        public void onEvent(TestEvent event) throws Exception
         {
-            if (event.sequence != sequence ||
-                event.a != sequence + 13 ||
-                event.b != sequence - 7 ||
-                !("wibble-" + sequence).equals(event.s))
-            {
-                failureCount++;
-            }
-
-            messagesSeen++;
+            seen++;
         }
     }
 
@@ -174,7 +162,7 @@ public class DisruptorStressTest
         public long b;
         public String s;
 
-        public static final EventFactory<TestEvent> FACTORY = new EventFactory<DisruptorStressTest.TestEvent>()
+        public static final EventFactory<TestEvent> FACTORY = new EventFactory<WorkerStressTest.TestEvent>()
         {
             @Override
             public TestEvent newInstance()
diff --git a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
index ea008d8..42511fa 100644
--- a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
+++ b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java
@@ -32,8 +32,8 @@ import com.lmax.disruptor.dsl.stubs.EvilEqualsEventHandler;
 import com.lmax.disruptor.dsl.stubs.ExceptionThrowingEventHandler;
 import com.lmax.disruptor.dsl.stubs.SleepingEventHandler;
 import com.lmax.disruptor.dsl.stubs.StubExceptionHandler;
-import com.lmax.disruptor.dsl.stubs.StubExecutor;
 import com.lmax.disruptor.dsl.stubs.StubPublisher;
+import com.lmax.disruptor.dsl.stubs.StubThreadFactory;
 import com.lmax.disruptor.dsl.stubs.TestWorkHandler;
 import com.lmax.disruptor.support.TestEvent;
 import org.junit.After;
@@ -44,7 +44,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.lang.Thread.yield;
@@ -56,15 +57,17 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings(value = {"unchecked"})
 public class DisruptorTest
 {
     private static final int TIMEOUT_IN_SECONDS = 2;
-    private Disruptor<TestEvent> disruptor;
-    private StubExecutor executor;
+
     private final Collection<DelayedEventHandler> delayedEventHandlers = new ArrayList<DelayedEventHandler>();
     private final Collection<TestWorkHandler> testWorkHandlers = new ArrayList<TestWorkHandler>();
+    private Disruptor<TestEvent> disruptor;
+    private StubThreadFactory executor;
     private RingBuffer<TestEvent> ringBuffer;
     private TestEvent lastPublishedEvent;
 
@@ -91,6 +94,119 @@ public class DisruptorTest
     }
 
     @Test
+    public void shouldProcessMessagesPublishedBeforeStartIsCalled() throws Exception
+    {
+        final CountDownLatch eventCounter = new CountDownLatch(2); // two events: one published before start(), one after
+        disruptor.handleEventsWith(new EventHandler<TestEvent>()
+        {
+            @Override
+            public void onEvent(final TestEvent event, final long sequence, final boolean endOfBatch) throws Exception
+            {
+                eventCounter.countDown(); // one count per event handled
+            }
+        });
+
+        disruptor.publishEvent( // first event: published before start()
+         new EventTranslator<TestEvent>()
+         {
+             @Override
+             public void translateTo(final TestEvent event, final long sequence)
+             {
+                 lastPublishedEvent = event;
+             }
+         });
+
+        disruptor.start();
+
+        disruptor.publishEvent( // second event: published after start()
+         new EventTranslator<TestEvent>()
+         {
+             @Override
+             public void translateTo(final TestEvent event, final long sequence)
+             {
+                 lastPublishedEvent = event;
+             }
+         });
+
+        if (!eventCounter.await(5, TimeUnit.SECONDS)) // false only if both events were not handled in time
+        {
+            fail("Did not process event published before start was called. Missed events: " + eventCounter.getCount());
+        }
+    }
+
+    @Test
+    public void shouldAddEventProcessorsAfterPublishing() throws Exception
+    {
+        RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
+        BatchEventProcessor<TestEvent> b1 = new BatchEventProcessor<TestEvent>(
+            rb, rb.newBarrier(), new SleepingEventHandler());
+        BatchEventProcessor<TestEvent> b2 = new BatchEventProcessor<TestEvent>(
+            rb, rb.newBarrier(b1.getSequence()), new SleepingEventHandler());
+        BatchEventProcessor<TestEvent> b3 = new BatchEventProcessor<TestEvent>(
+            rb, rb.newBarrier(b2.getSequence()), new SleepingEventHandler());
+
+        assertThat(b1.getSequence().get(), is(-1L));
+        assertThat(b2.getSequence().get(), is(-1L));
+        assertThat(b3.getSequence().get(), is(-1L));
+
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+
+        disruptor.handleEventsWith(b1, b2, b3);
+
+        assertThat(b1.getSequence().get(), is(5L));
+        assertThat(b2.getSequence().get(), is(5L));
+        assertThat(b3.getSequence().get(), is(5L));
+    }
+
+    @Test
+    public void shouldSetSequenceForHandlerIfAddedAfterPublish() throws Exception
+    {
+        RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
+        EventHandler<TestEvent> b1 = new SleepingEventHandler();
+        EventHandler<TestEvent> b2 = new SleepingEventHandler();
+        EventHandler<TestEvent> b3 = new SleepingEventHandler();
+
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+
+        disruptor.handleEventsWith(b1, b2, b3);
+
+        assertThat(disruptor.getSequenceValueFor(b1), is(5L));
+        assertThat(disruptor.getSequenceValueFor(b2), is(5L));
+        assertThat(disruptor.getSequenceValueFor(b3), is(5L));
+    }
+
+    @Test
+    public void shouldSetSequenceForWorkProcessorIfAddedAfterPublish() throws Exception
+    {
+        RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
+        TestWorkHandler wh1 = createTestWorkHandler();
+        TestWorkHandler wh2 = createTestWorkHandler();
+        TestWorkHandler wh3 = createTestWorkHandler();
+
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+        rb.publish(rb.next());
+
+        disruptor.handleEventsWithWorkerPool(wh1, wh2, wh3);
+
+        assertThat(disruptor.getRingBuffer().getMinimumGatingSequence(), is(5L));
+    }
+
+
+    @Test
     public void shouldCreateEventProcessorGroupForFirstEventProcessors()
         throws Exception
     {
@@ -132,6 +248,29 @@ public class DisruptorTest
     }
 
     @Test
+    public void should()
+        throws Exception
+    {
+        RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
+        BatchEventProcessor<TestEvent> b1 = new BatchEventProcessor<TestEvent>(
+            rb, rb.newBarrier(), new SleepingEventHandler());
+        EventProcessorFactory<TestEvent> b2 = new EventProcessorFactory<TestEvent>()
+        {
+            @Override
+            public EventProcessor createEventProcessor(
+                RingBuffer<TestEvent> ringBuffer, Sequence[] barrierSequences)
+            {
+                return new BatchEventProcessor<TestEvent>(
+                    ringBuffer, ringBuffer.newBarrier(barrierSequences), new SleepingEventHandler());
+            }
+        };
+
+        disruptor.handleEventsWith(b1).then(b2);
+
+        disruptor.start();
+    }
+
+    @Test
     public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor()
         throws Exception
     {
@@ -270,7 +409,7 @@ public class DisruptorTest
         final StubPublisher stubPublisher = new StubPublisher(ringBuffer);
         try
         {
-            executor.execute(stubPublisher);
+            executor.newThread(stubPublisher).start();
 
             assertProducerReaches(stubPublisher, 4, true);
 
@@ -338,8 +477,8 @@ public class DisruptorTest
 
         final BatchEventProcessor<TestEvent> processor =
             new BatchEventProcessor<TestEvent>(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler);
-        disruptor.handleEventsWith(processor);
-        disruptor.after(processor).handleEventsWith(handlerWithBarrier);
+
+        disruptor.handleEventsWith(processor).then(handlerWithBarrier);
 
         ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
     }
@@ -399,6 +538,28 @@ public class DisruptorTest
         workHandler2.processEvent();
     }
 
+
+    @Test
+    public void shouldProvideEventsMultipleWorkHandlers() throws Exception
+    {
+        final TestWorkHandler workHandler1 = createTestWorkHandler();
+        final TestWorkHandler workHandler2 = createTestWorkHandler();
+        final TestWorkHandler workHandler3 = createTestWorkHandler();
+        final TestWorkHandler workHandler4 = createTestWorkHandler();
+        final TestWorkHandler workHandler5 = createTestWorkHandler();
+        final TestWorkHandler workHandler6 = createTestWorkHandler();
+        final TestWorkHandler workHandler7 = createTestWorkHandler();
+        final TestWorkHandler workHandler8 = createTestWorkHandler();
+
+        disruptor
+            .handleEventsWithWorkerPool(workHandler1, workHandler2)
+            .thenHandleEventsWithWorkerPool(workHandler3, workHandler4);
+        disruptor
+            .handleEventsWithWorkerPool(workHandler5, workHandler6)
+            .thenHandleEventsWithWorkerPool(workHandler7, workHandler8);
+    }
+
+
     @Test
     public void shouldSupportUsingWorkerPoolAsDependency() throws Exception
     {
@@ -461,8 +622,10 @@ public class DisruptorTest
         final TestWorkHandler workHandler1 = createTestWorkHandler();
         final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler();
         final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler();
-        disruptor.handleEventsWith(delayedEventHandler1).and(disruptor.handleEventsWithWorkerPool(workHandler1)).then(
-            delayedEventHandler2);
+        disruptor
+            .handleEventsWith(delayedEventHandler1)
+            .and(disruptor.handleEventsWithWorkerPool(workHandler1))
+            .then(delayedEventHandler2);
 
         publishEvent();
         publishEvent();
@@ -549,18 +712,18 @@ public class DisruptorTest
         final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);
 
         disruptor.handleEventsWith(
-                                   new EventProcessorFactory<TestEvent>()
-                                   {
-                                       @Override
-                                       public EventProcessor createEventProcessor(
-                                                                                  final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
-                                       {
-                                           assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
-                                           return new BatchEventProcessor<TestEvent>(
-                                                                                     disruptor.getRingBuffer(), ringBuffer.newBarrier(
-                                                                                                                                      barrierSequences), eventHandler);
-                                       }
-                                   });
+            new EventProcessorFactory<TestEvent>()
+            {
+                @Override
+                public EventProcessor createEventProcessor(
+                    final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
+                {
+                    assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
+                    return new BatchEventProcessor<TestEvent>(
+                        disruptor.getRingBuffer(), ringBuffer.newBarrier(
+                        barrierSequences), eventHandler);
+                }
+            });
 
         ensureTwoEventsProcessedAccordingToDependencies(countDownLatch);
     }
@@ -641,14 +804,14 @@ public class DisruptorTest
 
     private void createDisruptor()
     {
-        executor = new StubExecutor();
+        executor = new StubThreadFactory();
         createDisruptor(executor);
     }
 
-    private void createDisruptor(final Executor executor)
+    private void createDisruptor(final ThreadFactory threadFactory)
     {
         disruptor = new Disruptor<TestEvent>(
-            TestEvent.EVENT_FACTORY, 4, executor,
+            TestEvent.EVENT_FACTORY, 4, threadFactory,
             ProducerType.SINGLE, new BlockingWaitStrategy());
     }
 
diff --git a/src/test/java/com/lmax/disruptor/dsl/stubs/StubExecutor.java b/src/test/java/com/lmax/disruptor/dsl/stubs/StubThreadFactory.java
similarity index 77%
rename from src/test/java/com/lmax/disruptor/dsl/stubs/StubExecutor.java
rename to src/test/java/com/lmax/disruptor/dsl/stubs/StubThreadFactory.java
index 1167340..0e160e2 100644
--- a/src/test/java/com/lmax/disruptor/dsl/stubs/StubExecutor.java
+++ b/src/test/java/com/lmax/disruptor/dsl/stubs/StubThreadFactory.java
@@ -20,27 +20,30 @@ import org.junit.Assert;
 
 import java.util.Collection;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class StubExecutor implements Executor
+public final class StubThreadFactory implements ThreadFactory
 {
     private final DaemonThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
     private final Collection<Thread> threads = new CopyOnWriteArrayList<Thread>();
     private final AtomicBoolean ignoreExecutions = new AtomicBoolean(false);
     private final AtomicInteger executionCount = new AtomicInteger(0);
 
-    public void execute(final Runnable command)
+    @Override
+    public Thread newThread(final Runnable command)
     {
         executionCount.getAndIncrement();
-        if (!ignoreExecutions.get())
+        Runnable toExecute = command;
+        if(ignoreExecutions.get())
         {
-            Thread t = threadFactory.newThread(command);
-            t.setName(command.toString());
-            threads.add(t);
-            t.start();
+            toExecute = new NoOpRunnable();
         }
+        final Thread thread = threadFactory.newThread(toExecute);
+        thread.setName(command.toString());
+        threads.add(thread);
+        return thread;
     }
 
     public void joinAllThreads()
@@ -75,4 +78,12 @@ public class StubExecutor implements Executor
     {
         return executionCount.get();
     }
+
+    private static final class NoOpRunnable implements Runnable
+    {
+        @Override
+        public void run()
+        {
+        }
+    }
 }
diff --git a/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java b/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java
index 71727aa..7102523 100644
--- a/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java
+++ b/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java
@@ -1,10 +1,13 @@
 package com.lmax.disruptor.example;
 
-import com.lmax.disruptor.*;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslatorThreeArg;
+import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-
-import java.util.concurrent.Executors;
+import com.lmax.disruptor.util.DaemonThreadFactory;
 
 public class MultiProducerWithTranslator
 {
@@ -72,7 +75,7 @@ public class MultiProducerWithTranslator
     public static void main(String[] args) throws InterruptedException
     {
         Disruptor<ObjectBox> disruptor = new Disruptor<ObjectBox>(
-            ObjectBox.FACTORY, RING_SIZE, Executors.newCachedThreadPool(), ProducerType.MULTI,
+            ObjectBox.FACTORY, RING_SIZE, DaemonThreadFactory.INSTANCE, ProducerType.MULTI,
             new BlockingWaitStrategy());
         disruptor.handleEventsWith(new Consumer()).then(new Consumer());
         final RingBuffer<ObjectBox> ringBuffer = disruptor.getRingBuffer();
diff --git a/src/test/java/com/lmax/disruptor/example/WaitForShutdown.java b/src/test/java/com/lmax/disruptor/example/WaitForShutdown.java
new file mode 100644
index 0000000..ab2ffe0
--- /dev/null
+++ b/src/test/java/com/lmax/disruptor/example/WaitForShutdown.java
@@ -0,0 +1,65 @@
+package com.lmax.disruptor.example;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.support.LongEvent;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class WaitForShutdown
+{
+    private static volatile int value = 0;
+
+    private static class Handler implements EventHandler<LongEvent>, LifecycleAware
+    {
+        private final CountDownLatch latch;
+
+        public Handler(CountDownLatch latch)
+        {
+            this.latch = latch;
+        }
+
+        @Override
+        public void onStart()
+        {
+        }
+
+        @Override
+        public void onShutdown()
+        {
+            latch.countDown();
+        }
+
+        @Override
+        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception
+        {
+            value = 1;
+        }
+    }
+
+    public static void main(String[] args) throws TimeoutException, InterruptedException
+    {
+        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
+            LongEvent.FACTORY, 16, DaemonThreadFactory.INSTANCE
+        );
+
+        CountDownLatch shutdownLatch = new CountDownLatch(2);
+
+        disruptor.handleEventsWith(new Handler(shutdownLatch)).then(new Handler(shutdownLatch));
+        disruptor.start();
+
+        long next = disruptor.getRingBuffer().next();
+        disruptor.getRingBuffer().get(next).set(next);
+        disruptor.getRingBuffer().publish(next);
+
+        disruptor.shutdown(10, TimeUnit.SECONDS);
+
+        shutdownLatch.await();
+
+        System.out.println(value);
+    }
+}

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-java/disruptor.git



More information about the pkg-java-commits mailing list