diff --git a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java index 0d6525c1..60a5ecf7 100644 --- a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java +++ b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java @@ -49,6 +49,8 @@ import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.PhasedBackoffWaitStrategy; import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.SequenceReportingEventHandler; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; @@ -353,9 +355,10 @@ public void handleOnShutdownException(Throwable ex) { * Clears the event after a delegate event handler has processed the event, * so that the event can be garbage collected. */ - private static class EventClearingEventHandler implements EventHandler>, LifecycleAware { + private static class EventClearingEventHandler implements SequenceReportingEventHandler>, LifecycleAware { private final EventHandler> delegate; + private Sequence sequenceCallback; EventClearingEventHandler(EventHandler> delegate) { super(); @@ -371,6 +374,13 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) th * Clear the event so that it can be garbage collected. */ event.event = null; + + /* + * Notify the BatchEventProcessor that the sequence has progressed. + * Without this callback the sequence would not be progressed + * until the batch has completely finished. + */ + sequenceCallback.set(sequence); } } @@ -388,6 +398,10 @@ public void onShutdown() { } } + @Override + public void setSequenceCallback(final Sequence sequenceCallback) { + this.sequenceCallback = sequenceCallback; + } } @Override diff --git a/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java b/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java index ea97615f..c3134d7c 100644 --- a/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java +++ b/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -374,6 +375,58 @@ public void appendBlockingReleasedOnStop() { } + /* + * Assert that LogEvent are released from the RingBuffer before the end of a batch. + */ + @Test + public void logEventsClearedBeforeEndOfBatch() throws Exception { + final CyclicBarrier barrier = new CyclicBarrier(2); + + try { + TestEventHandler eventHandler = new TestEventHandler(barrier); + appender.setRingBufferSize(4); + appender.setShutdownGracePeriod(toLogback(Duration.ofMillis(0))); // don't want to wait for inflight events... + appender.setEventHandler(eventHandler); + appender.setAddDefaultStatusListener(true); + appender.start(); + + /* + * Append enough events to fill the buffer + */ + appender.append(event1); + appender.append(event1); + appender.append(event1); + appender.append(event1); + + /* + * We now have 1 event followed by a batch of 3. + * Release 2 events which means the batch is not yet fully processed but we should have room + * for 2 additional events in the buffer. + */ + barrier.await(); + barrier.await(); + await().until(() -> eventHandler.getEvents().size() == 2); + + appender.append(event1); + appender.append(event1); + verify(listener, times(0)).eventAppendFailed(eq(appender), any(), any()); // nothing dropped - they all fit in the buffer + + /* + * Release them all and assert we got 6 in total + */ + barrier.await(); + barrier.await(); + barrier.await(); + barrier.await(); + + await().until(() -> eventHandler.getEvents().size() == 6); + + } finally { + barrier.reset(); + } + } + + @Test public void configRingBufferSize_negative() { appender.setRingBufferSize(-1); @@ -421,15 +474,25 @@ private Future execute(Runnable runnable) { private static class TestEventHandler implements EventHandler> { private final List events = new ArrayList<>(); private final CountDownLatch waiter; + private final CyclicBarrier barrier; TestEventHandler(CountDownLatch waiter) { this.waiter = waiter; + this.barrier = null; + } + TestEventHandler(CyclicBarrier barrier) { + this.waiter = null; + this.barrier = barrier; } + @Override public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception { if (waiter != null) { waiter.await(); } + if (barrier != null) { + barrier.await(); + } this.events.add(event.event); } diff --git a/src/test/java/net/logstash/logback/appender/DelegatingAsyncDisruptorAppenderTest.java b/src/test/java/net/logstash/logback/appender/DelegatingAsyncDisruptorAppenderTest.java index 1d870520..bd5144d5 100644 --- a/src/test/java/net/logstash/logback/appender/DelegatingAsyncDisruptorAppenderTest.java +++ b/src/test/java/net/logstash/logback/appender/DelegatingAsyncDisruptorAppenderTest.java @@ -251,7 +251,7 @@ public void flushFlushable() throws Exception { appender.append(event); verify(flushableDelegate, timeout(VERIFICATION_TIMEOUT)).doAppend(event); - verify(flushableDelegate).flush(); + verify(flushableDelegate, timeout(VERIFICATION_TIMEOUT)).flush(); }