From 62416950f5c4ba20be4f1689c0474c389a146c9f Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Tue, 14 Sep 2021 17:47:00 +0200 Subject: [PATCH 1/4] Refactor JsonGenerator pooling Motivations: (1) For the pooling to be efficient, the feature `USE_THREAD_LOCAL_FOR_BUFFER_RECYCLING` must be disabled when creating JsonGenerator. If not, JsonGenerator creates additional buffers and re-use them per-thread. This pattern is not applicable in our case: there is no relationship between the JsonGenerator to use and the current thread. Pooling Jsongenerator instances and the creation/configuration of these instances (disabling the feature) are therefore related and should ideally be implemented close together, in the same class. (2) Pooling is required only because CompositeJsonFormatter uses Jackson under the cover (JsonGenerator must be given an OutputStream when created). The pooling logic should therefore be isolated and hidden inside the CompositeJsonFormatter itself and considered an implementation detail. This would also lead to a cleaner interface with a single `write(Event event, OutputStream out)` method. This method can be used to write an event to whatever output stream without having to care about pooling at all... (3) The current implementation creates a ReusableByteBuffer and connects the JsonGenerator to it at creation time. They are both pooled at the same time. Content is first generated in the byte buffer before it can be copied in the output stream passed as argument to the write method. This intermediate buffer somehow limits the streaming capability of the implementation. This commit now connects the JsonGenerator to a "DisconnectedOutputStream" when it is created. When the `write(event, out)` method is called, the output stream of the JsonGenerator is connected to the one passed as argument before the generator is invoked. Content produced by the generator is therefore written directly in the target output stream without requiring an intermediate buffer. It is now up to the caller to decide if it needs an intermediate buffer or not... Pooling and buffering are two separate concerns that are now handled separately. (4) The pooling logic is now handled by the `ObjectPool` class and is reused by both the ReusableByteBufferPool and the CompositeJsonFormatter. --- .../AbstractLogstashTcpSocketAppender.java | 33 ++- .../composite/CompositeJsonFormatter.java | 107 ++++++-- .../logback/encoder/CompositeJsonEncoder.java | 43 ++-- .../logback/layout/CompositeJsonLayout.java | 30 ++- .../net/logstash/logback/util/ObjectPool.java | 202 +++++++++++++++ .../logback/util/ProxyOutputStream.java | 130 ++++++++++ .../logback/util/ReusableByteBufferPool.java | 63 +++++ .../util/ReusableJsonFormatterPool.java | 237 ------------------ ...oggingEventCompositeJsonFormatterTest.java | 87 ++++++- .../encoder/CompositeJsonEncoderTest.java | 41 +-- .../logstash/logback/util/ObjectPoolTest.java | 151 +++++++++++ 11 files changed, 814 insertions(+), 310 deletions(-) create mode 100644 src/main/java/net/logstash/logback/util/ObjectPool.java create mode 100644 src/main/java/net/logstash/logback/util/ProxyOutputStream.java create mode 100644 src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java delete mode 100644 src/main/java/net/logstash/logback/util/ReusableJsonFormatterPool.java create mode 100644 src/test/java/net/logstash/logback/util/ObjectPoolTest.java diff --git a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java index 04682266..857f4db5 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java @@ -52,8 +52,10 @@ import net.logstash.logback.appender.destination.DestinationParser; import net.logstash.logback.appender.destination.PreferPrimaryDestinationConnectionStrategy; import net.logstash.logback.appender.listener.TcpAppenderListener; +import net.logstash.logback.encoder.CompositeJsonEncoder; import net.logstash.logback.encoder.SeparatorParser; import net.logstash.logback.encoder.StreamingEncoder; +import net.logstash.logback.util.ReusableByteBuffer; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.joran.spi.DefaultClass; @@ -350,6 +352,12 @@ private class TcpSendingEventHandler implements EventHandler>, L */ private Future readerFuture; + /** + * Intermediate ByteBuffer used to store content generated by {@link StreamingEncoder}. + * Stays uninitialized if encoder is a "raw" {@link Encoder}. + */ + private final ReusableByteBuffer buffer; + /** * When run, if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration} * has elapsed since the last event was sent, @@ -363,7 +371,7 @@ private class TcpSendingEventHandler implements EventHandler>, L * if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration} * has elapsed since the last event was sent, * then the event handler will send the {@link AbstractLogstashTcpSocketAppender#keepAliveMessage} - * to the socket outputstream. + * to the socket OutputStream. * */ private class KeepAliveRunnable implements Runnable { @@ -504,6 +512,18 @@ public void run() { } } + + TcpSendingEventHandler() { + if (encoder instanceof CompositeJsonEncoder) { + this.buffer = new ReusableByteBuffer(((CompositeJsonEncoder) encoder).getMinBufferSize()); + } else if (encoder instanceof StreamingEncoder) { + this.buffer = new ReusableByteBuffer(); + } else { + this.buffer = null; + } + } + + @Override public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception { @@ -617,7 +637,16 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent) encoder).encode(event, outputStream); + /* + * Generate content in a temporary buffer to avoid writing "partial" content in the output + * stream if the Encoder throws an exception. + */ + try { + ((StreamingEncoder) encoder).encode(event, buffer); + buffer.writeTo(outputStream); + } finally { + buffer.reset(); + } } else { byte[] data = encoder.encode(event); if (data != null) { diff --git a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java index 3dc8f8ad..a80fdf77 100644 --- a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java +++ b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java @@ -25,6 +25,8 @@ import net.logstash.logback.decorate.JsonGeneratorDecorator; import net.logstash.logback.decorate.NullJsonFactoryDecorator; import net.logstash.logback.decorate.NullJsonGeneratorDecorator; +import net.logstash.logback.util.ObjectPool; +import net.logstash.logback.util.ProxyOutputStream; import ch.qos.logback.access.spi.IAccessEvent; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -32,6 +34,7 @@ import ch.qos.logback.core.spi.ContextAwareBase; import ch.qos.logback.core.spi.DeferredProcessingAware; import ch.qos.logback.core.spi.LifeCycle; +import ch.qos.logback.core.util.CloseUtil; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonFactory.Feature; @@ -41,13 +44,23 @@ /** * Formats logstash Events as JSON using {@link JsonProvider}s. - *

* - * The {@link CompositeJsonFormatter} starts the JSON object ('{'), + *

The {@link CompositeJsonFormatter} starts the JSON object ('{'), * then delegates writing the contents of the object to the {@link JsonProvider}s, * and then ends the JSON object ('}'). * + *

Jackson {@link JsonGenerator} are initially created with a "disconnected" output stream so they can be + * reused multiple times with different target output stream. They are kept in an internal pool whose + * size is technically unbounded. It will however never hold more entries than the number of concurrent + * threads accessing it. Entries are kept in the pool using soft references so they can be garbage + * collected by the JVM when running low in memory. + * + *

{@link JsonGenerator} instances are *not* reused after they threw an exception. This is to prevent + * reusing an instance whose internal state may be unpredictable. + * * @param type of event ({@link ILoggingEvent} or {@link IAccessEvent}). + * + * @author brenuart */ public abstract class CompositeJsonFormatter extends ContextAwareBase implements LifeCycle { @@ -80,6 +93,9 @@ public abstract class CompositeJsonFormatter pool; + + public CompositeJsonFormatter(ContextAware declaredOrigin) { super(declaredOrigin); } @@ -102,12 +118,15 @@ public void start() { jsonProviders.setContext(context); jsonProviders.setJsonFactory(jsonFactory); jsonProviders.start(); + + pool = new ObjectPool<>(this::createJsonFormatter); started = true; } @Override public void stop() { if (isStarted()) { + pool.clear(); jsonProviders.stop(); jsonFactory = null; started = false; @@ -119,40 +138,97 @@ public boolean isStarted() { return started; } + /** - * Create a reusable {@link JsonFormatter} bound to the given {@link OutputStream}. + * Write an event in the given output stream. * - * @param outputStream the output stream used by the {@link JsonFormatter} - * @return {@link JsonFormatter} writing JSON content in the output stream - * @throws IOException thrown when unable to write in the output stream or when Jackson fails to produce JSON content + * @param event the event to write + * @param outputStream the output stream to write the event into + * @throws IOException thrown upon failure to write the event */ - public JsonFormatter createJsonFormatter(OutputStream outputStream) throws IOException { + public void writeEvent(Event event, OutputStream outputStream) throws IOException { + Objects.requireNonNull(outputStream); if (!isStarted()) { throw new IllegalStateException("Formatter is not started"); } - JsonGenerator generator = createGenerator(outputStream); - return new JsonFormatter(generator); + try (JsonFormatter formatter = this.pool.acquire()) { + formatter.writeEvent(outputStream, event); + } } - public class JsonFormatter implements Closeable { + /** + * Create a reusable {@link JsonFormatter} bound to the given {@link OutputStream}. + * + * @param stream the output stream used by the {@link JsonFormatter} + * @return {@link JsonFormatter} writing JSON content in the output stream + * @throws IOException thrown when unable to write in the output stream or when Jackson fails to produce JSON content + */ + private JsonFormatter createJsonFormatter() { + try { + DisconnectedOutputStream outputStream = new DisconnectedOutputStream(); + JsonGenerator generator = createGenerator(outputStream); + return new JsonFormatter(outputStream, generator); + } catch (IOException e) { + throw new IllegalStateException("Unable to initialize Jackson JSON layer", e); + } + + } + + private class JsonFormatter implements ObjectPool.Lifecycle, Closeable { private final JsonGenerator generator; + private final DisconnectedOutputStream stream; + private boolean recyclable = true; - public JsonFormatter(JsonGenerator generator) { + JsonFormatter(DisconnectedOutputStream outputStream, JsonGenerator generator) { + this.stream = Objects.requireNonNull(outputStream); this.generator = Objects.requireNonNull(generator); } - public void writeEvent(Event event) throws IOException { - writeEventToGenerator(generator, event); + public void writeEvent(OutputStream outputStream, Event event) throws IOException { + try { + this.stream.connect(outputStream); + writeEventToGenerator(generator, event); + + } catch (IOException | RuntimeException e) { + this.recyclable = false; + throw e; + + } finally { + this.stream.disconnect(); + } + } + + @Override + public boolean recycle() { + return this.recyclable; + } + + @Override + public void dispose() { + CloseUtil.closeQuietly(this.generator); } @Override public void close() throws IOException { - this.generator.close(); + CompositeJsonFormatter.this.pool.release(this); } } + private static class DisconnectedOutputStream extends ProxyOutputStream { + DisconnectedOutputStream() { + super(null); + } + + public void connect(OutputStream out) { + this.delegate = out; + } + + public void disconnect() { + this.delegate = null; + } + } private JsonFactory createJsonFactory() { ObjectMapper objectMapper = new ObjectMapper() @@ -187,9 +263,6 @@ private JsonFactory decorateFactory(JsonFactory factory) { } protected void writeEventToGenerator(JsonGenerator generator, Event event) throws IOException { - if (!isStarted()) { - throw new IllegalStateException("Encoding attempted before starting."); - } generator.writeStartObject(); jsonProviders.writeTo(generator, event); generator.writeEndObject(); diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index ccec3dfb..b7860743 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -24,7 +24,8 @@ import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; -import net.logstash.logback.util.ReusableJsonFormatterPool; +import net.logstash.logback.util.ReusableByteBuffer; +import net.logstash.logback.util.ReusableByteBufferPool; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.EncoderBase; @@ -37,7 +38,7 @@ public abstract class CompositeJsonEncoderThe buffer automatically grows above the {@code #minBufferSize} when needed to * accommodate with larger events. However, only the first {@code minBufferSize} bytes @@ -47,12 +48,16 @@ public abstract class CompositeJsonEncoder prefix; private Encoder suffix; private final CompositeJsonFormatter formatter; - private ReusableJsonFormatterPool formatterPool; - + private String lineSeparator = System.lineSeparator(); private byte[] lineSeparatorBytes; @@ -72,10 +77,7 @@ public void encode(Event event, OutputStream outputStream) throws IOException { throw new IllegalStateException("Encoder is not started"); } - try (ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) { - encode(cachedFormatter, event); - cachedFormatter.getBuffer().writeTo(outputStream); - } + encode(outputStream, event); } @Override @@ -84,21 +86,26 @@ public byte[] encode(Event event) { throw new IllegalStateException("Encoder is not started"); } - try (ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) { - encode(cachedFormatter, event); - return cachedFormatter.getBuffer().toByteArray(); + ReusableByteBuffer buffer = bufferPool.acquire(); + + try { + encode(buffer, event); + return buffer.toByteArray(); } catch (IOException e) { addWarn("Error encountered while encoding log event. Event: " + event, e); return EMPTY_BYTES; + + } finally { + bufferPool.release(buffer); } } - private void encode(ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter, Event event) throws IOException { - encode(prefix, event, cachedFormatter.getBuffer()); - cachedFormatter.write(event); - encode(suffix, event, cachedFormatter.getBuffer()); - cachedFormatter.getBuffer().write(lineSeparatorBytes); + private void encode(OutputStream outputStream, Event event) throws IOException { + encode(prefix, event, outputStream); + formatter.writeEvent(event, outputStream); + encode(suffix, event, outputStream); + outputStream.write(lineSeparatorBytes); } private void encode(Encoder encoder, Event event, OutputStream outputStream) throws IOException { @@ -127,7 +134,7 @@ public void start() { startWrapped(prefix); startWrapped(suffix); - this.formatterPool = new ReusableJsonFormatterPool<>(formatter, minBufferSize); + this.bufferPool = ReusableByteBufferPool.create(minBufferSize); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -171,7 +178,7 @@ public void stop() { stopWrapped(prefix); stopWrapped(suffix); - this.formatterPool = null; + bufferPool = null; } } diff --git a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java index 7356e41c..59198339 100644 --- a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java +++ b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java @@ -16,6 +16,7 @@ package net.logstash.logback.layout; import java.io.IOException; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.util.Objects; @@ -26,7 +27,8 @@ import net.logstash.logback.decorate.JsonGeneratorDecorator; import net.logstash.logback.encoder.CompositeJsonEncoder; import net.logstash.logback.encoder.SeparatorParser; -import net.logstash.logback.util.ReusableJsonFormatterPool; +import net.logstash.logback.util.ReusableByteBuffer; +import net.logstash.logback.util.ReusableByteBufferPool; import ch.qos.logback.core.Layout; import ch.qos.logback.core.LayoutBase; @@ -60,10 +62,13 @@ public abstract class CompositeJsonLayout * unnecessary memory allocations and reduce pressure on the garbage collector. */ private int minBufferSize = 1024; - + + /** + * Pool of reusable byte buffers + */ + private ReusableByteBufferPool bufferPool; private final CompositeJsonFormatter formatter; - private ReusableJsonFormatterPool formatterPool; public CompositeJsonLayout() { super(); @@ -78,21 +83,24 @@ public String doLayout(Event event) { throw new IllegalStateException("Layout is not started"); } - try (ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter = formatterPool.acquire()) { - writeEvent(cachedFormatter, event); - return new String(cachedFormatter.getBuffer().toByteArray()); + ReusableByteBuffer buffer = bufferPool.acquire(); + try { + writeEvent(buffer, event); + return new String(buffer.toByteArray()); } catch (IOException e) { addWarn("Error formatting logging event", e); return null; + } finally { + bufferPool.release(buffer); } } - private void writeEvent(ReusableJsonFormatterPool.ReusableJsonFormatter cachedFormatter, Event event) throws IOException { - try (Writer writer = new OutputStreamWriter(cachedFormatter.getBuffer())) { + private void writeEvent(OutputStream outputStream, Event event) throws IOException { + try (Writer writer = new OutputStreamWriter(outputStream)) { writeLayout(prefix, writer, event); - cachedFormatter.write(event); + formatter.writeEvent(event, outputStream); writeLayout(suffix, writer, event); if (lineSeparator != null) { @@ -128,7 +136,7 @@ public void start() { startWrapped(prefix); startWrapped(suffix); - this.formatterPool = new ReusableJsonFormatterPool<>(formatter, minBufferSize); + this.bufferPool = ReusableByteBufferPool.create(minBufferSize); } private void startWrapped(Layout wrapped) { @@ -162,7 +170,7 @@ public void stop() { stopWrapped(prefix); stopWrapped(suffix); - this.formatterPool = null; + this.bufferPool = null; } private void stopWrapped(Layout wrapped) { diff --git a/src/main/java/net/logstash/logback/util/ObjectPool.java b/src/main/java/net/logstash/logback/util/ObjectPool.java new file mode 100644 index 00000000..6dd620f8 --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ObjectPool.java @@ -0,0 +1,202 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import java.lang.ref.SoftReference; +import java.util.Deque; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Supplier; + + +/** + * Pool of reusable object instances. + * + *

Instances are obtained from the pool by calling {@link #acquire()} and must be returned after use + * by calling {@link #release(Object)}. If not, the instance is simply reclaimed by the garbage collector. + * + *

Instance may also implement the optional {@link ObjectPool.Lifecycle} interface if they wish to be + * notified when they are recycled or disposed. + * + *

The pool is technically unbounded but will never hold more entries than the number of concurrent + * threads accessing it. Entries are kept in the pool using soft references so they can be garbage + * collected by the JVM when running low in memory. + * + *

The pool can be cleared at any time by calling {@link #clear()} in which case instances currently + * in the pool will be disposed. + * + * @param type of pooled instances. + * + * @author brenuart + */ +public class ObjectPool { + + /** + * The factory used to create new instances + */ + private final Supplier factory; + + /** + * Instances pool + */ + private volatile SoftReference> poolRef = new SoftReference<>(null); + + + /** + * Create a new instance of the pool. + * + * @param factory the factory used to create new instances. + */ + public ObjectPool(Supplier factory) { + this.factory = Objects.requireNonNull(factory); + } + + /** + * Get an instance out of the pool, creating a new one if needed. + * The instance must be returned to the pool by calling {@link #release(Object)}. If not the + * instance is disposed by the garbage collector. + * + * @return a pooled instance or a new one if none is available + */ + public final T acquire() { + T instance = null; + + Deque pool = this.poolRef.get(); + if (pool != null) { + instance = pool.poll(); + } + + if (instance == null) { + instance = Objects.requireNonNull(createNewInstance()); + } + + return instance; + } + + + /** + * Return an instance to the pool or dispose it if it cannot be recycled. + * + * @param instance the instance to return to the pool + */ + public final void release(T instance) { + if (instance == null) { + return; + } + if (!recycleInstance(instance)) { + disposeInstance(instance); + return; + } + + Deque pool = this.poolRef.get(); + if (pool == null) { + pool = new ConcurrentLinkedDeque<>(); + this.poolRef = new SoftReference<>(pool); + } + + pool.addFirst(instance); // try to reuse the same as much as we can -> add it first + } + + + /** + * Clear the object pool and dispose instances it may contain + */ + public void clear() { + Deque pool = this.poolRef.get(); + if (pool != null) { + while (!pool.isEmpty()) { + disposeInstance(pool.poll()); + } + } + } + + + /** + * Get the number of instances currently in the pool + * + * @return the number of instances in the pool + */ + public int size() { + Deque pool = this.poolRef.get(); + if (pool != null) { + return pool.size(); + } else { + return 0; + } + } + + + /** + * Create a new object instance. + * + * @return a new object instance + */ + protected T createNewInstance() { + return this.factory.get(); + } + + + /** + * Dispose the object instance by calling its life cycle methods. + * + * @param instance the instance to dispose + */ + protected void disposeInstance(T instance) { + if (instance instanceof Lifecycle) { + ((Lifecycle) instance).dispose(); + } + } + + + /** + * Recycle the instance before returning it to the pool. + * + * @param instance the instance to recycle + * @return {@code true} if the instance can be recycled and returned to the pool, {@code false} if not. + */ + protected boolean recycleInstance(T instance) { + if (instance instanceof Lifecycle) { + return ((Lifecycle) instance).recycle(); + } else { + return true; + } + } + + + /** + * Optional interface that pooled instances may implement if they wish to be notified of + * life cycle events. + */ + public interface Lifecycle { + /** + * Indicate whether the instance can be recycled and returned to the pool and perform + * the necessary recycling tasks. + * + * @return {@code true} if the instance can be returned to the pool, {@code false} if + * it must be disposed instead. + */ + default boolean recycle() { + return true; + } + + /** + * Dispose the instance and free allocated resources. + */ + default void dispose() { + // noop + } + } +} diff --git a/src/main/java/net/logstash/logback/util/ProxyOutputStream.java b/src/main/java/net/logstash/logback/util/ProxyOutputStream.java new file mode 100644 index 00000000..1ebff086 --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ProxyOutputStream.java @@ -0,0 +1,130 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + + +/** + * A Proxy stream which acts as expected, that is it passes the method calls on + * to the proxied stream and doesn't change which methods are being called (unlike + * JDK {@link FilterOutputStream}). + * + * @author brenuart + */ +public class ProxyOutputStream extends OutputStream { + + protected OutputStream delegate; + + /** + * Constructs a new ProxyOutputStream. + * + * @param delegate the OutputStream to delegate to + */ + public ProxyOutputStream(final OutputStream delegate) { + this.delegate = delegate; + } + + /** + * Invokes the delegate's write(int) method. + * + * @param b the byte to write + * @throws IOException if an I/O error occurs + */ + @Override + public void write(final int b) throws IOException { + try { + delegate.write(b); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Invokes the delegate's write(byte[]) method. + * + * @param b the bytes to write + * @throws IOException if an I/O error occurs + */ + @Override + public void write(final byte[] b) throws IOException { + try { + delegate.write(b); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Invokes the delegate's write(byte[]) method. + * + * @param b the bytes to write + * @param off The start offset + * @param len The number of bytes to write + * @throws IOException if an I/O error occurs + */ + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + try { + delegate.write(b, off, len); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Invokes the delegate's flush() method. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void flush() throws IOException { + try { + delegate.flush(); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Invokes the delegate's close() method. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + try { + delegate.close(); + } catch (final IOException e) { + handleIOException(e); + } + } + + /** + * Handle any IOExceptions thrown. + *

+ * This method provides a point to implement custom exception handling. The + * default behavior is to re-throw the exception. + * + * @param e The IOException thrown + * @throws IOException if an I/O error occurs + */ + protected void handleIOException(final IOException e) throws IOException { + throw e; + } +} diff --git a/src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java b/src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java new file mode 100644 index 00000000..94366fbe --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java @@ -0,0 +1,63 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + + +/** + * A pool of {@link ReusableByteBuffer}. + * + *

The pool is technically unbounded but will never hold more buffers than the number of concurrent + * threads accessing it. Buffers are kept in the pool using soft references so they can be garbage + * collected by the JVM when running low in memory. + * + * @author brenuart + */ +public class ReusableByteBufferPool extends ObjectPool { + + /** + * Create a new buffer pool holding buffers with an initial capacity of {@code initialSize} bytes. + * + * @param initialCapacity the initial capacity of buffers created by this pool. + */ + private ReusableByteBufferPool(int initialCapacity) { + super(() -> new ReusableByteBuffer(initialCapacity)); + + } + + /** + * Return a buffer to the pool after usage. + * + * @param buffer the buffer to return to the pool. + */ + protected void releaseInstance(ReusableByteBuffer buffer) { + buffer.reset(); + super.release(buffer); + } + + + /** + * Create a new buffer pool holding buffers with an initial capacity of {@code initialSize} bytes. + * + * @param initialCapacity the initial capacity of buffers created by this pool. + * @return a new {@link ReusableByteBufferPool} + */ + public static ReusableByteBufferPool create(int initialCapacity) { + if (initialCapacity <= 0) { + throw new IllegalArgumentException("initialCapacity must be greater than 0"); + } + return new ReusableByteBufferPool(initialCapacity); + } +} diff --git a/src/main/java/net/logstash/logback/util/ReusableJsonFormatterPool.java b/src/main/java/net/logstash/logback/util/ReusableJsonFormatterPool.java deleted file mode 100644 index cb970c01..00000000 --- a/src/main/java/net/logstash/logback/util/ReusableJsonFormatterPool.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.util; - -import java.io.Closeable; -import java.io.IOException; -import java.lang.ref.SoftReference; -import java.util.Deque; -import java.util.Objects; -import java.util.concurrent.ConcurrentLinkedDeque; - -import net.logstash.logback.composite.CompositeJsonFormatter; -import net.logstash.logback.composite.CompositeJsonFormatter.JsonFormatter; - -import ch.qos.logback.core.spi.DeferredProcessingAware; - -/** - * Pool of {@link ReusableJsonFormatter} that can be safely reused multiple times. - * A {@link ReusableJsonFormatter} is made of an internal {@link ReusableByteBuffer} and a - * {@link CompositeJsonFormatter.JsonFormatter} bound to it. - * - *

Instances must be returned to the pool after use by calling {@link ReusableJsonFormatter#close()} - * or {@link #release(net.logstash.logback.util.ReusableJsonFormatterPool.ReusableJsonFormatter) - * release(ReusableJsonFormatter)}. - * - * Instances are not recycled (and therefore not returned to the pool) after their internal - * {@link CompositeJsonFormatter.JsonFormatter} threw an exception. This is to prevent reusing an - * instance whose internal components are potentially in an unpredictable state. - * - *

The internal byte buffer is created with an initial size of {@link #minBufferSize}. - * The buffer automatically grows above the {@code #minBufferSize} when needed to - * accommodate with larger events. However, only the first {@code minBufferSize} bytes - * will be reused by subsequent invocations. It is therefore strongly advised to set - * the minimum size at least equal to the average size of the encoded events to reduce - * unnecessary memory allocations and reduce pressure on the garbage collector. - * - *

The pool is technically unbounded but will never hold more entries than the number of concurrent - * threads accessing it. Entries are kept in the pool using soft references so they can be garbage - * collected by the JVM when running low in memory. - * - * @author brenuart - */ -public class ReusableJsonFormatterPool { - - /** - * The minimum size of the byte buffer used when encoding events. - */ - private final int minBufferSize; - - /** - * The factory used to create {@link JsonFormatter} instances - */ - private final CompositeJsonFormatter formatterFactory; - - /** - * The pool of reusable JsonFormatter instances. - * May be cleared by the GC when running low in memory. - * - * Note: - * JsonFormatters are not explicitly disposed when the GC clears the SoftReference. This means that - * the underlying Jackson JsonGenerator is not explicitly closed and the associated memory buffers - * are not returned to Jackson's internal memory pools. - * This behavior is desired and makes the associated memory immediately reclaimable - which is what - * we need since we are "running low in memory". - */ - private volatile SoftReference> formatters = new SoftReference<>(null); - - - public ReusableJsonFormatterPool(CompositeJsonFormatter formatterFactory, int minBufferSize) { - this.formatterFactory = Objects.requireNonNull(formatterFactory); - this.minBufferSize = minBufferSize; - } - - /** - * A reusable JsonFormatter holding a JsonFormatter writing inside a dedicated {@link ReusableByteBuffer}. - * Closing the instance returns it to the pool and makes it available for subsequent usage, unless the - * underlying {@link CompositeJsonFormatter.JsonFormatter} threw an exception during its use. - * - *

Note: usage is not thread-safe. - */ - public class ReusableJsonFormatter implements Closeable { - private ReusableByteBuffer buffer; - private CompositeJsonFormatter.JsonFormatter formatter; - private boolean recyclable = true; - - ReusableJsonFormatter(ReusableByteBuffer buffer, CompositeJsonFormatter.JsonFormatter formatter) { - this.buffer = Objects.requireNonNull(buffer); - this.formatter = Objects.requireNonNull(formatter); - } - - /** - * Return the underlying buffer into which the JsonFormatter is writing. - * - * @return the underlying byte buffer - */ - public ReusableByteBuffer getBuffer() { - assertNotDisposed(); - return buffer; - } - - /** - * Write the Event in JSON format into the enclosed buffer using the enclosed JsonFormatter. - * - * @param event the event to write - * @throws IOException thrown when the JsonFormatter has problem to convert the Event into JSON format - */ - public void write(Event event) throws IOException { - assertNotDisposed(); - - try { - this.formatter.writeEvent(event); - - } catch (IOException e) { - // Do not recycle the instance after an exception is thrown: the underlying - // JsonGenerator may not be in a safe state. - this.recyclable = false; - throw e; - } - } - - /** - * Close the JsonFormatter, release associated resources and return it to the pool. - */ - @Override - public void close() throws IOException { - release(this); - } - - /** - * Dispose associated resources - */ - protected void dispose() { - try { - this.formatter.close(); - } catch (IOException e) { - // ignore and proceed - } - - this.formatter = null; - this.buffer = null; - } - - protected boolean isDisposed() { - return buffer == null; - } - - protected void assertNotDisposed() { - if (isDisposed()) { - throw new IllegalStateException("Instance has been disposed and cannot be used anymore. Did you keep a reference to it after it is closed?"); - } - } - } - - - /** - * Get a {@link ReusableJsonFormatter} out of the pool, creating a new one if needed. - * The instance must be closed after use to return it to the pool. - * - * @return a {@link ReusableJsonFormatter} - * @throws IOException thrown when unable to create a new instance - */ - public ReusableJsonFormatter acquire() throws IOException { - ReusableJsonFormatter reusableFormatter = null; - - Deque cachedFormatters = formatters.get(); - if (cachedFormatters != null) { - reusableFormatter = cachedFormatters.poll(); - } - - if (reusableFormatter == null) { - reusableFormatter = createJsonFormatter(); - } - - return reusableFormatter; - } - - - /** - * Return an instance to the pool. - * An alternative is to call {@link ReusableJsonFormatter#close()}. - * - * @param reusableFormatter the instance to return to the pool - */ - public void release(ReusableJsonFormatter reusableFormatter) { - if (reusableFormatter == null) { - return; - } - - /* - * Dispose the formatter instead of returning to the pool when marked not recyclable - */ - if (!reusableFormatter.recyclable) { - reusableFormatter.dispose(); - return; - } - - Deque cachedFormatters = this.formatters.get(); - if (cachedFormatters == null) { - cachedFormatters = new ConcurrentLinkedDeque<>(); - this.formatters = new SoftReference<>(cachedFormatters); - } - - /* - * Reset the internal buffer and return the cached JsonFormatter to the pool. - */ - reusableFormatter.getBuffer().reset(); - cachedFormatters.addFirst(reusableFormatter); // try to reuse the same as much as we can -> add it first - } - - - /** - * Create a new {@link ReusableJsonFormatter} instance by allocating a new {@link ReusableByteBuffer} - * and a {@link CompositeJsonFormatter.JsonFormatter} bound to it. - * - * @return a new {@link ReusableJsonFormatter} - * @throws IOException thrown when the {@link CompositeJsonFormatter} is unable to create a new instance - * of {@link CompositeJsonFormatter.JsonFormatter}. - */ - protected ReusableJsonFormatter createJsonFormatter() throws IOException { - ReusableByteBuffer buffer = new ReusableByteBuffer(this.minBufferSize); - CompositeJsonFormatter.JsonFormatter jsonFormatter = this.formatterFactory.createJsonFormatter(buffer); - return new ReusableJsonFormatter(buffer, jsonFormatter); - } -} diff --git a/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java b/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java index f8e10c04..16210666 100644 --- a/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java +++ b/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java @@ -16,16 +16,27 @@ package net.logstash.logback.composite.loggingevent; import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import net.logstash.logback.argument.StructuredArguments; -import net.logstash.logback.composite.CompositeJsonFormatter.JsonFormatter; -import net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder; +import net.logstash.logback.composite.AbstractJsonProvider; +import net.logstash.logback.decorate.JsonGeneratorDecorator; +import net.logstash.logback.decorate.NullJsonGeneratorDecorator; import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.spi.ContextAware; +import com.fasterxml.jackson.core.JsonGenerator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -34,7 +45,7 @@ @ExtendWith(MockitoExtension.class) public class LoggingEventCompositeJsonFormatterTest { - private LoggingEventCompositeJsonFormatter formatter = new LoggingEventCompositeJsonFormatter(new LoggingEventCompositeJsonEncoder()); + private LoggingEventCompositeJsonFormatter formatter = new LoggingEventCompositeJsonFormatter(mock(ContextAware.class)); @Mock private ILoggingEvent event; @@ -53,9 +64,73 @@ public void testDoesNotFailOnEmptyBeans() throws IOException { * This should not throw an exception, since SerializationFeature.FAIL_ON_EMPTY_BEANS is disabled */ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { - JsonFormatter jsonFormatter = formatter.createJsonFormatter(bos); - assertThatCode(() -> jsonFormatter.writeEvent(event)).doesNotThrowAnyException(); + assertThatCode(() -> formatter.writeEvent(event, bos)).doesNotThrowAnyException(); + } + } + + + /* + * JsonFormatter reused by subsequent invocations + */ + @Test + public void testReused() throws IOException { + JsonGeneratorDecorator decorator = spy(new NullJsonGeneratorDecorator()); + formatter.setJsonGeneratorDecorator(decorator); + formatter.start(); + + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + formatter.writeEvent(event, bos); + formatter.writeEvent(event, bos); + + // Only once instance should have been created + verify(decorator, times(1)).decorate(any(JsonGenerator.class)); + } + } + + + /* + * JsonFormatter not reused after exception thrown by underlying OutputStream + */ + @Test + public void testNotReusedAfterIOException() throws IOException { + JsonGeneratorDecorator decorator = spy(new NullJsonGeneratorDecorator()); + formatter.setJsonGeneratorDecorator(decorator); + formatter.start(); + + OutputStream bos = mock(OutputStream.class); + doThrow(new IOException()) + .doCallRealMethod() + .when(bos).write(any(byte[].class), any(int.class), any(int.class)); + + assertThatThrownBy(() -> formatter.writeEvent(event, bos)).isInstanceOf(IOException.class); + formatter.writeEvent(event, bos); + + // Two instances created because the first was discarded because of the exception + verify(decorator, times(2)).decorate(any(JsonGenerator.class)); + } + + + /* + * JsonFormatter not reused after exception thrown while calling registered JsonProviders + */ + @Test + public void testNotReusedAfterEncoderException() throws IOException { + JsonGeneratorDecorator decorator = spy(new NullJsonGeneratorDecorator()); + formatter.setJsonGeneratorDecorator(decorator); + formatter.getProviders().addProvider(new AbstractJsonProvider() { + @Override + public void writeTo(JsonGenerator generator, ILoggingEvent event) throws IOException { + throw new IOException("Exception thrown by JsonProvider"); + } + }); + formatter.start(); + + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + assertThatThrownBy(() -> formatter.writeEvent(event, bos)).isInstanceOf(IOException.class); + assertThatThrownBy(() -> formatter.writeEvent(event, bos)).isInstanceOf(IOException.class); + + // Two instances created because the first was discarded because of the exception + verify(decorator, times(2)).decorate(any(JsonGenerator.class)); } } - } diff --git a/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java b/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java index 5412036b..a667ab97 100644 --- a/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java +++ b/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java @@ -22,11 +22,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -36,19 +34,19 @@ import net.logstash.logback.TestJsonProvider; import net.logstash.logback.composite.CompositeJsonFormatter; +import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.Context; +import ch.qos.logback.core.BasicStatusManager; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.EncoderBase; import ch.qos.logback.core.encoder.LayoutWrappingEncoder; +import ch.qos.logback.core.status.OnConsoleStatusListener; import ch.qos.logback.core.status.StatusManager; -import ch.qos.logback.core.status.WarnStatus; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonGenerator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -56,27 +54,32 @@ @ExtendWith(MockitoExtension.class) public class CompositeJsonEncoderTest { - @InjectMocks private final TestCompositeJsonEncoder encoder = new TestCompositeJsonEncoder(); private CompositeJsonFormatter formatter; - @Mock(lenient = true) - private Context context; + private LoggerContext context = new LoggerContext(); - @Mock - private StatusManager statusManager; + private StatusManager statusManager = new BasicStatusManager(); @Mock private ILoggingEvent event; @BeforeEach public void setup() { + // Output statuses on the console for easy debugging. Must be initialized early to capture + // warnings emitted by setter/getter methods before the appender is started. + OnConsoleStatusListener consoleListener = new OnConsoleStatusListener(); + consoleListener.start(); + this.statusManager.add(consoleListener); + + this.context.setStatusManager(statusManager); + // suppress line separator to make test platform independent this.encoder.setLineSeparator(""); - this.formatter = encoder.getFormatter(); + this.encoder.setContext(context); - when(context.getStatusManager()).thenReturn(statusManager); + this.formatter = encoder.getFormatter(); } @@ -95,7 +98,7 @@ public void startStop() { assertThat(encoder.isStarted()).isTrue(); assertThat(formatter.isStarted()).isTrue(); assertThat(prefix.isStarted()).isTrue(); - verify(formatter).setContext(context); + assertThat(formatter.getContext()).isEqualTo(context); // providers are not started a second time encoder.start(); @@ -222,13 +225,13 @@ public void testLineEndings() { */ @Test public void testIOException() throws IOException { - encoder.exceptionToThrow = new IOException(); + encoder.exceptionToThrow = new IOException(); encoder.start(); encoder.encode(event); - verify(statusManager).add(new WarnStatus("Error encountered while encoding log event. " - + "Event: " + event, context, encoder.exceptionToThrow)); + assertThat(statusManager.getCopyOfStatusList()) + .anyMatch(s -> s.getMessage().startsWith("Error encountered while encoding log event.")); } @@ -246,8 +249,8 @@ public void testIOException_streaming() throws IOException { assertThatCode(() -> encoder.encode(event, stream)).isInstanceOf(IOException.class); - verify(statusManager, never()).add(new WarnStatus("Error encountered while encoding log event. " - + "Event: " + event, context, exception)); + assertThat(statusManager.getCopyOfStatusList()) + .noneMatch(s -> s.getMessage().startsWith("Error encountered while encoding log event.")); } @@ -255,7 +258,7 @@ public void testIOException_streaming() throws IOException { private static class TestCompositeJsonEncoder extends CompositeJsonEncoder { - private IOException exceptionToThrow; + private IOException exceptionToThrow; @Override protected CompositeJsonFormatter createFormatter() { diff --git a/src/test/java/net/logstash/logback/util/ObjectPoolTest.java b/src/test/java/net/logstash/logback/util/ObjectPoolTest.java new file mode 100644 index 00000000..916dd184 --- /dev/null +++ b/src/test/java/net/logstash/logback/util/ObjectPoolTest.java @@ -0,0 +1,151 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import net.logstash.logback.util.ObjectPool.Lifecycle; + +import org.junit.jupiter.api.Test; + +/** + * @author brenuart + * + */ +public class ObjectPoolTest { + + private ObjectPool pool = new ObjectPool<>(this::createInstance); + + + @Test + public void testReuse() { + PooledObject obj1 = pool.acquire(); + assertThat(obj1).isNotNull(); + + // Acquire a second instance and make sure not same as first + PooledObject obj2 = pool.acquire(); + assertThat(obj2).isNotNull(); + assertThat(obj1).isNotSameAs(obj2); + + // Release second and re-acquire - should be the same + pool.release(obj2); + PooledObject obj3 = pool.acquire(); + assertThat(obj3).isSameAs(obj2); + } + + + /* + * Assert Lifecyle#recycle() is invoked when instance is returned to the pool + */ + @Test + public void testRecycle() { + PooledObject obj1 = pool.acquire(); + pool.release(obj1); + + verify(obj1, times(1)).recycle(); + } + + + /* + * Assert instance is disposed and not returned to the pool when Lifecycle#recycle() + * returns false + */ + @Test + public void testNotRecyclable() { + PooledObject obj1 = pool.acquire(); + when(obj1.recycle()).thenReturn(false); + + pool.release(obj1); + + verify(obj1, times(1)).recycle(); + verify(obj1, times(1)).dispose(); + + assertThat(pool.acquire()).isNotSameAs(obj1); + } + + + /* + * Assert pooled instance are disposed when calling #clear() + */ + @Test + public void testClear() { + PooledObject obj1 = pool.acquire(); + pool.release(obj1); + assertThat(pool.size()).isEqualTo(1); + + pool.clear(); + verify(obj1, times(1)).dispose(); + + assertThat(pool.size()).isZero(); + } + + + /* + * Releasing a "null" instance should not throw any exception + */ + @Test + public void testReleaseNull() { + assertThatCode(() -> pool.release(null)).doesNotThrowAnyException(); + } + + + /* + * NullPointer exception thrown if factory returns null + */ + @Test + public void testFactoryReturnsNull() { + pool = new ObjectPool(() -> null); + assertThatThrownBy(() -> pool.acquire()).isInstanceOf(NullPointerException.class); + } + + + /* + * Exception thrown by the factory is propagated to ObjectPool#acquire() + */ + @Test + public void testFactoryThrowsException() { + RuntimeException e = new RuntimeException(); + + pool = new ObjectPool(() -> { + throw e; + }); + + assertThatThrownBy(() -> pool.acquire()).isSameAs(e); + } + + + private PooledObject createInstance() { + return spy(new PooledObject()); + } + + public static class PooledObject implements ObjectPool.Lifecycle { + @Override + public boolean recycle() { + return Lifecycle.super.recycle(); + } + + @Override + public void dispose() { + Lifecycle.super.dispose(); + } + } +} From 46177888f80c31af988223a7b67863cfbf418aa7 Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Wed, 15 Sep 2021 05:04:36 +0200 Subject: [PATCH 2/4] Update javadoc --- .../logstash/logback/composite/CompositeJsonFormatter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java index a80fdf77..605c1b4a 100644 --- a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java +++ b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java @@ -159,11 +159,12 @@ public void writeEvent(Event event, OutputStream outputStream) throws IOExceptio /** - * Create a reusable {@link JsonFormatter} bound to the given {@link OutputStream}. + * Create a reusable {@link JsonFormatter} bound to a {@link DisconnectedOutputStream}. * * @param stream the output stream used by the {@link JsonFormatter} * @return {@link JsonFormatter} writing JSON content in the output stream - * @throws IOException thrown when unable to write in the output stream or when Jackson fails to produce JSON content + * @throws IOException thrown when unable to write in the output stream or when Jackson + * fails to produce JSON content */ private JsonFormatter createJsonFormatter() { try { From 306f316f142d854bd57b057b7888a4b077bfe37d Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Wed, 15 Sep 2021 18:46:05 +0200 Subject: [PATCH 3/4] Fix javadoc --- .../net/logstash/logback/composite/CompositeJsonFormatter.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java index 605c1b4a..309d95c0 100644 --- a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java +++ b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java @@ -161,10 +161,7 @@ public void writeEvent(Event event, OutputStream outputStream) throws IOExceptio /** * Create a reusable {@link JsonFormatter} bound to a {@link DisconnectedOutputStream}. * - * @param stream the output stream used by the {@link JsonFormatter} * @return {@link JsonFormatter} writing JSON content in the output stream - * @throws IOException thrown when unable to write in the output stream or when Jackson - * fails to produce JSON content */ private JsonFormatter createJsonFormatter() { try { From ec713f2bd769809e4ec7d3cf64a32e7024cb3b93 Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Wed, 15 Sep 2021 18:51:19 +0200 Subject: [PATCH 4/4] Wait until event handler is started to initialize intermediate buffer used by StreamingEncoder Deciding if an intermediate buffer is required or not in the constructor was not a good idea: the encoder is not yet known at this point (still null). --- .../AbstractLogstashTcpSocketAppender.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java index 857f4db5..ef209a45 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java @@ -354,9 +354,9 @@ private class TcpSendingEventHandler implements EventHandler>, L /** * Intermediate ByteBuffer used to store content generated by {@link StreamingEncoder}. - * Stays uninitialized if encoder is a "raw" {@link Encoder}. + * Set when {@link #onStart()} but stays uninitialized if encoder is a "raw" {@link Encoder}. */ - private final ReusableByteBuffer buffer; + private ReusableByteBuffer buffer; /** * When run, if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration} @@ -511,17 +511,6 @@ public void run() { } } } - - - TcpSendingEventHandler() { - if (encoder instanceof CompositeJsonEncoder) { - this.buffer = new ReusableByteBuffer(((CompositeJsonEncoder) encoder).getMinBufferSize()); - } else if (encoder instanceof StreamingEncoder) { - this.buffer = new ReusableByteBuffer(); - } else { - this.buffer = null; - } - } @Override @@ -664,6 +653,13 @@ private boolean hasKeepAliveDurationElapsed(long lastSentNanoTime, long currentN @Override public void onStart() { this.destinationAttemptStartTimes = new long[destinations.size()]; + + if (encoder instanceof CompositeJsonEncoder) { + this.buffer = new ReusableByteBuffer(((CompositeJsonEncoder) encoder).getMinBufferSize()); + } else if (encoder instanceof StreamingEncoder) { + this.buffer = new ReusableByteBuffer(); + } + openSocket(); scheduleKeepAlive(System.nanoTime()); scheduleWriteTimeout(); @@ -825,6 +821,7 @@ private synchronized void closeSocket() { private void closeEncoder() { encoder.stop(); + buffer = null; } private synchronized void scheduleKeepAlive(long basedOnNanoTime) {