Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
import net.logstash.logback.appender.destination.DestinationParser;
import net.logstash.logback.appender.destination.PreferPrimaryDestinationConnectionStrategy;
import net.logstash.logback.appender.listener.TcpAppenderListener;
import net.logstash.logback.encoder.CompositeJsonEncoder;
import net.logstash.logback.encoder.SeparatorParser;
import net.logstash.logback.encoder.StreamingEncoder;
import net.logstash.logback.util.ReusableByteBuffer;

import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.joran.spi.DefaultClass;
Expand Down Expand Up @@ -350,6 +352,12 @@ private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, L
*/
private Future<?> readerFuture;

/**
* Intermediate ByteBuffer used to store content generated by {@link StreamingEncoder}.
* Stays uninitialized if encoder is a "raw" {@link Encoder}.
*/
private final ReusableByteBuffer buffer;

/**
* When run, if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration}
* has elapsed since the last event was sent,
Expand All @@ -363,7 +371,7 @@ private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, L
* if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration}
* has elapsed since the last event was sent,
* then the event handler will send the {@link AbstractLogstashTcpSocketAppender#keepAliveMessage}
* to the socket outputstream.
* to the socket OutputStream.
*
*/
private class KeepAliveRunnable implements Runnable {
Expand Down Expand Up @@ -504,6 +512,18 @@ public void run() {
}
}


TcpSendingEventHandler() {
if (encoder instanceof CompositeJsonEncoder) {
this.buffer = new ReusableByteBuffer(((CompositeJsonEncoder<Event>) encoder).getMinBufferSize());
} else if (encoder instanceof StreamingEncoder) {
this.buffer = new ReusableByteBuffer();
} else {
this.buffer = null;
}
}


@Override
public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {

Expand Down Expand Up @@ -617,7 +637,16 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event
@SuppressWarnings("unchecked")
private void encode(Event event, OutputStream outputStream) throws IOException {
if (encoder instanceof StreamingEncoder) {
((StreamingEncoder<Event>) encoder).encode(event, outputStream);
/*
* Generate content in a temporary buffer to avoid writing "partial" content in the output
* stream if the Encoder throws an exception.
*/
try {
((StreamingEncoder<Event>) encoder).encode(event, buffer);
buffer.writeTo(outputStream);
} finally {
buffer.reset();
}
} else {
byte[] data = encoder.encode(event);
if (data != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
import net.logstash.logback.decorate.JsonGeneratorDecorator;
import net.logstash.logback.decorate.NullJsonFactoryDecorator;
import net.logstash.logback.decorate.NullJsonGeneratorDecorator;
import net.logstash.logback.util.ObjectPool;
import net.logstash.logback.util.ProxyOutputStream;

import ch.qos.logback.access.spi.IAccessEvent;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.spi.ContextAware;
import ch.qos.logback.core.spi.ContextAwareBase;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import ch.qos.logback.core.spi.LifeCycle;
import ch.qos.logback.core.util.CloseUtil;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonFactory.Feature;
Expand All @@ -41,13 +44,23 @@

/**
* Formats logstash Events as JSON using {@link JsonProvider}s.
* <p>
*
* The {@link CompositeJsonFormatter} starts the JSON object ('{'),
* <p>The {@link CompositeJsonFormatter} starts the JSON object ('{'),
* then delegates writing the contents of the object to the {@link JsonProvider}s,
* and then ends the JSON object ('}').
*
* <p>Jackson {@link JsonGenerator} are initially created with a "disconnected" output stream so they can be
* reused multiple times with different target output stream. They are kept in an internal pool whose
* size is technically unbounded. It will however never hold more entries than the number of concurrent
* threads accessing it. Entries are kept in the pool using soft references so they can be garbage
* collected by the JVM when running low in memory.
*
* <p>{@link JsonGenerator} instances are *not* reused after they threw an exception. This is to prevent
* reusing an instance whose internal state may be unpredictable.
*
* @param <Event> type of event ({@link ILoggingEvent} or {@link IAccessEvent}).
*
* @author brenuart
*/
public abstract class CompositeJsonFormatter<Event extends DeferredProcessingAware>
extends ContextAwareBase implements LifeCycle {
Expand Down Expand Up @@ -80,6 +93,9 @@ public abstract class CompositeJsonFormatter<Event extends DeferredProcessingAwa

private volatile boolean started;

private ObjectPool<JsonFormatter> pool;


public CompositeJsonFormatter(ContextAware declaredOrigin) {
super(declaredOrigin);
}
Expand All @@ -102,12 +118,15 @@ public void start() {
jsonProviders.setContext(context);
jsonProviders.setJsonFactory(jsonFactory);
jsonProviders.start();

pool = new ObjectPool<>(this::createJsonFormatter);
started = true;
}

@Override
public void stop() {
if (isStarted()) {
pool.clear();
jsonProviders.stop();
jsonFactory = null;
started = false;
Expand All @@ -119,40 +138,97 @@ public boolean isStarted() {
return started;
}


/**
* Create a reusable {@link JsonFormatter} bound to the given {@link OutputStream}.
* Write an event in the given output stream.
*
* @param outputStream the output stream used by the {@link JsonFormatter}
* @return {@link JsonFormatter} writing JSON content in the output stream
* @throws IOException thrown when unable to write in the output stream or when Jackson fails to produce JSON content
* @param event the event to write
* @param outputStream the output stream to write the event into
* @throws IOException thrown upon failure to write the event
*/
public JsonFormatter createJsonFormatter(OutputStream outputStream) throws IOException {
public void writeEvent(Event event, OutputStream outputStream) throws IOException {
Objects.requireNonNull(outputStream);
if (!isStarted()) {
throw new IllegalStateException("Formatter is not started");
}

JsonGenerator generator = createGenerator(outputStream);
return new JsonFormatter(generator);
try (JsonFormatter formatter = this.pool.acquire()) {
formatter.writeEvent(outputStream, event);
}
}


public class JsonFormatter implements Closeable {
/**
* Create a reusable {@link JsonFormatter} bound to the given {@link OutputStream}.
*
* @param stream the output stream used by the {@link JsonFormatter}
* @return {@link JsonFormatter} writing JSON content in the output stream
* @throws IOException thrown when unable to write in the output stream or when Jackson fails to produce JSON content
*/
private JsonFormatter createJsonFormatter() {
try {
DisconnectedOutputStream outputStream = new DisconnectedOutputStream();
JsonGenerator generator = createGenerator(outputStream);
return new JsonFormatter(outputStream, generator);
} catch (IOException e) {
throw new IllegalStateException("Unable to initialize Jackson JSON layer", e);
}

}

private class JsonFormatter implements ObjectPool.Lifecycle, Closeable {
private final JsonGenerator generator;
private final DisconnectedOutputStream stream;
private boolean recyclable = true;

public JsonFormatter(JsonGenerator generator) {
JsonFormatter(DisconnectedOutputStream outputStream, JsonGenerator generator) {
this.stream = Objects.requireNonNull(outputStream);
this.generator = Objects.requireNonNull(generator);
}

public void writeEvent(Event event) throws IOException {
writeEventToGenerator(generator, event);
public void writeEvent(OutputStream outputStream, Event event) throws IOException {
try {
this.stream.connect(outputStream);
writeEventToGenerator(generator, event);

} catch (IOException | RuntimeException e) {
this.recyclable = false;
throw e;

} finally {
this.stream.disconnect();
}
}

@Override
public boolean recycle() {
return this.recyclable;
}

@Override
public void dispose() {
CloseUtil.closeQuietly(this.generator);
}

@Override
public void close() throws IOException {
this.generator.close();
CompositeJsonFormatter.this.pool.release(this);
}
}

private static class DisconnectedOutputStream extends ProxyOutputStream {
DisconnectedOutputStream() {
super(null);
}

public void connect(OutputStream out) {
this.delegate = out;
}

public void disconnect() {
this.delegate = null;
}
}

private JsonFactory createJsonFactory() {
ObjectMapper objectMapper = new ObjectMapper()
Expand Down Expand Up @@ -187,9 +263,6 @@ private JsonFactory decorateFactory(JsonFactory factory) {
}

protected void writeEventToGenerator(JsonGenerator generator, Event event) throws IOException {
if (!isStarted()) {
throw new IllegalStateException("Encoding attempted before starting.");
}
generator.writeStartObject();
jsonProviders.writeTo(generator, event);
generator.writeEndObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import net.logstash.logback.composite.JsonProviders;
import net.logstash.logback.decorate.JsonFactoryDecorator;
import net.logstash.logback.decorate.JsonGeneratorDecorator;
import net.logstash.logback.util.ReusableJsonFormatterPool;
import net.logstash.logback.util.ReusableByteBuffer;
import net.logstash.logback.util.ReusableByteBufferPool;

import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.encoder.EncoderBase;
Expand All @@ -37,7 +38,7 @@ public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware
private static final byte[] EMPTY_BYTES = new byte[0];

/**
* The minimum size of the byte buffer used when encoding events.
* The minimum size of the byte buffer used when encoding events using {@link #encode(DeferredProcessingAware)}.
*
* <p>The buffer automatically grows above the {@code #minBufferSize} when needed to
* accommodate with larger events. However, only the first {@code minBufferSize} bytes
Expand All @@ -47,12 +48,16 @@ public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware
*/
private int minBufferSize = 1024;

/**
* Pool of reusable byte buffers used when calling {@link #encode(DeferredProcessingAware)}
*/
private ReusableByteBufferPool bufferPool;

private Encoder<Event> prefix;
private Encoder<Event> suffix;

private final CompositeJsonFormatter<Event> formatter;
private ReusableJsonFormatterPool<Event> formatterPool;


private String lineSeparator = System.lineSeparator();

private byte[] lineSeparatorBytes;
Expand All @@ -72,10 +77,7 @@ public void encode(Event event, OutputStream outputStream) throws IOException {
throw new IllegalStateException("Encoder is not started");
}

try (ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) {
encode(cachedFormatter, event);
cachedFormatter.getBuffer().writeTo(outputStream);
}
encode(outputStream, event);
}

@Override
Expand All @@ -84,21 +86,26 @@ public byte[] encode(Event event) {
throw new IllegalStateException("Encoder is not started");
}

try (ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) {
encode(cachedFormatter, event);
return cachedFormatter.getBuffer().toByteArray();
ReusableByteBuffer buffer = bufferPool.acquire();

try {
encode(buffer, event);
return buffer.toByteArray();

} catch (IOException e) {
addWarn("Error encountered while encoding log event. Event: " + event, e);
return EMPTY_BYTES;

} finally {
bufferPool.release(buffer);
}
}

private void encode(ReusableJsonFormatterPool<Event>.ReusableJsonFormatter cachedFormatter, Event event) throws IOException {
encode(prefix, event, cachedFormatter.getBuffer());
cachedFormatter.write(event);
encode(suffix, event, cachedFormatter.getBuffer());
cachedFormatter.getBuffer().write(lineSeparatorBytes);
private void encode(OutputStream outputStream, Event event) throws IOException {
encode(prefix, event, outputStream);
formatter.writeEvent(event, outputStream);
encode(suffix, event, outputStream);
outputStream.write(lineSeparatorBytes);
}

private void encode(Encoder<Event> encoder, Event event, OutputStream outputStream) throws IOException {
Expand Down Expand Up @@ -127,7 +134,7 @@ public void start() {
startWrapped(prefix);
startWrapped(suffix);

this.formatterPool = new ReusableJsonFormatterPool<>(formatter, minBufferSize);
this.bufferPool = ReusableByteBufferPool.create(minBufferSize);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down Expand Up @@ -171,7 +178,7 @@ public void stop() {
stopWrapped(prefix);
stopWrapped(suffix);

this.formatterPool = null;
bufferPool = null;
}
}

Expand Down
Loading