diff --git a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java index 04682266..ef209a45 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java @@ -52,8 +52,10 @@ import net.logstash.logback.appender.destination.DestinationParser; import net.logstash.logback.appender.destination.PreferPrimaryDestinationConnectionStrategy; import net.logstash.logback.appender.listener.TcpAppenderListener; +import net.logstash.logback.encoder.CompositeJsonEncoder; import net.logstash.logback.encoder.SeparatorParser; import net.logstash.logback.encoder.StreamingEncoder; +import net.logstash.logback.util.ReusableByteBuffer; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.joran.spi.DefaultClass; @@ -350,6 +352,12 @@ private class TcpSendingEventHandler implements EventHandler>, L */ private Future readerFuture; + /** + * Intermediate ByteBuffer used to store content generated by {@link StreamingEncoder}. + * Set when {@link #onStart()} but stays uninitialized if encoder is a "raw" {@link Encoder}. + */ + private ReusableByteBuffer buffer; + /** * When run, if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration} * has elapsed since the last event was sent, @@ -363,7 +371,7 @@ private class TcpSendingEventHandler implements EventHandler>, L * if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration} * has elapsed since the last event was sent, * then the event handler will send the {@link AbstractLogstashTcpSocketAppender#keepAliveMessage} - * to the socket outputstream. + * to the socket OutputStream. * */ private class KeepAliveRunnable implements Runnable { @@ -503,7 +511,8 @@ public void run() { } } } - + + @Override public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception { @@ -617,7 +626,16 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent) encoder).encode(event, outputStream); + /* + * Generate content in a temporary buffer to avoid writing "partial" content in the output + * stream if the Encoder throws an exception. + */ + try { + ((StreamingEncoder) encoder).encode(event, buffer); + buffer.writeTo(outputStream); + } finally { + buffer.reset(); + } } else { byte[] data = encoder.encode(event); if (data != null) { @@ -635,6 +653,13 @@ private boolean hasKeepAliveDurationElapsed(long lastSentNanoTime, long currentN @Override public void onStart() { this.destinationAttemptStartTimes = new long[destinations.size()]; + + if (encoder instanceof CompositeJsonEncoder) { + this.buffer = new ReusableByteBuffer(((CompositeJsonEncoder) encoder).getMinBufferSize()); + } else if (encoder instanceof StreamingEncoder) { + this.buffer = new ReusableByteBuffer(); + } + openSocket(); scheduleKeepAlive(System.nanoTime()); scheduleWriteTimeout(); @@ -796,6 +821,7 @@ private synchronized void closeSocket() { private void closeEncoder() { encoder.stop(); + buffer = null; } private synchronized void scheduleKeepAlive(long basedOnNanoTime) { diff --git a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java index 3dc8f8ad..309d95c0 100644 --- a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java +++ b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java @@ -25,6 +25,8 @@ import net.logstash.logback.decorate.JsonGeneratorDecorator; import net.logstash.logback.decorate.NullJsonFactoryDecorator; import net.logstash.logback.decorate.NullJsonGeneratorDecorator; +import net.logstash.logback.util.ObjectPool; +import net.logstash.logback.util.ProxyOutputStream; import ch.qos.logback.access.spi.IAccessEvent; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -32,6 +34,7 @@ import ch.qos.logback.core.spi.ContextAwareBase; import ch.qos.logback.core.spi.DeferredProcessingAware; import ch.qos.logback.core.spi.LifeCycle; +import ch.qos.logback.core.util.CloseUtil; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonFactory.Feature; @@ -41,13 +44,23 @@ /** * Formats logstash Events as JSON using {@link JsonProvider}s. - *

* - * The {@link CompositeJsonFormatter} starts the JSON object ('{'), + *

The {@link CompositeJsonFormatter} starts the JSON object ('{'), * then delegates writing the contents of the object to the {@link JsonProvider}s, * and then ends the JSON object ('}'). * + *

Jackson {@link JsonGenerator} are initially created with a "disconnected" output stream so they can be + * reused multiple times with different target output stream. They are kept in an internal pool whose + * size is technically unbounded. It will however never hold more entries than the number of concurrent + * threads accessing it. Entries are kept in the pool using soft references so they can be garbage + * collected by the JVM when running low in memory. + * + *

{@link JsonGenerator} instances are *not* reused after they threw an exception. This is to prevent + * reusing an instance whose internal state may be unpredictable. + * * @param type of event ({@link ILoggingEvent} or {@link IAccessEvent}). + * + * @author brenuart */ public abstract class CompositeJsonFormatter extends ContextAwareBase implements LifeCycle { @@ -80,6 +93,9 @@ public abstract class CompositeJsonFormatter pool; + + public CompositeJsonFormatter(ContextAware declaredOrigin) { super(declaredOrigin); } @@ -102,12 +118,15 @@ public void start() { jsonProviders.setContext(context); jsonProviders.setJsonFactory(jsonFactory); jsonProviders.start(); + + pool = new ObjectPool<>(this::createJsonFormatter); started = true; } @Override public void stop() { if (isStarted()) { + pool.clear(); jsonProviders.stop(); jsonFactory = null; started = false; @@ -119,40 +138,95 @@ public boolean isStarted() { return started; } + /** - * Create a reusable {@link JsonFormatter} bound to the given {@link OutputStream}. + * Write an event in the given output stream. * - * @param outputStream the output stream used by the {@link JsonFormatter} - * @return {@link JsonFormatter} writing JSON content in the output stream - * @throws IOException thrown when unable to write in the output stream or when Jackson fails to produce JSON content + * @param event the event to write + * @param outputStream the output stream to write the event into + * @throws IOException thrown upon failure to write the event */ - public JsonFormatter createJsonFormatter(OutputStream outputStream) throws IOException { + public void writeEvent(Event event, OutputStream outputStream) throws IOException { + Objects.requireNonNull(outputStream); if (!isStarted()) { throw new IllegalStateException("Formatter is not started"); } - JsonGenerator generator = createGenerator(outputStream); - return new JsonFormatter(generator); + try (JsonFormatter formatter = this.pool.acquire()) { + formatter.writeEvent(outputStream, event); + } } - public class JsonFormatter implements Closeable { + /** + * Create a reusable {@link JsonFormatter} bound to a {@link DisconnectedOutputStream}. + * + * @return {@link JsonFormatter} writing JSON content in the output stream + */ + private JsonFormatter createJsonFormatter() { + try { + DisconnectedOutputStream outputStream = new DisconnectedOutputStream(); + JsonGenerator generator = createGenerator(outputStream); + return new JsonFormatter(outputStream, generator); + } catch (IOException e) { + throw new IllegalStateException("Unable to initialize Jackson JSON layer", e); + } + + } + + private class JsonFormatter implements ObjectPool.Lifecycle, Closeable { private final JsonGenerator generator; + private final DisconnectedOutputStream stream; + private boolean recyclable = true; - public JsonFormatter(JsonGenerator generator) { + JsonFormatter(DisconnectedOutputStream outputStream, JsonGenerator generator) { + this.stream = Objects.requireNonNull(outputStream); this.generator = Objects.requireNonNull(generator); } - public void writeEvent(Event event) throws IOException { - writeEventToGenerator(generator, event); + public void writeEvent(OutputStream outputStream, Event event) throws IOException { + try { + this.stream.connect(outputStream); + writeEventToGenerator(generator, event); + + } catch (IOException | RuntimeException e) { + this.recyclable = false; + throw e; + + } finally { + this.stream.disconnect(); + } + } + + @Override + public boolean recycle() { + return this.recyclable; + } + + @Override + public void dispose() { + CloseUtil.closeQuietly(this.generator); } @Override public void close() throws IOException { - this.generator.close(); + CompositeJsonFormatter.this.pool.release(this); } } + private static class DisconnectedOutputStream extends ProxyOutputStream { + DisconnectedOutputStream() { + super(null); + } + + public void connect(OutputStream out) { + this.delegate = out; + } + + public void disconnect() { + this.delegate = null; + } + } private JsonFactory createJsonFactory() { ObjectMapper objectMapper = new ObjectMapper() @@ -187,9 +261,6 @@ private JsonFactory decorateFactory(JsonFactory factory) { } protected void writeEventToGenerator(JsonGenerator generator, Event event) throws IOException { - if (!isStarted()) { - throw new IllegalStateException("Encoding attempted before starting."); - } generator.writeStartObject(); jsonProviders.writeTo(generator, event); generator.writeEndObject(); diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index ccec3dfb..b7860743 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -24,7 +24,8 @@ import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; -import net.logstash.logback.util.ReusableJsonFormatterPool; +import net.logstash.logback.util.ReusableByteBuffer; +import net.logstash.logback.util.ReusableByteBufferPool; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.EncoderBase; @@ -37,7 +38,7 @@ public abstract class CompositeJsonEncoderThe buffer automatically grows above the {@code #minBufferSize} when needed to * accommodate with larger events. However, only the first {@code minBufferSize} bytes @@ -47,12 +48,16 @@ public abstract class CompositeJsonEncoder prefix; private Encoder suffix; private final CompositeJsonFormatter formatter; - private ReusableJsonFormatterPool formatterPool; - + private String lineSeparator = System.lineSeparator(); private byte[] lineSeparatorBytes; @@ -72,10 +77,7 @@ public void encode(Event event, OutputStream outputStream) throws IOException { throw new IllegalStateException("Encoder is not started"); } - try (ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) { - encode(cachedFormatter, event); - cachedFormatter.getBuffer().writeTo(outputStream); - } + encode(outputStream, event); } @Override @@ -84,21 +86,26 @@ public byte[] encode(Event event) { throw new IllegalStateException("Encoder is not started"); } - try (ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) { - encode(cachedFormatter, event); - return cachedFormatter.getBuffer().toByteArray(); + ReusableByteBuffer buffer = bufferPool.acquire(); + + try { + encode(buffer, event); + return buffer.toByteArray(); } catch (IOException e) { addWarn("Error encountered while encoding log event. Event: " + event, e); return EMPTY_BYTES; + + } finally { + bufferPool.release(buffer); } } - private void encode(ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter, Event event) throws IOException { - encode(prefix, event, cachedFormatter.getBuffer()); - cachedFormatter.write(event); - encode(suffix, event, cachedFormatter.getBuffer()); - cachedFormatter.getBuffer().write(lineSeparatorBytes); + private void encode(OutputStream outputStream, Event event) throws IOException { + encode(prefix, event, outputStream); + formatter.writeEvent(event, outputStream); + encode(suffix, event, outputStream); + outputStream.write(lineSeparatorBytes); } private void encode(Encoder encoder, Event event, OutputStream outputStream) throws IOException { @@ -127,7 +134,7 @@ public void start() { startWrapped(prefix); startWrapped(suffix); - this.formatterPool = new ReusableJsonFormatterPool<>(formatter, minBufferSize); + this.bufferPool = ReusableByteBufferPool.create(minBufferSize); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -171,7 +178,7 @@ public void stop() { stopWrapped(prefix); stopWrapped(suffix); - this.formatterPool = null; + bufferPool = null; } } diff --git a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java index 7356e41c..59198339 100644 --- a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java +++ b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java @@ -16,6 +16,7 @@ package net.logstash.logback.layout; import java.io.IOException; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.util.Objects; @@ -26,7 +27,8 @@ import net.logstash.logback.decorate.JsonGeneratorDecorator; import net.logstash.logback.encoder.CompositeJsonEncoder; import net.logstash.logback.encoder.SeparatorParser; -import net.logstash.logback.util.ReusableJsonFormatterPool; +import net.logstash.logback.util.ReusableByteBuffer; +import net.logstash.logback.util.ReusableByteBufferPool; import ch.qos.logback.core.Layout; import ch.qos.logback.core.LayoutBase; @@ -60,10 +62,13 @@ public abstract class CompositeJsonLayout * unnecessary memory allocations and reduce pressure on the garbage collector. */ private int minBufferSize = 1024; - + + /** + * Pool of reusable byte buffers + */ + private ReusableByteBufferPool bufferPool; private final CompositeJsonFormatter formatter; - private ReusableJsonFormatterPool formatterPool; public CompositeJsonLayout() { super(); @@ -78,21 +83,24 @@ public String doLayout(Event event) { throw new IllegalStateException("Layout is not started"); } - try (ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) { - writeEvent(cachedFormatter, event); - return new String(cachedFormatter.getBuffer().toByteArray()); + ReusableByteBuffer buffer = bufferPool.acquire(); + try { + writeEvent(buffer, event); + return new String(buffer.toByteArray()); } catch (IOException e) { addWarn("Error formatting logging event", e); return null; + } finally { + bufferPool.release(buffer); } } - private void writeEvent(ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter, Event event) throws IOException { - try (Writer writer = new OutputStreamWriter(cachedFormatter.getBuffer())) { + private void writeEvent(OutputStream outputStream, Event event) throws IOException { + try (Writer writer = new OutputStreamWriter(outputStream)) { writeLayout(prefix, writer, event); - cachedFormatter.write(event); + formatter.writeEvent(event, outputStream); writeLayout(suffix, writer, event); if (lineSeparator != null) { @@ -128,7 +136,7 @@ public void start() { startWrapped(prefix); startWrapped(suffix); - this.formatterPool = new ReusableJsonFormatterPool<>(formatter, minBufferSize); + this.bufferPool = ReusableByteBufferPool.create(minBufferSize); } private void startWrapped(Layout wrapped) { @@ -162,7 +170,7 @@ public void stop() { stopWrapped(prefix); stopWrapped(suffix); - this.formatterPool = null; + this.bufferPool = null; } private void stopWrapped(Layout wrapped) { diff --git a/src/main/java/net/logstash/logback/util/ObjectPool.java b/src/main/java/net/logstash/logback/util/ObjectPool.java new file mode 100644 index 00000000..6dd620f8 --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ObjectPool.java @@ -0,0 +1,202 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import java.lang.ref.SoftReference; +import java.util.Deque; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Supplier; + + +/** + * Pool of reusable object instances. + * + *

Instances are obtained from the pool by calling {@link #acquire()} and must be returned after use + * by calling {@link #release(Object)}. If not, the instance is simply reclaimed by the garbage collector. + * + *

Instance may also implement the optional {@link ObjectPool.Lifecycle} interface if they wish to be + * notified when they are recycled or disposed. + * + *

The pool is technically unbounded but will never hold more entries than the number of concurrent + * threads accessing it. Entries are kept in the pool using soft references so they can be garbage + * collected by the JVM when running low in memory. + * + *

The pool can be cleared at any time by calling {@link #clear()} in which case instances currently + * in the pool will be disposed. + * + * @param type of pooled instances. + * + * @author brenuart + */ +public class ObjectPool { + + /** + * The factory used to create new instances + */ + private final Supplier factory; + + /** + * Instances pool + */ + private volatile SoftReference> poolRef = new SoftReference<>(null); + + + /** + * Create a new instance of the pool. + * + * @param factory the factory used to create new instances. + */ + public ObjectPool(Supplier factory) { + this.factory = Objects.requireNonNull(factory); + } + + /** + * Get an instance out of the pool, creating a new one if needed. + * The instance must be returned to the pool by calling {@link #release(Object)}. If not the + * instance is disposed by the garbage collector. + * + * @return a pooled instance or a new one if none is available + */ + public final T acquire() { + T instance = null; + + Deque pool = this.poolRef.get(); + if (pool != null) { + instance = pool.poll(); + } + + if (instance == null) { + instance = Objects.requireNonNull(createNewInstance()); + } + + return instance; + } + + + /** + * Return an instance to the pool or dispose it if it cannot be recycled. + * + * @param instance the instance to return to the pool + */ + public final void release(T instance) { + if (instance == null) { + return; + } + if (!recycleInstance(instance)) { + disposeInstance(instance); + return; + } + + Deque pool = this.poolRef.get(); + if (pool == null) { + pool = new ConcurrentLinkedDeque<>(); + this.poolRef = new SoftReference<>(pool); + } + + pool.addFirst(instance); // try to reuse the same as much as we can -> add it first + } + + + /** + * Clear the object pool and dispose instances it may contain + */ + public void clear() { + Deque pool = this.poolRef.get(); + if (pool != null) { + while (!pool.isEmpty()) { + disposeInstance(pool.poll()); + } + } + } + + + /** + * Get the number of instances currently in the pool + * + * @return the number of instances in the pool + */ + public int size() { + Deque pool = this.poolRef.get(); + if (pool != null) { + return pool.size(); + } else { + return 0; + } + } + + + /** + * Create a new object instance. + * + * @return a new object instance + */ + protected T createNewInstance() { + return this.factory.get(); + } + + + /** + * Dispose the object instance by calling its life cycle methods. + * + * @param instance the instance to dispose + */ + protected void disposeInstance(T instance) { + if (instance instanceof Lifecycle) { + ((Lifecycle) instance).dispose(); + } + } + + + /** + * Recycle the instance before returning it to the pool. + * + * @param instance the instance to recycle + * @return {@code true} if the instance can be recycled and returned to the pool, {@code false} if not. + */ + protected boolean recycleInstance(T instance) { + if (instance instanceof Lifecycle) { + return ((Lifecycle) instance).recycle(); + } else { + return true; + } + } + + + /** + * Optional interface that pooled instances may implement if they wish to be notified of + * life cycle events. + */ + public interface Lifecycle { + /** + * Indicate whether the instance can be recycled and returned to the pool and perform + * the necessary recycling tasks. + * + * @return {@code true} if the instance can be returned to the pool, {@code false} if + * it must be disposed instead. + */ + default boolean recycle() { + return true; + } + + /** + * Dispose the instance and free allocated resources. + */ + default void dispose() { + // noop + } + } +} diff --git a/src/main/java/net/logstash/logback/util/ProxyOutputStream.java b/src/main/java/net/logstash/logback/util/ProxyOutputStream.java new file mode 100644 index 00000000..1ebff086 --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ProxyOutputStream.java @@ -0,0 +1,130 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + + +/** + * A Proxy stream which acts as expected, that is it passes the method calls on + * to the proxied stream and doesn't change which methods are being called (unlike + * JDK {@link FilterOutputStream}). + * + * @author brenuart + */ +public class ProxyOutputStream extends OutputStream { + + protected OutputStream delegate; + + /** + * Constructs a new ProxyOutputStream. + * + * @param delegate the OutputStream to delegate to + */ + public ProxyOutputStream(final OutputStream delegate) { + this.delegate = delegate; + } + + /** + * Invokes the delegate's write(int) method. + * + * @param b the byte to write + * @throws IOException if an I/O error occurs + */ + @Override + public void write(final int b) throws IOException { + try { + delegate.write(b); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Invokes the delegate's write(byte[]) method. + * + * @param b the bytes to write + * @throws IOException if an I/O error occurs + */ + @Override + public void write(final byte[] b) throws IOException { + try { + delegate.write(b); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Invokes the delegate's write(byte[]) method. + * + * @param b the bytes to write + * @param off The start offset + * @param len The number of bytes to write + * @throws IOException if an I/O error occurs + */ + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + try { + delegate.write(b, off, len); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Invokes the delegate's flush() method. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void flush() throws IOException { + try { + delegate.flush(); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Invokes the delegate's close() method. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + try { + delegate.close(); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Handle any IOExceptions thrown. + *

+ * This method provides a point to implement custom exception handling. The + * default behavior is to re-throw the exception. + * + * @param e The IOException thrown + * @throws IOException if an I/O error occurs + */ + protected void handleIOException(final IOException e) throws IOException { + throw e; + } +} diff --git a/src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java b/src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java new file mode 100644 index 00000000..94366fbe --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java @@ -0,0 +1,63 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + + +/** + * A pool of {@link ReusableByteBuffer}. + * + *

The pool is technically unbounded but will never hold more buffers than the number of concurrent + * threads accessing it. Buffers are kept in the pool using soft references so they can be garbage + * collected by the JVM when running low in memory. + * + * @author brenuart + */ +public class ReusableByteBufferPool extends ObjectPool { + + /** + * Create a new buffer pool holding buffers with an initial capacity of {@code initialSize} bytes. + * + * @param initialCapacity the initial capacity of buffers created by this pool. + */ + private ReusableByteBufferPool(int initialCapacity) { + super(() -> new ReusableByteBuffer(initialCapacity)); + + } + + /** + * Return a buffer to the pool after usage. + * + * @param buffer the buffer to return to the pool. + */ + protected void releaseInstance(ReusableByteBuffer buffer) { + buffer.reset(); + super.release(buffer); + } + + + /** + * Create a new buffer pool holding buffers with an initial capacity of {@code initialSize} bytes. + * + * @param initialCapacity the initial capacity of buffers created by this pool. + * @return a new {@link ReusableByteBufferPool} + */ + public static ReusableByteBufferPool create(int initialCapacity) { + if (initialCapacity <= 0) { + throw new IllegalArgumentException("initialCapacity must be greater than 0"); + } + return new ReusableByteBufferPool(initialCapacity); + } +} diff --git a/src/main/java/net/logstash/logback/util/ReusableJsonFormatterPool.java b/src/main/java/net/logstash/logback/util/ReusableJsonFormatterPool.java deleted file mode 100644 index cb970c01..00000000 --- a/src/main/java/net/logstash/logback/util/ReusableJsonFormatterPool.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.util; - -import java.io.Closeable; -import java.io.IOException; -import java.lang.ref.SoftReference; -import java.util.Deque; -import java.util.Objects; -import java.util.concurrent.ConcurrentLinkedDeque; - -import net.logstash.logback.composite.CompositeJsonFormatter; -import net.logstash.logback.composite.CompositeJsonFormatter.JsonFormatter; - -import ch.qos.logback.core.spi.DeferredProcessingAware; - -/** - * Pool of {@link ReusableJsonFormatter} that can be safely reused multiple times. - * A {@link ReusableJsonFormatter} is made of an internal {@link ReusableByteBuffer} and a - * {@link CompositeJsonFormatter.JsonFormatter} bound to it. - * - *

Instances must be returned to the pool after use by calling {@link ReusableJsonFormatter#close()} - * or {@link #release(net.logstash.logback.util.ReusableJsonFormatterPool.ReusableJsonFormatter) - * release(ReusableJsonFormatter)}. - * - * Instances are not recycled (and therefore not returned to the pool) after their internal - * {@link CompositeJsonFormatter.JsonFormatter} threw an exception. This is to prevent reusing an - * instance whose internal components are potentially in an unpredictable state. - * - *

The internal byte buffer is created with an initial size of {@link #minBufferSize}. - * The buffer automatically grows above the {@code #minBufferSize} when needed to - * accommodate with larger events. However, only the first {@code minBufferSize} bytes - * will be reused by subsequent invocations. It is therefore strongly advised to set - * the minimum size at least equal to the average size of the encoded events to reduce - * unnecessary memory allocations and reduce pressure on the garbage collector. - * - *

The pool is technically unbounded but will never hold more entries than the number of concurrent - * threads accessing it. Entries are kept in the pool using soft references so they can be garbage - * collected by the JVM when running low in memory. - * - * @author brenuart - */ -public class ReusableJsonFormatterPool { - - /** - * The minimum size of the byte buffer used when encoding events. - */ - private final int minBufferSize; - - /** - * The factory used to create {@link JsonFormatter} instances - */ - private final CompositeJsonFormatter formatterFactory; - - /** - * The pool of reusable JsonFormatter instances. - * May be cleared by the GC when running low in memory. - * - * Note: - * JsonFormatters are not explicitly disposed when the GC clears the SoftReference. This means that - * the underlying Jackson JsonGenerator is not explicitly closed and the associated memory buffers - * are not returned to Jackson's internal memory pools. - * This behavior is desired and makes the associated memory immediately reclaimable - which is what - * we need since we are "running low in memory". - */ - private volatile SoftReference> formatters = new SoftReference<>(null); - - - public ReusableJsonFormatterPool(CompositeJsonFormatter formatterFactory, int minBufferSize) { - this.formatterFactory = Objects.requireNonNull(formatterFactory); - this.minBufferSize = minBufferSize; - } - - /** - * A reusable JsonFormatter holding a JsonFormatter writing inside a dedicated {@link ReusableByteBuffer}. - * Closing the instance returns it to the pool and makes it available for subsequent usage, unless the - * underlying {@link CompositeJsonFormatter.JsonFormatter} threw an exception during its use. - * - *

Note: usage is not thread-safe. - */ - public class ReusableJsonFormatter implements Closeable { - private ReusableByteBuffer buffer; - private CompositeJsonFormatter.JsonFormatter formatter; - private boolean recyclable = true; - - ReusableJsonFormatter(ReusableByteBuffer buffer, CompositeJsonFormatter.JsonFormatter formatter) { - this.buffer = Objects.requireNonNull(buffer); - this.formatter = Objects.requireNonNull(formatter); - } - - /** - * Return the underlying buffer into which the JsonFormatter is writing. - * - * @return the underlying byte buffer - */ - public ReusableByteBuffer getBuffer() { - assertNotDisposed(); - return buffer; - } - - /** - * Write the Event in JSON format into the enclosed buffer using the enclosed JsonFormatter. - * - * @param event the event to write - * @throws IOException thrown when the JsonFormatter has problem to convert the Event into JSON format - */ - public void write(Event event) throws IOException { - assertNotDisposed(); - - try { - this.formatter.writeEvent(event); - - } catch (IOException e) { - // Do not recycle the instance after an exception is thrown: the underlying - // JsonGenerator may not be in a safe state. - this.recyclable = false; - throw e; - } - } - - /** - * Close the JsonFormatter, release associated resources and return it to the pool. - */ - @Override - public void close() throws IOException { - release(this); - } - - /** - * Dispose associated resources - */ - protected void dispose() { - try { - this.formatter.close(); - } catch (IOException e) { - // ignore and proceed - } - - this.formatter = null; - this.buffer = null; - } - - protected boolean isDisposed() { - return buffer == null; - } - - protected void assertNotDisposed() { - if (isDisposed()) { - throw new IllegalStateException("Instance has been disposed and cannot be used anymore. Did you keep a reference to it after it is closed?"); - } - } - } - - - /** - * Get a {@link ReusableJsonFormatter} out of the pool, creating a new one if needed. - * The instance must be closed after use to return it to the pool. - * - * @return a {@link ReusableJsonFormatter} - * @throws IOException thrown when unable to create a new instance - */ - public ReusableJsonFormatter acquire() throws IOException { - ReusableJsonFormatter reusableFormatter = null; - - Deque cachedFormatters = formatters.get(); - if (cachedFormatters != null) { - reusableFormatter = cachedFormatters.poll(); - } - - if (reusableFormatter == null) { - reusableFormatter = createJsonFormatter(); - } - - return reusableFormatter; - } - - - /** - * Return an instance to the pool. - * An alternative is to call {@link ReusableJsonFormatter#close()}. - * - * @param reusableFormatter the instance to return to the pool - */ - public void release(ReusableJsonFormatter reusableFormatter) { - if (reusableFormatter == null) { - return; - } - - /* - * Dispose the formatter instead of returning to the pool when marked not recyclable - */ - if (!reusableFormatter.recyclable) { - reusableFormatter.dispose(); - return; - } - - Deque cachedFormatters = this.formatters.get(); - if (cachedFormatters == null) { - cachedFormatters = new ConcurrentLinkedDeque<>(); - this.formatters = new SoftReference<>(cachedFormatters); - } - - /* - * Reset the internal buffer and return the cached JsonFormatter to the pool. - */ - reusableFormatter.getBuffer().reset(); - cachedFormatters.addFirst(reusableFormatter); // try to reuse the same as much as we can -> add it first - } - - - /** - * Create a new {@link ReusableJsonFormatter} instance by allocating a new {@link ReusableByteBuffer} - * and a {@link CompositeJsonFormatter.JsonFormatter} bound to it. - * - * @return a new {@link ReusableJsonFormatter} - * @throws IOException thrown when the {@link CompositeJsonFormatter} is unable to create a new instance - * of {@link CompositeJsonFormatter.JsonFormatter}. - */ - protected ReusableJsonFormatter createJsonFormatter() throws IOException { - ReusableByteBuffer buffer = new ReusableByteBuffer(this.minBufferSize); - CompositeJsonFormatter.JsonFormatter jsonFormatter = this.formatterFactory.createJsonFormatter(buffer); - return new ReusableJsonFormatter(buffer, jsonFormatter); - } -} diff --git a/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java b/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java index f8e10c04..16210666 100644 --- a/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java +++ b/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java @@ -16,16 +16,27 @@ package net.logstash.logback.composite.loggingevent; import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import net.logstash.logback.argument.StructuredArguments; -import net.logstash.logback.composite.CompositeJsonFormatter.JsonFormatter; -import net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder; +import net.logstash.logback.composite.AbstractJsonProvider; +import net.logstash.logback.decorate.JsonGeneratorDecorator; +import net.logstash.logback.decorate.NullJsonGeneratorDecorator; import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.spi.ContextAware; +import com.fasterxml.jackson.core.JsonGenerator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -34,7 +45,7 @@ @ExtendWith(MockitoExtension.class) public class LoggingEventCompositeJsonFormatterTest { - private LoggingEventCompositeJsonFormatter formatter = new LoggingEventCompositeJsonFormatter(new LoggingEventCompositeJsonEncoder()); + private LoggingEventCompositeJsonFormatter formatter = new LoggingEventCompositeJsonFormatter(mock(ContextAware.class)); @Mock private ILoggingEvent event; @@ -53,9 +64,73 @@ public void testDoesNotFailOnEmptyBeans() throws IOException { * This should not throw an exception, since SerializationFeature.FAIL_ON_EMPTY_BEANS is disabled */ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { - JsonFormatter jsonFormatter = formatter.createJsonFormatter(bos); - assertThatCode(() -> jsonFormatter.writeEvent(event)).doesNotThrowAnyException(); + assertThatCode(() -> formatter.writeEvent(event, bos)).doesNotThrowAnyException(); + } + } + + + /* + * JsonFormatter reused by subsequent invocations + */ + @Test + public void testReused() throws IOException { + JsonGeneratorDecorator decorator = spy(new NullJsonGeneratorDecorator()); + formatter.setJsonGeneratorDecorator(decorator); + formatter.start(); + + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + formatter.writeEvent(event, bos); + formatter.writeEvent(event, bos); + + // Only once instance should have been created + verify(decorator, times(1)).decorate(any(JsonGenerator.class)); + } + } + + + /* + * JsonFormatter not reused after exception thrown by underlying OutputStream + */ + @Test + public void testNotReusedAfterIOException() throws IOException { + JsonGeneratorDecorator decorator = spy(new NullJsonGeneratorDecorator()); + formatter.setJsonGeneratorDecorator(decorator); + formatter.start(); + + OutputStream bos = mock(OutputStream.class); + doThrow(new IOException()) + .doCallRealMethod() + .when(bos).write(any(byte[].class), any(int.class), any(int.class)); + + assertThatThrownBy(() -> formatter.writeEvent(event, bos)).isInstanceOf(IOException.class); + formatter.writeEvent(event, bos); + + // Two instances created because the first was discarded because of the exception + verify(decorator, times(2)).decorate(any(JsonGenerator.class)); + } + + + /* + * JsonFormatter not reused after exception thrown while calling registered JsonProviders + */ + @Test + public void testNotReusedAfterEncoderException() throws IOException { + JsonGeneratorDecorator decorator = spy(new NullJsonGeneratorDecorator()); + formatter.setJsonGeneratorDecorator(decorator); + formatter.getProviders().addProvider(new AbstractJsonProvider() { + @Override + public void writeTo(JsonGenerator generator, ILoggingEvent event) throws IOException { + throw new IOException("Exception thrown by JsonProvider"); + } + }); + formatter.start(); + + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + assertThatThrownBy(() -> formatter.writeEvent(event, bos)).isInstanceOf(IOException.class); + assertThatThrownBy(() -> formatter.writeEvent(event, bos)).isInstanceOf(IOException.class); + + // Two instances created because the first was discarded because of the exception + verify(decorator, times(2)).decorate(any(JsonGenerator.class)); } } - } diff --git a/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java b/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java index 5412036b..a667ab97 100644 --- a/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java +++ b/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java @@ -22,11 +22,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -36,19 +34,19 @@ import net.logstash.logback.TestJsonProvider; import net.logstash.logback.composite.CompositeJsonFormatter; +import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.Context; +import ch.qos.logback.core.BasicStatusManager; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.EncoderBase; import ch.qos.logback.core.encoder.LayoutWrappingEncoder; +import ch.qos.logback.core.status.OnConsoleStatusListener; import ch.qos.logback.core.status.StatusManager; -import ch.qos.logback.core.status.WarnStatus; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonGenerator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -56,27 +54,32 @@ @ExtendWith(MockitoExtension.class) public class CompositeJsonEncoderTest { - @InjectMocks private final TestCompositeJsonEncoder encoder = new TestCompositeJsonEncoder(); private CompositeJsonFormatter formatter; - @Mock(lenient = true) - private Context context; + private LoggerContext context = new LoggerContext(); - @Mock - private StatusManager statusManager; + private StatusManager statusManager = new BasicStatusManager(); @Mock private ILoggingEvent event; @BeforeEach public void setup() { + // Output statuses on the console for easy debugging. Must be initialized early to capture + // warnings emitted by setter/getter methods before the appender is started. + OnConsoleStatusListener consoleListener = new OnConsoleStatusListener(); + consoleListener.start(); + this.statusManager.add(consoleListener); + + this.context.setStatusManager(statusManager); + // suppress line separator to make test platform independent this.encoder.setLineSeparator(""); - this.formatter = encoder.getFormatter(); + this.encoder.setContext(context); - when(context.getStatusManager()).thenReturn(statusManager); + this.formatter = encoder.getFormatter(); } @@ -95,7 +98,7 @@ public void startStop() { assertThat(encoder.isStarted()).isTrue(); assertThat(formatter.isStarted()).isTrue(); assertThat(prefix.isStarted()).isTrue(); - verify(formatter).setContext(context); + assertThat(formatter.getContext()).isEqualTo(context); // providers are not started a second time encoder.start(); @@ -222,13 +225,13 @@ public void testLineEndings() { */ @Test public void testIOException() throws IOException { - encoder.exceptionToThrow = new IOException(); + encoder.exceptionToThrow = new IOException(); encoder.start(); encoder.encode(event); - verify(statusManager).add(new WarnStatus("Error encountered while encoding log event. " - + "Event: " + event, context, encoder.exceptionToThrow)); + assertThat(statusManager.getCopyOfStatusList()) + .anyMatch(s -> s.getMessage().startsWith("Error encountered while encoding log event.")); } @@ -246,8 +249,8 @@ public void testIOException_streaming() throws IOException { assertThatCode(() -> encoder.encode(event, stream)).isInstanceOf(IOException.class); - verify(statusManager, never()).add(new WarnStatus("Error encountered while encoding log event. " - + "Event: " + event, context, exception)); + assertThat(statusManager.getCopyOfStatusList()) + .noneMatch(s -> s.getMessage().startsWith("Error encountered while encoding log event.")); } @@ -255,7 +258,7 @@ public void testIOException_streaming() throws IOException { private static class TestCompositeJsonEncoder extends CompositeJsonEncoder { - private IOException exceptionToThrow; + private IOException exceptionToThrow; @Override protected CompositeJsonFormatter createFormatter() { diff --git a/src/test/java/net/logstash/logback/util/ObjectPoolTest.java b/src/test/java/net/logstash/logback/util/ObjectPoolTest.java new file mode 100644 index 00000000..916dd184 --- /dev/null +++ b/src/test/java/net/logstash/logback/util/ObjectPoolTest.java @@ -0,0 +1,151 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import net.logstash.logback.util.ObjectPool.Lifecycle; + +import org.junit.jupiter.api.Test; + +/** + * @author brenuart + * + */ +public class ObjectPoolTest { + + private ObjectPool pool = new ObjectPool<>(this::createInstance); + + + @Test + public void testReuse() { + PooledObject obj1 = pool.acquire(); + assertThat(obj1).isNotNull(); + + // Acquire a second instance and make sure not same as first + PooledObject obj2 = pool.acquire(); + assertThat(obj2).isNotNull(); + assertThat(obj1).isNotSameAs(obj2); + + // Release second and re-acquire - should be the same + pool.release(obj2); + PooledObject obj3 = pool.acquire(); + assertThat(obj3).isSameAs(obj2); + } + + + /* + * Assert Lifecyle#recycle() is invoked when instance is returned to the pool + */ + @Test + public void testRecycle() { + PooledObject obj1 = pool.acquire(); + pool.release(obj1); + + verify(obj1, times(1)).recycle(); + } + + + /* + * Assert instance is disposed and not returned to the pool when Lifecycle#recycle() + * returns false + */ + @Test + public void testNotRecyclable() { + PooledObject obj1 = pool.acquire(); + when(obj1.recycle()).thenReturn(false); + + pool.release(obj1); + + verify(obj1, times(1)).recycle(); + verify(obj1, times(1)).dispose(); + + assertThat(pool.acquire()).isNotSameAs(obj1); + } + + + /* + * Assert pooled instance are disposed when calling #clear() + */ + @Test + public void testClear() { + PooledObject obj1 = pool.acquire(); + pool.release(obj1); + assertThat(pool.size()).isEqualTo(1); + + pool.clear(); + verify(obj1, times(1)).dispose(); + + assertThat(pool.size()).isZero(); + } + + + /* + * Releasing a "null" instance should not throw any exception + */ + @Test + public void testReleaseNull() { + assertThatCode(() -> pool.release(null)).doesNotThrowAnyException(); + } + + + /* + * NullPointer exception thrown if factory returns null + */ + @Test + public void testFactoryReturnsNull() { + pool = new ObjectPool(() -> null); + assertThatThrownBy(() -> pool.acquire()).isInstanceOf(NullPointerException.class); + } + + + /* + * Exception thrown by the factory is propagated to ObjectPool#acquire() + */ + @Test + public void testFactoryThrowsException() { + RuntimeException e = new RuntimeException(); + + pool = new ObjectPool(() -> { + throw e; + }); + + assertThatThrownBy(() -> pool.acquire()).isSameAs(e); + } + + + private PooledObject createInstance() { + return spy(new PooledObject()); + } + + public static class PooledObject implements ObjectPool.Lifecycle { + @Override + public boolean recycle() { + return Lifecycle.super.recycle(); + } + + @Override + public void dispose() { + Lifecycle.super.dispose(); + } + } +}