Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ The structure of the output, and the data it contains, is fully configurable.
* [Write Timeouts](#write-timeouts)
* [SSL](#ssl)
* [Async Appenders](#async-appenders)
* [RingBuffer Full](#ringbuffer-full)
* [Graceful Shutdown](#graceful-shutdown)
* [Wait Strategy](#wait-strategy)
* [Appender Listeners](#appender-listeners)
* [Encoders / Layouts](#encoders--layouts)
* [LoggingEvent Fields](#loggingevent-fields)
Expand Down Expand Up @@ -667,7 +670,8 @@ The behaviour of the appender when the RingBuffer is controlled by the `appendTi
| `> 0` | retry during the specified amount of time |


Logging threads waiting for space in the RingBuffer wake up periodically at a frequency defined by `appendRetryFrequency` (default `50ms`). You may increase this frequency for faster reaction time at the expense of higher CPU usage.
Logging threads waiting for space in the RingBuffer wake up periodically, with a delay starting at `1ns` and increasing exponentially up to `appendRetryFrequency` (default `5ms`).
Only one thread is allowed to retry at a time. If a thread is already retrying, additional threads wait on a lock until the first is finished. This strategy should help to limit CPU consumption while providing good enough latency and throughput when the ring buffer is at (or close to) its maximal capacity.

When the appender drops an event, it emits a warning status message every `droppedWarnFrequency` consecutive dropped events. Another status message is emitted when the drop period is over and a first event is successfully enqueued, reporting the total number of events that were dropped.

Expand All @@ -683,8 +687,7 @@ Events still in the buffer after this period is elapsed are dropped and the appe

#### Wait Strategy

By default, the [`BlockingWaitStrategy`](https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/BlockingWaitStrategy.html)
is used by the worker thread spawned by this appender.
By default, the [`BlockingWaitStrategy`](https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/BlockingWaitStrategy.html) is used by the worker thread spawned by this appender.
The `BlockingWaitStrategy` minimizes CPU utilization, but results in slower latency and throughput.
If you need faster latency and throughput (at the expense of higher CPU utilization), consider
a different [wait strategy](https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/WaitStrategy.html) offered by the disruptor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import net.logstash.logback.appender.listener.AppenderListener;
Expand Down Expand Up @@ -259,13 +260,18 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
* Delay between consecutive attempts to append an event in the ring buffer when
* full.
*/
private Duration appendRetryFrequency = Duration.buildByMilliseconds(50);
private Duration appendRetryFrequency = Duration.buildByMilliseconds(5);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5ms seems really fast.

What is the reasoning behind this value?


/**
* How long to wait for in-flight events during shutdown.
*/
private Duration shutdownGracePeriod = Duration.buildByMinutes(1);

/**
* Lock used to limit the number of concurrent threads retrying at the same time
*/
private final ReentrantLock lock = new ReentrantLock();


/**
* Event wrapper object used for each element of the {@link RingBuffer}.
Expand Down Expand Up @@ -425,7 +431,7 @@ public void start() {
getStatusManager().add(statusListener);
}

this.disruptor = new Disruptor<LogEvent<Event>>(
this.disruptor = new Disruptor<>(
this.eventFactory,
this.ringBufferSize,
this.threadFactory,
Expand Down Expand Up @@ -504,18 +510,19 @@ protected void append(Event event) {
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
}


// Add event to the buffer, retrying as many times as allowed by the configuration
//
long deadline = this.appendTimeout.getMilliseconds() < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.appendTimeout.getMilliseconds();

while (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
// Wait before retrying
//
long waitDuration = Math.min(this.appendRetryFrequency.getMilliseconds(), deadline - System.currentTimeMillis());
if (waitDuration > 0) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(waitDuration));

try {
if (enqueue(event)) {
// Log warning if we had drop before
//
long consecutiveDropped = this.consecutiveDroppedCount.get();
if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}

// Notify listeners
//
fireEventAppended(event, System.nanoTime() - startTime);

} else {
// Log a warning status about the failure
//
Expand All @@ -527,30 +534,86 @@ protected void append(Event event) {
// Notify listeners
//
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
return;
}

// Give up if appender is stopped meanwhile
//
if (!isStarted()) {
// Same message as if Appender#append is called after the appender is stopped...
addWarn("Attempted to append to non started appender [" + this.getName() + "].");
return;
}
} catch (ShutdownInProgressException e) {
// Same message as if Appender#append is called after the appender is stopped...
addWarn("Attempted to append to non started appender [" + this.getName() + "].");

} catch (InterruptedException e) {
// be silent but re-interrupt the thread
Thread.currentThread().interrupt();
}
}


/**
* Enqueue an event in the ring buffer, retrying if allowed by the configuration.
*
* @param event the event to add to the ring buffer
* @return {@code true} if the event is successfully enqueued, {@code false} if the event
* could not be added to the ring buffer.
* @throws ShutdownInProgressException thrown when the appender is shutdown while retrying
* to enqueue the event
* @throws InterruptedException thrown when the logging thread is interrupted while retrying
*/
private boolean enqueue(Event event) throws ShutdownInProgressException, InterruptedException {
// Try enqueue the "normal" way
//
if (this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
return true;
}

// Enqueue success - notify end of error period
// Drop event immediately when no retry
//
long consecutiveDropped = this.consecutiveDroppedCount.get();
if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
if (this.appendTimeout.getMilliseconds() == 0) {
return false;
}

// Notify listeners
// Determine how long we can retry
//
fireEventAppended(event, System.nanoTime() - startTime);
}
final long waitTime = this.appendTimeout.getMilliseconds() < 0
? Long.MAX_VALUE
: this.appendTimeout.getMilliseconds();
final long deadline = System.currentTimeMillis() + waitTime;

long backoff = 1L;
long backoffLimit = TimeUnit.MILLISECONDS.toNanos(this.appendRetryFrequency.getMilliseconds());


// Limit retries to a single thread at once to avoid burning CPU cycles "for nothing"
// in CPU constraint environments.
//
if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
return false;
}

try {
do {
if (!isStarted()) {
throw new ShutdownInProgressException();
}

if (deadline <= System.currentTimeMillis()) {
return false;
}

if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}

LockSupport.parkNanos(backoff);
backoff = Math.min(backoff * 2, backoffLimit);

} while (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event));

return true;

} finally {
lock.unlock();
}
}

protected void prepareForDeferredProcessing(Event event) {
event.prepareForDeferredProcessing();
}
Expand Down