Skip to content

Commit 97fdd54

Browse files
authored
Handle SIGPIPEs (#547)
1 parent b2cfc4f commit 97fdd54

File tree

9 files changed

+198
-3
lines changed

9 files changed

+198
-3
lines changed

src/concurrently.spec.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ it('log output is passed to output stream if logger is specified in options', ()
6262
expect(outputStream.write).toHaveBeenCalledWith('bar');
6363
});
6464

65+
it('log output is not passed to output stream after it has errored', () => {
66+
const logger = new Logger({ hide: [] });
67+
const outputStream = new Writable();
68+
jest.spyOn(outputStream, 'write');
69+
70+
create(['foo'], { logger, outputStream });
71+
outputStream.emit('error', new Error());
72+
logger.log('foo', 'bar');
73+
74+
expect(outputStream.write).not.toHaveBeenCalled();
75+
});
76+
6577
it('spawns commands up to configured limit at once', () => {
6678
create(['foo', 'bar', 'baz', 'qux'], { maxProcesses: 2 });
6779
expect(spawn).toHaveBeenCalledTimes(2);

src/concurrently.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import assert from 'assert';
22
import _ from 'lodash';
33
import { cpus } from 'os';
4+
import { takeUntil } from 'rxjs';
45
import { Writable } from 'stream';
56
import treeKill from 'tree-kill';
67

@@ -223,7 +224,10 @@ export function concurrently(
223224
group: !!options.group,
224225
commands,
225226
});
226-
options.logger.output.subscribe(({ command, text }) => outputWriter.write(command, text));
227+
options.logger.output
228+
// Stop trying to write after there's been an error.
229+
.pipe(takeUntil(outputWriter.error))
230+
.subscribe(({ command, text }) => outputWriter.write(command, text));
227231
}
228232

229233
const commandsLeft = commands.slice();
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { Writable } from 'stream';
2+
3+
import { FakeCommand } from '../fixtures/fake-command';
4+
import { OutputErrorHandler } from './output-error-handler';
5+
6+
let controller: OutputErrorHandler;
7+
let outputStream: Writable;
8+
let abortController: AbortController;
9+
let commands: FakeCommand[];
10+
beforeEach(() => {
11+
commands = [new FakeCommand(), new FakeCommand()];
12+
13+
abortController = new AbortController();
14+
outputStream = new Writable();
15+
controller = new OutputErrorHandler({ abortController, outputStream });
16+
});
17+
18+
it('returns same commands', () => {
19+
expect(controller.handle(commands)).toMatchObject({ commands });
20+
});
21+
22+
describe('on output stream error', () => {
23+
beforeEach(() => {
24+
controller.handle(commands);
25+
outputStream.emit('error', new Error());
26+
});
27+
28+
it('kills every command', () => {
29+
expect(commands[0].kill).toHaveBeenCalled();
30+
expect(commands[1].kill).toHaveBeenCalled();
31+
});
32+
33+
it('sends abort signal', () => {
34+
expect(abortController.signal.aborted).toBe(true);
35+
});
36+
});
37+
38+
describe('on finish', () => {
39+
it('unsubscribes from output stream error', () => {
40+
const { onFinish } = controller.handle(commands);
41+
onFinish();
42+
43+
outputStream.on('error', jest.fn());
44+
outputStream.emit('error', new Error());
45+
46+
expect(commands[0].kill).not.toHaveBeenCalled();
47+
expect(commands[1].kill).not.toHaveBeenCalled();
48+
expect(abortController.signal.aborted).toBe(false);
49+
});
50+
});
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { Writable } from 'stream';
2+
3+
import { Command } from '../command';
4+
import { fromSharedEvent } from '../observables';
5+
import { FlowController } from './flow-controller';
6+
7+
/**
8+
* Kills processes and aborts further command spawning on output stream error (namely, SIGPIPE).
9+
*/
10+
export class OutputErrorHandler implements FlowController {
11+
private readonly outputStream: Writable;
12+
private readonly abortController: AbortController;
13+
14+
constructor({
15+
abortController,
16+
outputStream,
17+
}: {
18+
abortController: AbortController;
19+
outputStream: Writable;
20+
}) {
21+
this.abortController = abortController;
22+
this.outputStream = outputStream;
23+
}
24+
25+
handle(commands: Command[]): { commands: Command[]; onFinish(): void } {
26+
const subscription = fromSharedEvent(this.outputStream, 'error').subscribe(() => {
27+
commands.forEach((command) => command.kill());
28+
29+
// Avoid further commands from spawning, e.g. if `RestartProcess` is used.
30+
this.abortController.abort();
31+
});
32+
33+
return {
34+
commands,
35+
onFinish: () => subscription.unsubscribe(),
36+
};
37+
}
38+
}

src/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { LogExit } from './flow-control/log-exit';
1818
import { LogOutput } from './flow-control/log-output';
1919
import { LogTimings } from './flow-control/log-timings';
2020
import { LoggerPadding } from './flow-control/logger-padding';
21+
import { OutputErrorHandler } from './flow-control/output-error-handler';
2122
import { RestartDelay, RestartProcess } from './flow-control/restart-process';
2223
import { Teardown } from './flow-control/teardown';
2324
import { Logger } from './logger';
@@ -145,6 +146,7 @@ export function concurrently(
145146
}
146147

147148
const abortController = new AbortController();
149+
const outputStream = options.outputStream || process.stdout;
148150

149151
return createConcurrently(commands, {
150152
maxProcesses: options.maxProcesses,
@@ -153,7 +155,7 @@ export function concurrently(
153155
cwd: options.cwd,
154156
hide,
155157
logger,
156-
outputStream: options.outputStream || process.stdout,
158+
outputStream,
157159
group: options.group,
158160
abortSignal: abortController.signal,
159161
controllers: [
@@ -182,6 +184,7 @@ export function concurrently(
182184
killSignal: options.killSignal,
183185
abortController,
184186
}),
187+
new OutputErrorHandler({ abortController, outputStream }),
185188
new LogTimings({
186189
logger: options.timings ? logger : undefined,
187190
timestampFormat: options.timestampFormat,

src/observables.spec.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import EventEmitter from 'events';
2+
3+
import { fromSharedEvent } from './observables';
4+
5+
describe('fromSharedEvent()', () => {
6+
it('returns same observable for event emitter/name pair', () => {
7+
const emitter = new EventEmitter();
8+
const obs1 = fromSharedEvent(emitter, 'foo');
9+
const obs2 = fromSharedEvent(emitter, 'foo');
10+
expect(obs1).toBe(obs2);
11+
});
12+
13+
it('returns different observables for different event emitter/name pairs', () => {
14+
const emitter = new EventEmitter();
15+
const obs1 = fromSharedEvent(emitter, 'foo');
16+
const obs2 = fromSharedEvent(emitter, 'bar');
17+
expect(obs1).not.toBe(obs2);
18+
19+
const emitter2 = new EventEmitter();
20+
const obs3 = fromSharedEvent(emitter2, 'foo');
21+
const obs4 = fromSharedEvent(emitter2, 'bar');
22+
expect(obs1).not.toBe(obs3);
23+
expect(obs2).not.toBe(obs4);
24+
});
25+
26+
it('sets up listener only once per event emitter/name pair', () => {
27+
const emitter = new EventEmitter();
28+
const observable = fromSharedEvent(emitter, 'foo');
29+
observable.subscribe();
30+
observable.subscribe();
31+
32+
expect(emitter.listenerCount('foo')).toBe(1);
33+
});
34+
});

src/observables.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import EventEmitter from 'events';
2+
import { fromEvent, Observable, share } from 'rxjs';
3+
4+
const sharedEvents = new WeakMap<EventEmitter, Map<string, Observable<unknown>>>();
5+
6+
/**
7+
* Creates an observable for a specific event of an `EventEmitter` instance.
8+
*
9+
* The underlying event listener is set up only once across the application for that event emitter/name pair.
10+
*/
11+
export function fromSharedEvent(emitter: EventEmitter, event: string): Observable<unknown> {
12+
let emitterEvents = sharedEvents.get(emitter);
13+
if (!emitterEvents) {
14+
emitterEvents = new Map();
15+
sharedEvents.set(emitter, emitterEvents);
16+
}
17+
18+
let observable = emitterEvents.get(event);
19+
if (!observable) {
20+
observable = fromEvent(emitter, event).pipe(share());
21+
emitterEvents.set(event, observable);
22+
}
23+
24+
return observable;
25+
}

src/output-writer.spec.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,18 @@ beforeEach(() => {
3030
];
3131
});
3232

33+
it('throws if outputStream already is in errored state', () => {
34+
Object.assign(outputStream, { errored: new Error() });
35+
expect(() => createWriter()).toThrow(TypeError);
36+
});
37+
3338
describe('#write()', () => {
39+
it('throws if outputStream has errored', () => {
40+
const writer = createWriter();
41+
Object.assign(outputStream, { errored: new Error() });
42+
expect(() => writer.write(commands[0], 'hello')).toThrow(TypeError);
43+
});
44+
3445
describe('with group=false', () => {
3546
it('writes instantly', () => {
3647
const writer = createWriter({ group: false });

src/output-writer.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as Rx from 'rxjs';
22
import { Writable } from 'stream';
33

44
import { Command } from './command';
5+
import { fromSharedEvent } from './observables';
56

67
/**
78
* Class responsible for actually writing output onto a writable stream.
@@ -12,6 +13,11 @@ export class OutputWriter {
1213
readonly buffers: string[][];
1314
activeCommandIndex = 0;
1415

16+
readonly error: Rx.Observable<unknown>;
17+
private get errored() {
18+
return this.outputStream.errored;
19+
}
20+
1521
constructor({
1622
outputStream,
1723
group,
@@ -22,6 +28,9 @@ export class OutputWriter {
2228
commands: Command[];
2329
}) {
2430
this.outputStream = outputStream;
31+
this.ensureWritable();
32+
33+
this.error = fromSharedEvent(this.outputStream, 'error');
2534
this.group = group;
2635
this.buffers = commands.map(() => []);
2736

@@ -42,7 +51,14 @@ export class OutputWriter {
4251
}
4352
}
4453

54+
private ensureWritable() {
55+
if (this.errored) {
56+
throw new TypeError('outputStream is in errored state', { cause: this.errored });
57+
}
58+
}
59+
4560
write(command: Command | undefined, text: string) {
61+
this.ensureWritable();
4662
if (this.group && command) {
4763
if (command.index <= this.activeCommandIndex) {
4864
this.outputStream.write(text);
@@ -56,7 +72,9 @@ export class OutputWriter {
5672
}
5773

5874
private flushBuffer(index: number) {
59-
this.buffers[index].forEach((t) => this.outputStream.write(t));
75+
if (!this.errored) {
76+
this.buffers[index].forEach((t) => this.outputStream.write(t));
77+
}
6078
this.buffers[index] = [];
6179
}
6280
}

0 commit comments

Comments
 (0)