Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Limit time waiting for the "retry lock" + stop retrying if the thread…
… is interrupted
  • Loading branch information
brenuart committed Aug 26, 2021
commit 8aeef5b74f9057f10c090e4c7ea3d644b2215178
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ protected void append(Event event) {
if ((consecutiveDropped % this.droppedWarnFrequency) == 1) {
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}

// Notify listeners
//
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
Expand All @@ -539,6 +539,10 @@ protected void append(Event event) {
} catch (ShutdownInProgressException e) {
// Same message as if Appender#append is called after the appender is stopped...
addWarn("Attempted to append to non started appender [" + this.getName() + "].");

} catch (InterruptedException e) {
// be silent but re-interrupt the thread
Thread.currentThread().interrupt();
}
}

Expand All @@ -551,8 +555,9 @@ protected void append(Event event) {
* could not be added to the ring buffer.
* @throws ShutdownInProgressException thrown when the appender is shutdown while retrying
* to enqueue the event
* @throws InterruptedException thrown when the logging thread is interrupted while retrying
*/
private boolean enqueue(Event event) throws ShutdownInProgressException {
private boolean enqueue(Event event) throws ShutdownInProgressException, InterruptedException {
// Try enqueue the "normal" way
//
if (this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
Expand All @@ -567,31 +572,39 @@ private boolean enqueue(Event event) throws ShutdownInProgressException {

// Determine how long we can retry
//
long deadline = this.appendTimeout.getMilliseconds() < 0
final long waitTime = this.appendTimeout.getMilliseconds() < 0
? Long.MAX_VALUE
: System.currentTimeMillis() + this.appendTimeout.getMilliseconds();
: this.appendTimeout.getMilliseconds();
final long deadline = System.currentTimeMillis() + waitTime;

long backoff = 1L;
long backoffLimit = TimeUnit.MILLISECONDS.toNanos(this.appendRetryFrequency.getMilliseconds());


// Limit retries to a single thread at once to avoid burning CPU cycles "for nothing"
// in CPU constraint environments.
//
lock.lock();
if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
return false;
}

try {
do {
if (!isStarted()) {
throw new ShutdownInProgressException();
}

if (deadline <= System.currentTimeMillis()) {
return false;
}

if (!isStarted()) {
throw new ShutdownInProgressException();
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}

LockSupport.parkNanos(backoff);
backoff = Math.min(backoff * 2, backoffLimit);

} while (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event));

return true;
Expand Down