Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Wait until event handler is started to initialize intermediate buffer…
… used by StreamingEncoder

Deciding if an intermediate buffer is required or not in the constructor was not a good idea: the encoder is not yet known at this point (still null).
  • Loading branch information
brenuart committed Sep 15, 2021
commit ec713f2bd769809e4ec7d3cf64a32e7024cb3b93
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,9 @@ private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, L

/**
* Intermediate ByteBuffer used to store content generated by {@link StreamingEncoder}.
* Stays uninitialized if encoder is a "raw" {@link Encoder}.
* Set when {@link #onStart()} but stays uninitialized if encoder is a "raw" {@link Encoder}.
*/
private final ReusableByteBuffer buffer;
private ReusableByteBuffer buffer;

/**
* When run, if the {@link AbstractLogstashTcpSocketAppender#keepAliveDuration}
Expand Down Expand Up @@ -511,17 +511,6 @@ public void run() {
}
}
}


TcpSendingEventHandler() {
if (encoder instanceof CompositeJsonEncoder) {
this.buffer = new ReusableByteBuffer(((CompositeJsonEncoder<Event>) encoder).getMinBufferSize());
} else if (encoder instanceof StreamingEncoder) {
this.buffer = new ReusableByteBuffer();
} else {
this.buffer = null;
}
}


@Override
Expand Down Expand Up @@ -664,6 +653,13 @@ private boolean hasKeepAliveDurationElapsed(long lastSentNanoTime, long currentN
@Override
public void onStart() {
this.destinationAttemptStartTimes = new long[destinations.size()];

if (encoder instanceof CompositeJsonEncoder) {
this.buffer = new ReusableByteBuffer(((CompositeJsonEncoder<Event>) encoder).getMinBufferSize());
} else if (encoder instanceof StreamingEncoder) {
this.buffer = new ReusableByteBuffer();
}

openSocket();
scheduleKeepAlive(System.nanoTime());
scheduleWriteTimeout();
Expand Down Expand Up @@ -825,6 +821,7 @@ private synchronized void closeSocket() {

private void closeEncoder() {
encoder.stop();
buffer = null;
}

private synchronized void scheduleKeepAlive(long basedOnNanoTime) {
Expand Down