Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@
<version>${org.mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
159 changes: 139 additions & 20 deletions src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import java.util.Arrays;
import java.util.Formatter;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import net.logstash.logback.appender.listener.AppenderListener;
import net.logstash.logback.status.LevelFilteringStatusListener;
Expand All @@ -37,6 +39,7 @@
import ch.qos.logback.core.spi.DeferredProcessingAware;
import ch.qos.logback.core.status.OnConsoleStatusListener;
import ch.qos.logback.core.status.Status;
import ch.qos.logback.core.util.Duration;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
Expand All @@ -46,7 +49,6 @@
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
Expand Down Expand Up @@ -96,6 +98,10 @@
* @param <Event> type of event ({@link ILoggingEvent} or {@link IAccessEvent}).
*/
public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAware, Listener extends AppenderListener<Event>> extends UnsynchronizedAppenderBase<Event> {
/**
* Time in nanos to wait between drain attempts during the shutdown phase
*/
private static final long SLEEP_TIME_DURING_SHUTDOWN = 50 * 1_000_000L; // 50ms

protected static final String APPENDER_NAME_FORMAT = "%1$s";
protected static final String THREAD_INDEX_FORMAT = "%2$d";
Expand Down Expand Up @@ -250,6 +256,29 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
*/
protected final List<Listener> listeners = new ArrayList<>();

/**
* Maximum time to wait when appending events to the ring buffer when full before the event
* is dropped. Use the following values:
* <ul>
* <li>{@code -1} to disable timeout and wait until space becomes available.
* <li>{@code 0} for no timeout and drop the event immediately when the buffer is full.
* <li>{@code > 0} to retry during the specified amount of time.
* </ul>
*/
private Duration appendTimeout = Duration.buildByMilliseconds(0);

/**
* Delay (in millis) between consecutive attempts to append an event in the ring buffer when
* full.
*/
private Duration appendRetryFrequency = Duration.buildByMilliseconds(100);

/**
* How long to wait for in-flight events during shutdown.
*/
private Duration shutdownGracePeriod = Duration.buildByMinutes(1);


/**
* Event wrapper object used for each element of the {@link RingBuffer}.
*/
Expand Down Expand Up @@ -363,7 +392,6 @@ public void onShutdown() {

}

@SuppressWarnings("unchecked")
@Override
public void start() {
if (addDefaultStatusListener && getStatusManager() != null && getStatusManager().getCopyOfStatusListenerList().isEmpty()) {
Expand Down Expand Up @@ -422,57 +450,127 @@ public void stop() {
if (!super.isStarted()) {
return;
}

/*
* Don't allow any more events to be appended.
*/
super.stop();
try {
this.disruptor.shutdown(1, TimeUnit.MINUTES);
} catch (TimeoutException e) {


/*
* Shutdown Disruptor
*
* Calling Disruptor#shutdown() will wait until all enqueued events are fully processed,
* but this waiting happens in a busy-spin. To avoid wasting CPU we wait for at most the configured
* grace period before asking the Disruptor for an immediate shutdown.
*/
long deadline = getShutdownGracePeriod().getMilliseconds() < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + getShutdownGracePeriod().getMilliseconds();
while (!isRingBufferEmpty() && (System.currentTimeMillis() < deadline)) {
LockSupport.parkNanos(SLEEP_TIME_DURING_SHUTDOWN);
}

this.disruptor.halt();


if (!isRingBufferEmpty()) {
addWarn("Some queued events have not been logged due to requested shutdown");
}


/*
* Shutdown executor service
*/
this.executorService.shutdown();

try {
if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) {
addWarn("Some queued events have not been logged due to requested shutdown");
}
this.executorService.awaitTermination(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
addWarn("Some queued events have not been logged due to requested shutdown", e);
// ignored
}


/*
* Notify listeners
*/
fireAppenderStopped();
}


/**
* Test whether the ring buffer is empty or not
*
* @return {@code true} if the ring buffer is empty, {@code false} otherwise
*/
protected boolean isRingBufferEmpty() {
return this.disruptor.getRingBuffer().hasAvailableCapacity(this.getRingBufferSize());
}

@Override
protected void append(Event event) {
long startTime = System.nanoTime();

try {
prepareForDeferredProcessing(event);
} catch (RuntimeException e) {
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
}

if (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
if ((consecutiveDropped) % this.droppedWarnFrequency == 1) {
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
} else {
long endTime = System.nanoTime();
if (enqueueEvent(event)) {
// Enqueue success - notify if we had errors previously
//
long consecutiveDropped = this.consecutiveDroppedCount.get();
if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}
fireEventAppended(event, endTime - startTime);

// Notify parties
//
fireEventAppended(event, System.nanoTime() - startTime);

} else {
// Log a warning status about the failure
//
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
if ((consecutiveDropped % this.droppedWarnFrequency) == 1) {
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}

// Notify parties
//
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
}
}

protected void prepareForDeferredProcessing(Event event) {
event.prepareForDeferredProcessing();
}

/**
* Enqueue the given {@code event} in the ring buffer.
*
* @param event the {@link Event} to enqueue
* @return {@code true} when the even is successfully enqueued in the ring buffer
*/
protected boolean enqueueEvent(Event event) {
long timeout = this.appendTimeout.getMilliseconds() < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.appendTimeout.getMilliseconds();

while (isStarted() && !this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
// Check for timeout
//
if (System.currentTimeMillis() >= timeout) {
return false;
}

// Wait before retry
//
long waitDuration = Math.min(this.appendRetryFrequency.getMilliseconds(), System.currentTimeMillis() - timeout);
if (waitDuration > 0) {
LockSupport.parkNanos(waitDuration * 1_000_000L);
}
}

return true;
}

protected String calculateThreadName() {
List<Object> threadNameFormatParams = getThreadNameFormatParams();
Expand Down Expand Up @@ -580,7 +678,28 @@ public void setWaitStrategy(WaitStrategy waitStrategy) {
public void setWaitStrategyType(String waitStrategyType) {
setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType));
}


public Duration getAppendRetryFrequency() {
return appendRetryFrequency;
}
public void setAppendRetryFrequency(Duration appendRetryFrequency) {
this.appendRetryFrequency = Objects.requireNonNull(appendRetryFrequency);
}

public Duration getAppendTimeout() {
return appendTimeout;
}
public void setAppendTimeout(Duration appendTimeout) {
this.appendTimeout = Objects.requireNonNull(appendTimeout);
}

public void setShutdownGracePeriod(Duration shutdownGracePeriod) {
this.shutdownGracePeriod = Objects.requireNonNull(shutdownGracePeriod);
}
public Duration getShutdownGracePeriod() {
return shutdownGracePeriod;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}
Expand Down
Loading