Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
stream: fix regression on duplex end
Decide the return status of writeOrBuffer before
calling stream.write which can reset state.length

Add unit test for #35926

Refs: #35926

PR-URL: #35941
Fixes: #35926
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Rich Trott <[email protected]>
  • Loading branch information
mmomtchev authored and Trott committed Nov 4, 2020
commit 5a3f43b255af6968606f13258447894f946ee3ce
12 changes: 6 additions & 6 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {

state.length += len;

// stream._write resets state.length
const ret = state.length < state.highWaterMark;
// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;

if (state.writing || state.corked || state.errored || !state.constructed) {
state.buffered.push({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
Expand All @@ -383,12 +389,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
state.sync = false;
}

const ret = state.length < state.highWaterMark;

// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;

// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
Expand Down
32 changes: 32 additions & 0 deletions test/parallel/test-stream-duplex-readable-end.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict';
// https://github.com/nodejs/node/issues/35926
require('../common');
const assert = require('assert');
const stream = require('stream');

let loops = 5;

const src = new stream.Readable({
read() {
if (loops--)
this.push(Buffer.alloc(20000));
}
});

const dst = new stream.Transform({
transform(chunk, output, fn) {
this.push(null);
fn();
}
});

src.pipe(dst);

function parser_end() {
assert.ok(loops > 0);
dst.removeAllListeners();
}

dst.on('data', () => { });
dst.on('end', parser_end);
dst.on('error', parser_end);