Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -289,12 +290,6 @@ private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, L
*/
private static final int MAX_REPEAT_CONNECTION_ERROR_LOG = 5;

/**
* Number of times we try to write an event before it is discarded.
* Between each attempt, the socket will be reconnected.
*/
private static final int MAX_REPEAT_WRITE_ATTEMPTS = 5;

/**
* The destination socket to which to send events.
*/
Expand Down Expand Up @@ -505,7 +500,7 @@ public void run() {
long elapsedSendTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastSendStart);
if (elapsedSendTimeInMillis > writeTimeout.getMilliseconds()) {
lastDetectedStartNanoTime = lastSendStart;
addWarn(peerId + "Detected write timeout after " + elapsedSendTimeInMillis + "ms. Write timeout=" + getWriteTimeout() + ". Closing socket to force reconnect");
addWarn(peerId + "Detected write timeout after " + elapsedSendTimeInMillis + "ms (writeTimeout=" + getWriteTimeout() + "). Closing socket to force reconnect.");
closeSocket();
}
}
Expand All @@ -516,8 +511,7 @@ public void run() {
@Override
public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {

Exception sendFailureException = null;
for (int i = 0; i < MAX_REPEAT_WRITE_ATTEMPTS; i++) {
while (true) {
/*
* Save local references to the outputStream and socket
* in case the WriteTimeoutRunnable closes the socket.
Expand All @@ -531,56 +525,71 @@ public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch)
*
* This will occur if shutdown occurred during reopen()
*/
sendFailureException = SHUTDOWN_IN_PROGRESS_EXCEPTION;
fireEventSendFailure(logEvent.event, SHUTDOWN_IN_PROGRESS_EXCEPTION);
break;
}

Future<?> readerFuture = this.readerFuture; // volatile read
if (readerFuture.isDone() || socket == null) {
/*
* If readerFuture.isDone(), then the destination has shut down its output (our input),
* and the destination is probably no longer listening to its input (our output).
* This will be the case for Amazon's Elastic Load Balancers (ELB)
* when an instance behind the ELB becomes unhealthy while we're connected to it.
*
* If socket == null here, it means that a write timed out,
* and the socket was closed by the WriteTimeoutRunnable.
*
* Therefore, attempt reconnection.
*/
addInfo(peerId + "destination terminated the connection. Reconnecting.");
/*
* If socket == null here, it means that a write timed out,
* and the socket was closed by the WriteTimeoutRunnable.
*
* Note: a warning status has already been emitted by WriteTimeoutRunnable,
* no need to repeat here.
*/
if (socket == null) {
reopenSocket();
continue;
}

/*
* If readerFuture.isDone(), then the destination has shut down its output (our input),
* and the destination is probably no longer listening to its input (our output).
* This will be the case for Amazon's Elastic Load Balancers (ELB)
* when an instance behind the ELB becomes unhealthy while we're connected to it.
*/
Future<?> readerFuture = this.readerFuture; // volatile read
if (readerFuture.isDone()) {
String msg = "destination terminated the connection";
try {
readerFuture.get();
sendFailureException = NOT_CONNECTED_EXCEPTION;
} catch (Exception e) {
sendFailureException = e;
} catch (ExecutionException e) {
msg += " (cause: " + e.getCause().getMessage() + ")";
}

addInfo(peerId + msg + ". Reconnecting.");
reopenSocket();
continue;
}

/*
* Write the event in the output stream.
* Drop event if encoder throws an exception.
* Reconnect if an exception is thrown by the connection stream itself.
*/
try {
writeEvent(socket, outputStream, logEvent, endOfBatch);
return;

} catch (EncoderException e) {
/*
* Encoding threw an exception. Warn and drop event before it becomes a "poison".
*/
addWarn(peerId + "Encoder failed to encode event. Dropping event.", e.getCause());
fireEventSendFailure(logEvent.event, e.getCause());
break;

} catch (Exception e) {
sendFailureException = e;
addWarn(peerId + "unable to send event: " + e.getMessage() + " Reconnecting.", e);
/*
* Need to re-open the socket in case of IOExceptions.
*
* Reopening the socket probably won't help other exceptions
* (like NullPointerExceptions),
* but we're doing so anyway, just in case.
* Any other exception is thrown by the socket stream (or bug in the code).
* Re-open the socket and get a fresh new stream.
*/
addWarn(peerId + "Unable to send event. Reconnecting.", e);
reopenSocket();
}
}

if (logEvent.event != null) {
fireEventSendFailure(logEvent.event, sendFailureException);
}
}

private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event> logEvent, boolean endOfBatch) throws IOException {
private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event> logEvent, boolean endOfBatch) throws IOException, EncoderException {

long startWallTime = System.currentTimeMillis();
long startNanoTime = System.nanoTime();
Expand Down Expand Up @@ -624,20 +633,29 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event


@SuppressWarnings("unchecked")
private void encode(Event event, OutputStream outputStream) throws IOException {
private void encode(Event event, OutputStream outputStream) throws IOException, EncoderException {
if (encoder instanceof StreamingEncoder) {
/*
* Generate content in a temporary buffer to avoid writing "partial" content in the output
* Use a temporary buffer to avoid writing "partial" content in the output
* stream if the Encoder throws an exception.
*/
try {
((StreamingEncoder<Event>) encoder).encode(event, buffer);
try {
((StreamingEncoder<Event>) encoder).encode(event, buffer);
} catch (Exception e) {
throw new EncoderException(e);
}
buffer.writeTo(outputStream);
} finally {
buffer.reset();
}
} else {
byte[] data = encoder.encode(event);
byte[] data;
try {
data = encoder.encode(event);
} catch (Exception e) {
throw new EncoderException(e);
}
if (data != null) {
outputStream.write(data);
}
Expand Down Expand Up @@ -890,6 +908,17 @@ private synchronized void unscheduleWriteTimeout() {
}
}
}


/**
* Wrap exceptions thrown by {@link Encoder}
*/
@SuppressWarnings("serial")
private static class EncoderException extends Exception {
EncoderException(Throwable cause) {
super(cause);
}
}

/**
* An extension of logback's {@link ConfigurableSSLSocketFactory}
Expand Down Expand Up @@ -1048,11 +1077,15 @@ protected Future<?> scheduleReaderCallable(Callable<Void> readerCallable) {
}

protected void fireEventSent(Socket socket, Event event, long durationInNanos) {
safelyFireEvent(l -> l.eventSent(this, socket, event, durationInNanos));
if (event != null) {
safelyFireEvent(l -> l.eventSent(this, socket, event, durationInNanos));
}
}

protected void fireEventSendFailure(Event event, Throwable reason) {
safelyFireEvent(l -> l.eventSendFailure(this, event, reason));
if (event != null) {
safelyFireEvent(l -> l.eventSendFailure(this, event, reason));
}
}

protected void fireConnectionOpened(Socket socket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -54,11 +56,13 @@
import net.logstash.logback.appender.destination.RoundRobinDestinationConnectionStrategy;
import net.logstash.logback.appender.listener.TcpAppenderListener;
import net.logstash.logback.encoder.SeparatorParser;
import net.logstash.logback.encoder.StreamingEncoder;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.BasicStatusManager;
import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.encoder.EncoderBase;
import ch.qos.logback.core.status.OnConsoleStatusListener;
import ch.qos.logback.core.status.Status;
import ch.qos.logback.core.status.StatusManager;
Expand Down Expand Up @@ -622,6 +626,108 @@ public void testReconnectToSecondaryOnKeepAlive() throws Exception {
}


/**
* Assert that nothing is written in the socket output stream when a *non* {@link StreamingEncoder}
* throws an exception.
*/
@Test
public void testEncoderThrowsException() throws Exception {
// Use a ByteArrayOutputStream to capture actual output
ByteArrayOutputStream bos = new ByteArrayOutputStream();
when(socket.getOutputStream())
.thenReturn(bos);

// Encoder throws an exception
when(encoder.encode(event1)).thenThrow(new RuntimeException("Exception thrown by the Encoder"));

// Configure and start appender
appender.addDestination("localhost:10000");
appender.start();


// This event will cause the encoder to throw an exception
appender.append(event1);

// Event dropped
verify(listener, async()).eventSendFailure(eq(appender), eq(event1), any());

// Nothing written in the socket output stream
assertThat(bos.size()).isZero();

// A warn status is emitted
assertThat(statusManager.getCopyOfStatusList()).anySatisfy(status -> {
assertThat(status.getLevel()).isEqualTo(Status.WARN);
assertThat(status.getMessage()).contains("Encoder failed to encode event. Dropping event.");
});
}


/**
* Assert that nothing is written in the socket output stream when a {@link StreamingEncoder} throws
* an exception after having written a few bytes.
*
* Also assert that the StreamingEncoder interface is used instead of the legacy Encoder.
*/
@Test
public void testStreamingEncoderThrowsException() throws Exception {
// Use a ByteArrayOutputStream to capture actual output
ByteArrayOutputStream bos = new ByteArrayOutputStream();
when(socket.getOutputStream())
.thenReturn(bos);

// StreamingEncoder throwing an exception
BadStreamingEncoder badEncoder = spy(new BadStreamingEncoder());
appender.setEncoder(badEncoder);

// Configure and start appender
appender.addDestination("localhost:10000");
appender.start();


// This event will cause the encoder to throw an exception
appender.append(event1);

// Event dropped
verify(listener, async()).eventSendFailure(eq(appender), eq(event1), any());

// Streaming interface used instead of standard Encoder
verify(badEncoder, times(1)).encode(eq(event1), any(OutputStream.class));
verify(badEncoder, never()).encode(any());

// Nothing written in the socket output stream
assertThat(bos.size()).isZero();

// A warn status is emitted
assertThat(statusManager.getCopyOfStatusList()).anySatisfy(status -> {
assertThat(status.getLevel()).isEqualTo(Status.WARN);
assertThat(status.getMessage()).contains("Encoder failed to encode event. Dropping event.");
});
}

private static class BadStreamingEncoder extends EncoderBase<ILoggingEvent> implements StreamingEncoder<ILoggingEvent> {
@Override
public byte[] headerBytes() {
return null;
}

@Override
public byte[] encode(ILoggingEvent event) {
return null;
}

@Override
public byte[] footerBytes() {
return null;
}

@Override
public void encode(ILoggingEvent event, OutputStream outputStream) throws IOException {
outputStream.write("First few bytes".getBytes());
throw new IOException("Exception thrown after some bytes are written");
}
}


/**
* At least one valid destination must be configured.
* The appender refuses to start in case of error.
Expand Down