From fb26dd56e03c570e279a97f8c52c1de47508030c Mon Sep 17 00:00:00 2001 From: Phil Clay Date: Sun, 19 Oct 2025 16:06:52 -0600 Subject: [PATCH] Upgrade lmax disruptor to 4.0.0 disruptor 4.0.0 folded LifecycleAware and SequenceReportingEventHandler into EventHandler interface --- pom.xml | 2 +- .../AbstractLogstashTcpSocketAppender.java | 44 +++++++++---------- .../appender/AsyncDisruptorAppender.java | 26 +++++------ 3 files changed, 34 insertions(+), 38 deletions(-) diff --git a/pom.xml b/pom.xml index b688b88b..8e50b352 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ 2.0.6 - 3.4.4 + 4.0.0 3.27.6 diff --git a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java index 7596e061..7812abcb 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java @@ -41,6 +41,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import javax.net.SocketFactory; @@ -68,7 +69,6 @@ import ch.qos.logback.core.util.CloseUtil; import ch.qos.logback.core.util.Duration; import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.RingBuffer; /** @@ -152,7 +152,7 @@ public abstract class AbstractLogstashTcpSocketAppender destinations = new ArrayList<>(2); + private final List destinations = new ArrayList<>(2); /** * When connected, this is the index into {@link #destinations} @@ -220,6 +220,7 @@ public abstract class AbstractLogstashTcpSocketAppender * * If set prior to startup, it will be used. *

@@ -246,7 +247,7 @@ public abstract class AbstractLogstashTcpSocketAppenderWhen null (the default), no keepAlive messages will be sent.

*/ private Duration keepAliveDuration; @@ -288,15 +289,15 @@ public abstract class AbstractLogstashTcpSocketAppender>, LifecycleAware { + private class TcpSendingEventHandler implements EventHandler> { /** * Max number of consecutive failed connection attempts for which * logback status messages will be logged. * - * After this many failed attempts, reconnection will still + *

After this many failed attempts, reconnection will still * be attempted, but failures will not be logged again - * (until after the connection is successful, and then fails again.) + * (until after the connection is successful, and then fails again.)

*/ private static final int MAX_REPEAT_CONNECTION_ERROR_LOG = 5; @@ -372,11 +373,11 @@ private class TcpSendingEventHandler implements EventHandler>, L * after the calculated {@link AbstractLogstashTcpSocketAppender#keepAliveDuration} * from the last sent event using {@link TcpSendingEventHandler#scheduleKeepAlive(long)}. * - * When the keepAlive event is processed by the event handler, + *

When the keepAlive event is processed by the event handler, * if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration} * has elapsed since the last event was sent, * then the event handler will send the {@link AbstractLogstashTcpSocketAppender#keepAliveMessage} - * to the socket OutputStream. + * to the socket OutputStream.

* */ private class KeepAliveRunnable implements Runnable { @@ -418,8 +419,8 @@ public void run() { * Keeps reading the {@link ReaderCallable#inputStream} until the * end of the stream is reached. * - * This helps pro-actively detect server-side socket disconnections, - * specifically in the case of Amazon's Elastic Load Balancers (ELB). + *

This helps proactively detect server-side socket disconnections, + * specifically in the case of Amazon's Elastic Load Balancers (ELB).

*/ private class ReaderCallable implements Callable { @@ -710,8 +711,8 @@ private synchronized void reopenSocket() { * Repeatedly tries to open a socket until it is successful, * or the hander is stopped, or the handler thread is interrupted. * - * If the socket is non-null when this method returns, - * then it should be able to be used to send. + *

If the socket is non-null when this method returns, + * then it should be able to be used to send.

*/ private synchronized void openSocket() { int errorCount = 0; @@ -946,7 +947,6 @@ private synchronized void unscheduleWriteTimeout() { /** * Wrap exceptions thrown by {@link Encoder} */ - @SuppressWarnings("serial") private static class EncoderException extends Exception { EncoderException(Throwable cause) { super(cause); @@ -1278,8 +1278,8 @@ public Duration getInitialSendDelay() { /** * Convenience method for setting {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}. * - * When the {@link #connectionStrategy} is a {@link PreferPrimaryDestinationConnectionStrategy}, - * this will set its {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}. + *

When the {@link #connectionStrategy} is a {@link PreferPrimaryDestinationConnectionStrategy}, + * this will set its {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}.

* * @see PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration) * @param secondaryConnectionTTL the TTL of a connection when connected to a secondary destination @@ -1320,7 +1320,7 @@ public Duration getSecondaryConnectionTTL() { /** * Set the connection timeout when establishing a connection to a remote destination. * - * Use {@code 0} for an "infinite timeout" which often really means "use the OS defaults". + *

Use {@code 0} for an "infinite timeout" which often really means "use the OS defaults".

* * @param connectionTimeout connection timeout */ @@ -1408,7 +1408,7 @@ public Duration getKeepAliveDuration() { * then the {@link #keepAliveMessage} will be sent to the socket in * order to keep the connection alive. * - * When {@code null}, zero or negative, no keepAlive messages will be sent. + *

When {@code null}, zero or negative, no keepAlive messages will be sent.

* * @param keepAliveDuration duration between consecutive keep alive messages */ @@ -1424,15 +1424,15 @@ public String getKeepAliveMessage() { * Message to send for keeping the connection alive * if {@link #keepAliveDuration} is non-null and strictly positive. * - * The following values have special meaning: + *

The following values have special meaning:

*
    *
  • {@code null} or empty string = no keep alive.
  • *
  • "{@code SYSTEM}" = operating system new line (default).
  • *
  • "{@code UNIX}" = unix line ending (\n).
  • *
  • "{@code WINDOWS}" = windows line ending (\r\n).
  • *
- *

- * Any other value will be used as-is. + * + *

Any other value will be used as-is.

* * @param keepAliveMessage the keep alive message */ @@ -1473,8 +1473,8 @@ public void setKeepAliveCharset(Charset keepAliveCharset) { * Defaults to {@value #DEFAULT_THREAD_NAME_FORMAT}. *

* - * If you change the {@link #threadFactory}, then this - * value may not be honored. + * If you change the {@link #setThreadFactory(ThreadFactory) threadFactory}, + * then this value may not be honored. *

* * The string is a format pattern understood by {@link Formatter#format(String, Object...)}. diff --git a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java index 3f6db21a..94c2cfea 100644 --- a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java +++ b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java @@ -45,11 +45,9 @@ import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.ExceptionHandler; -import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.PhasedBackoffWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.Sequence; -import com.lmax.disruptor.SequenceReportingEventHandler; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; @@ -86,6 +84,7 @@ * needing to explicitly shut down the appender. * Note that in this case, it is possible for appended log events to not * be handled (if the child thread has not had a chance to process them yet). + *

* * By setting {@link #setDaemon(boolean)} to false, you can change this behavior. * When false, child threads created by this appender will not be daemon threads, @@ -133,7 +132,7 @@ public abstract class AsyncDisruptorAppender * By default, a {@link BlockingWaitStrategy} is used, which is the most * CPU conservative, but results in a higher latency. @@ -214,7 +213,7 @@ public abstract class AsyncDisruptorAppender> exceptionHandler = new LogEventExceptionHandler(); + private final ExceptionHandler> exceptionHandler = new LogEventExceptionHandler(); /** * Consecutive number of dropped events. @@ -323,7 +322,7 @@ public void translateTo(LogEvent logEvent, long sequence, Event event) { * Defines what happens when there is an exception during * {@link RingBuffer} processing. * - * Currently, just logs to the logback context. + *

Currently, just logs to the logback context.

*/ private class LogEventExceptionHandler implements ExceptionHandler> { @@ -347,7 +346,7 @@ public void handleOnShutdownException(Throwable ex) { * Clears the event after a delegate event handler has processed the event, * so that the event can be garbage collected. */ - private static class EventClearingEventHandler implements SequenceReportingEventHandler>, LifecycleAware { + private static class EventClearingEventHandler implements EventHandler> { private final EventHandler> delegate; private Sequence sequenceCallback; @@ -378,16 +377,13 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) th @Override public void onStart() { - if (delegate instanceof LifecycleAware) { - ((LifecycleAware) delegate).onStart(); - } + delegate.onStart(); } @Override public void onShutdown() { - if (delegate instanceof LifecycleAware) { - ((LifecycleAware) delegate).onShutdown(); - } + delegate.onShutdown(); + } @Override @@ -610,7 +606,7 @@ protected String calculateThreadName() { } protected List getThreadNameFormatParams() { - return Arrays.asList( + return Arrays.asList( getName(), threadNumber.incrementAndGet()); } @@ -722,8 +718,8 @@ public ProducerType getProducerType() { * The {@link ProducerType} to use to configure the Disruptor. * By default this is {@link ProducerType#MULTI}. * - * Can be set to {@link ProducerType#SINGLE} for increase performance if (and only if) only - * one thread will ever be appending to this appender. + *

Can be set to {@link ProducerType#SINGLE} for increase performance if (and only if) only + * one thread will ever be appending to this appender.

* *

WARNING: unexpected behavior may occur if this parameter is set to {@link ProducerType#SINGLE} * and multiple threads are appending to this appender.