Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<logback-access.version>2.0.6</logback-access.version>

<!-- shaded runtime dependencies -->
<disruptor.version>3.4.4</disruptor.version>
<disruptor.version>4.0.0</disruptor.version>

<!-- test dependencies -->
<assertj.version>3.27.6</assertj.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import javax.net.SocketFactory;
Expand Down Expand Up @@ -68,7 +69,6 @@
import ch.qos.logback.core.util.CloseUtil;
import ch.qos.logback.core.util.Duration;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;

/**
Expand Down Expand Up @@ -152,7 +152,7 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
*
* The interpretation of this list is up to the current {@link #connectionStrategy}.
*/
private List<InetSocketAddress> destinations = new ArrayList<>(2);
private final List<InetSocketAddress> destinations = new ArrayList<>(2);

/**
* When connected, this is the index into {@link #destinations}
Expand Down Expand Up @@ -220,6 +220,7 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr

/**
* Used to create client {@link Socket}s to which to communicate.
* <p>
*
* If set prior to startup, it will be used.
* <p>
Expand All @@ -246,7 +247,7 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
* then the {@link #keepAliveMessage} will be sent to the socket in
* order to keep the connection alive.
*
* When null (the default), no keepAlive messages will be sent.
* <p>When null (the default), no keepAlive messages will be sent.</p>
*/
private Duration keepAliveDuration;

Expand Down Expand Up @@ -288,15 +289,15 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
/**
* Event handler responsible for performing the TCP transmission.
*/
private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, LifecycleAware {
private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>> {

/**
* Max number of consecutive failed connection attempts for which
* logback status messages will be logged.
*
* After this many failed attempts, reconnection will still
* <p>After this many failed attempts, reconnection will still
* be attempted, but failures will not be logged again
* (until after the connection is successful, and then fails again.)
* (until after the connection is successful, and then fails again.)</p>
*/
private static final int MAX_REPEAT_CONNECTION_ERROR_LOG = 5;

Expand Down Expand Up @@ -372,11 +373,11 @@ private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, L
* after the calculated {@link AbstractLogstashTcpSocketAppender#keepAliveDuration}
* from the last sent event using {@link TcpSendingEventHandler#scheduleKeepAlive(long)}.
*
* When the keepAlive event is processed by the event handler,
* <p>When the keepAlive event is processed by the event handler,
* if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration}
* has elapsed since the last event was sent,
* then the event handler will send the {@link AbstractLogstashTcpSocketAppender#keepAliveMessage}
* to the socket OutputStream.
* to the socket OutputStream.</p>
*
*/
private class KeepAliveRunnable implements Runnable {
Expand Down Expand Up @@ -418,8 +419,8 @@ public void run() {
* Keeps reading the {@link ReaderCallable#inputStream} until the
* end of the stream is reached.
*
* This helps pro-actively detect server-side socket disconnections,
* specifically in the case of Amazon's Elastic Load Balancers (ELB).
* <p>This helps proactively detect server-side socket disconnections,
* specifically in the case of Amazon's Elastic Load Balancers (ELB).</p>
*/
private class ReaderCallable implements Callable<Void> {

Expand Down Expand Up @@ -710,8 +711,8 @@ private synchronized void reopenSocket() {
* Repeatedly tries to open a socket until it is successful,
* or the hander is stopped, or the handler thread is interrupted.
*
* If the socket is non-null when this method returns,
* then it should be able to be used to send.
* <p>If the socket is non-null when this method returns,
* then it should be able to be used to send.</p>
*/
private synchronized void openSocket() {
int errorCount = 0;
Expand Down Expand Up @@ -946,7 +947,6 @@ private synchronized void unscheduleWriteTimeout() {
/**
* Wrap exceptions thrown by {@link Encoder}
*/
@SuppressWarnings("serial")
private static class EncoderException extends Exception {
EncoderException(Throwable cause) {
super(cause);
Expand Down Expand Up @@ -1278,8 +1278,8 @@ public Duration getInitialSendDelay() {
/**
* Convenience method for setting {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}.
*
* When the {@link #connectionStrategy} is a {@link PreferPrimaryDestinationConnectionStrategy},
* this will set its {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}.
* <p>When the {@link #connectionStrategy} is a {@link PreferPrimaryDestinationConnectionStrategy},
* this will set its {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}.</p>
*
* @see PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)
* @param secondaryConnectionTTL the TTL of a connection when connected to a secondary destination
Expand Down Expand Up @@ -1320,7 +1320,7 @@ public Duration getSecondaryConnectionTTL() {
/**
* Set the connection timeout when establishing a connection to a remote destination.
*
* Use {@code 0} for an "infinite timeout" which often really means "use the OS defaults".
* <p>Use {@code 0} for an "infinite timeout" which often really means "use the OS defaults".</p>
*
* @param connectionTimeout connection timeout
*/
Expand Down Expand Up @@ -1408,7 +1408,7 @@ public Duration getKeepAliveDuration() {
* then the {@link #keepAliveMessage} will be sent to the socket in
* order to keep the connection alive.
*
* When {@code null}, zero or negative, no keepAlive messages will be sent.
* <p>When {@code null}, zero or negative, no keepAlive messages will be sent.</p>
*
* @param keepAliveDuration duration between consecutive keep alive messages
*/
Expand All @@ -1424,15 +1424,15 @@ public String getKeepAliveMessage() {
* Message to send for keeping the connection alive
* if {@link #keepAliveDuration} is non-null and strictly positive.
*
* The following values have special meaning:
* <p>The following values have special meaning:</p>
* <ul>
* <li>{@code null} or empty string = no keep alive.</li>
* <li>"{@code SYSTEM}" = operating system new line (default).</li>
* <li>"{@code UNIX}" = unix line ending (\n).</li>
* <li>"{@code WINDOWS}" = windows line ending (\r\n).</li>
* </ul>
* <p>
* Any other value will be used as-is.
*
* <p>Any other value will be used as-is.</p>
*
* @param keepAliveMessage the keep alive message
*/
Expand Down Expand Up @@ -1473,8 +1473,8 @@ public void setKeepAliveCharset(Charset keepAliveCharset) {
* Defaults to {@value #DEFAULT_THREAD_NAME_FORMAT}.
* <p>
*
* If you change the {@link #threadFactory}, then this
* value may not be honored.
* If you change the {@link #setThreadFactory(ThreadFactory) threadFactory},
* then this value may not be honored.
* <p>
*
* The string is a format pattern understood by {@link Formatter#format(String, Object...)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,9 @@
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceReportingEventHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
Expand Down Expand Up @@ -86,6 +84,7 @@
* needing to explicitly shut down the appender.
* Note that in this case, it is possible for appended log events to not
* be handled (if the child thread has not had a chance to process them yet).
* <p>
*
* By setting {@link #setDaemon(boolean)} to false, you can change this behavior.
* When false, child threads created by this appender will not be daemon threads,
Expand Down Expand Up @@ -133,7 +132,7 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa

/**
* The {@link WaitStrategy} to used by the RingBuffer
* when pulling events to be processed by {@link #eventHandler}.
* when pulling events to be processed by {@link #createEventHandler() event handler}.
* <p>
* By default, a {@link BlockingWaitStrategy} is used, which is the most
* CPU conservative, but results in a higher latency.
Expand Down Expand Up @@ -214,7 +213,7 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
* Defines what happens when there is an exception during
* {@link RingBuffer} processing.
*/
private ExceptionHandler<LogEvent<Event>> exceptionHandler = new LogEventExceptionHandler();
private final ExceptionHandler<LogEvent<Event>> exceptionHandler = new LogEventExceptionHandler();

/**
* Consecutive number of dropped events.
Expand Down Expand Up @@ -323,7 +322,7 @@ public void translateTo(LogEvent<Event> logEvent, long sequence, Event event) {
* Defines what happens when there is an exception during
* {@link RingBuffer} processing.
*
* Currently, just logs to the logback context.
* <p>Currently, just logs to the logback context.</p>
*/
private class LogEventExceptionHandler implements ExceptionHandler<LogEvent<Event>> {

Expand All @@ -347,7 +346,7 @@ public void handleOnShutdownException(Throwable ex) {
* Clears the event after a delegate event handler has processed the event,
* so that the event can be garbage collected.
*/
private static class EventClearingEventHandler<Event> implements SequenceReportingEventHandler<LogEvent<Event>>, LifecycleAware {
private static class EventClearingEventHandler<Event> implements EventHandler<LogEvent<Event>> {

private final EventHandler<LogEvent<Event>> delegate;
private Sequence sequenceCallback;
Expand Down Expand Up @@ -378,16 +377,13 @@ public void onEvent(LogEvent<Event> event, long sequence, boolean endOfBatch) th

@Override
public void onStart() {
if (delegate instanceof LifecycleAware) {
((LifecycleAware) delegate).onStart();
}
delegate.onStart();
}

@Override
public void onShutdown() {
if (delegate instanceof LifecycleAware) {
((LifecycleAware) delegate).onShutdown();
}
delegate.onShutdown();

}

@Override
Expand Down Expand Up @@ -610,7 +606,7 @@ protected String calculateThreadName() {
}

protected List<Object> getThreadNameFormatParams() {
return Arrays.<Object>asList(
return Arrays.asList(
getName(),
threadNumber.incrementAndGet());
}
Expand Down Expand Up @@ -722,8 +718,8 @@ public ProducerType getProducerType() {
* The {@link ProducerType} to use to configure the Disruptor.
* By default this is {@link ProducerType#MULTI}.
*
* Can be set to {@link ProducerType#SINGLE} for increase performance if (and only if) only
* one thread will ever be appending to this appender.
* <p>Can be set to {@link ProducerType#SINGLE} for increase performance if (and only if) only
* one thread will ever be appending to this appender.</p>
*
* <p>WARNING: unexpected behavior may occur if this parameter is set to {@link ProducerType#SINGLE}
* and multiple threads are appending to this appender.
Expand Down
Loading