diff --git a/README.md b/README.md index 8b4c71c..acb91f3 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Write Pino transports easily. ## Install -``` +```sh npm i pino-abstract-transport ``` @@ -43,9 +43,12 @@ module.exports = function (opts) { ``` ## Typescript usage + Install the type definitions for node. Make sure the major version of the type definitions matches the node version you are using. + #### Node 16 -``` + +```sh npm i -D @types/node@16 ``` @@ -78,6 +81,8 @@ stream, it emits the following events: * `parseLine(line)` a function that is used to parse line received from `pino`. +* `expectPinoConfig` a boolean that indicates if the transport expects Pino to add some of its configuration to the stream. Default: `false`. + ## Example ### custom parseLine @@ -142,6 +147,26 @@ pipeline(process.stdin, buildTransform(), buildDestination(), function (err) { }) ``` +### Using pino config + +Setting `expectPinoConfig` to `true` will make the transport wait for pino to send its configuration before starting to process logs. It will add `levels`, `messageKey` and `errorKey` to the stream. + +When used with an incompatible version of pino, the stream will immediately error. + +```js +import build from 'pino-abstract-transport' + +export default function (opts) { + return build(async function (source) { + for await (const obj of source) { + console.log(`[${source.levels.labels[obj.level]}]: ${obj[source.messageKey]}`) + } + }, { + expectPinoConfig: true + }) +} +``` + ## License MIT diff --git a/index.d.ts b/index.d.ts index 74b58a8..1ac49a5 100644 --- a/index.d.ts +++ b/index.d.ts @@ -40,6 +40,12 @@ type BuildOptions = { * `metadata` If set to false, do not add metadata properties to the returned stream */ metadata?: false; + + /** + * `expectPinoConfig` If set to true, the transport will wait for pino to send its + * configuration before starting to process logs. + */ + expectPinoConfig?: boolean; }; /** @@ -50,6 +56,17 @@ type EnablePipelining = BuildOptions & { enablePipelining: true; }; +/** + * Create a split2 instance and returns it. This same instance is also passed + * to the given function, which is called after pino has sent its configuration. + * + * @returns {Promise} the split2 instance + */ +declare function build( + fn: (transform: Transform & build.OnUnknown) => void | Promise, + opts: BuildOptions & { expectPinoConfig: true } +): Promise; + /** * Create a split2 instance and returns it. This same instance is also passed * to the given function, which is called synchronously. @@ -61,6 +78,19 @@ declare function build( opts?: BuildOptions ): Transform & build.OnUnknown; +/** + * Creates a split2 instance and passes it to the given function, which is called + * after pino has sent its configuration. Then wraps the split2 instance and + * the returned stream into a Duplex, so they can be concatenated into multiple + * transports. + * + * @returns {Promise} the wrapped split2 instance + */ +declare function build( + fn: (transform: Transform & build.OnUnknown) => Transform & build.OnUnknown, + opts: EnablePipelining & { expectPinoConfig: true } +): Promise; + /** * Creates a split2 instance and passes it to the given function, which is called * synchronously. Then wraps the split2 instance and the returned stream into a diff --git a/index.js b/index.js index 2f6167f..c05564a 100644 --- a/index.js +++ b/index.js @@ -3,8 +3,22 @@ const metadata = Symbol.for('pino.metadata') const split = require('split2') const { Duplex } = require('readable-stream') +const { parentPort, workerData } = require('worker_threads') + +function createDeferred () { + let resolve + let reject + const promise = new Promise((_resolve, _reject) => { + resolve = _resolve + reject = _reject + }) + promise.resolve = resolve + promise.reject = reject + return promise +} module.exports = function build (fn, opts = {}) { + const waitForConfig = opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig === true const parseLines = opts.parse === 'lines' const parseLine = typeof opts.parseLine === 'function' ? opts.parseLine : JSON.parse const close = opts.close || defaultClose @@ -50,6 +64,12 @@ module.exports = function build (fn, opts = {}) { } } + if (opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig !== true) { + setImmediate(() => { + stream.emit('error', new Error('This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.')) + }) + } + if (opts.metadata !== false) { stream[metadata] = true stream.lastTime = 0 @@ -57,20 +77,50 @@ module.exports = function build (fn, opts = {}) { stream.lastObj = null } - let res = fn(stream) + if (waitForConfig) { + let pinoConfig = {} + const configReceived = createDeferred() + parentPort.on('message', function handleMessage (message) { + if (message.code === 'PINO_CONFIG') { + pinoConfig = message.config + configReceived.resolve() + parentPort.off('message', handleMessage) + } + }) - if (res && typeof res.catch === 'function') { - res.catch((err) => { - stream.destroy(err) + Object.defineProperties(stream, { + levels: { + get () { return pinoConfig.levels } + }, + messageKey: { + get () { return pinoConfig.messageKey } + }, + errorKey: { + get () { return pinoConfig.errorKey } + } }) - // set it to null to not retain a reference to the promise - res = null - } else if (opts.enablePipelining && res) { - return Duplex.from({ writable: stream, readable: res }) + return configReceived.then(finish) } - return stream + return finish() + + function finish () { + let res = fn(stream) + + if (res && typeof res.catch === 'function') { + res.catch((err) => { + stream.destroy(err) + }) + + // set it to null to not retain a reference to the promise + res = null + } else if (opts.enablePipelining && res) { + return Duplex.from({ writable: stream, readable: res }) + } + + return stream + } } function defaultClose (err, cb) { diff --git a/package.json b/package.json index 47f4269..56754f8 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "snazzy": "^9.0.0", "standard": "^17.0.0", "tap": "^16.0.0", + "thread-stream": "^2.4.1", "tsd": "^0.31.0" }, "tsd": { diff --git a/test/base.test.js b/test/base.test.js index e025525..e804135 100644 --- a/test/base.test.js +++ b/test/base.test.js @@ -139,6 +139,13 @@ test('rejecting errors the stream', async ({ same, plan }) => { same(err.message, 'kaboom') }) +test('emits an error if the transport expects pino to send the config, but pino is not going to', async function ({ plan, same }) { + plan(1) + const stream = build(() => {}, { expectPinoConfig: true }) + const [err] = await once(stream, 'error') + same(err.message, 'This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.') +}) + test('set metadata', ({ same, plan, equal }) => { plan(9) diff --git a/test/fixtures/transport-async-iteration.js b/test/fixtures/transport-async-iteration.js new file mode 100644 index 0000000..ddcdaf3 --- /dev/null +++ b/test/fixtures/transport-async-iteration.js @@ -0,0 +1,22 @@ +'use strict' + +const build = require('../..') + +module.exports = async function (threadStreamOpts) { + const { port, opts = {} } = threadStreamOpts + return build( + async function (source) { + for await (const obj of source) { + port.postMessage({ + data: obj, + pinoConfig: { + levels: source.levels, + messageKey: source.messageKey, + errorKey: source.errorKey + } + }) + } + }, + opts + ) +} diff --git a/test/fixtures/transport-on-data.js b/test/fixtures/transport-on-data.js new file mode 100644 index 0000000..58143fa --- /dev/null +++ b/test/fixtures/transport-on-data.js @@ -0,0 +1,22 @@ +'use strict' + +const build = require('../..') + +module.exports = async function (threadStreamOpts) { + const { port, opts = {} } = threadStreamOpts + return build( + function (source) { + source.on('data', function (line) { + port.postMessage({ + data: line, + pinoConfig: { + levels: source.levels, + messageKey: source.messageKey, + errorKey: source.errorKey + } + }) + }) + }, + opts + ) +} diff --git a/test/fixtures/transport-transform.js b/test/fixtures/transport-transform.js new file mode 100644 index 0000000..66c84dc --- /dev/null +++ b/test/fixtures/transport-transform.js @@ -0,0 +1,24 @@ +'use strict' + +const { Transform, pipeline } = require('stream') +const build = require('../..') + +module.exports = function (threadStreamOpts) { + const { opts = {} } = threadStreamOpts + return build(function (source) { + const transform = new Transform({ + objectMode: true, + autoDestroy: true, + transform (chunk, enc, cb) { + chunk.service = 'from transform' + chunk.level = `${source.levels.labels[chunk.level]}(${chunk.level})` + chunk[source.messageKey] = chunk[source.messageKey].toUpperCase() + cb(null, JSON.stringify(chunk) + '\n') + } + }) + + pipeline(source, transform, () => {}) + + return transform + }, { ...opts, enablePipelining: true }) +} diff --git a/test/fixtures/worker-pipeline.js b/test/fixtures/worker-pipeline.js new file mode 100644 index 0000000..38af252 --- /dev/null +++ b/test/fixtures/worker-pipeline.js @@ -0,0 +1,15 @@ +'use strict' + +const { pipeline, PassThrough } = require('stream') + +module.exports = async function ({ targets }) { + const streams = await Promise.all(targets.map(async (t) => { + const fn = require(t.target) + const stream = await fn(t.options) + return stream + })) + + const stream = new PassThrough() + pipeline(stream, ...streams, () => {}) + return stream +} diff --git a/test/types/index.test-d.ts b/test/types/index.test-d.ts index 3fa9169..b5f6a85 100644 --- a/test/types/index.test-d.ts +++ b/test/types/index.test-d.ts @@ -9,12 +9,22 @@ import { Transform } from "stream"; */ expectType(build((source) => source, { enablePipelining: true })); +/** + * If expectPinoConfig is set with enablePipelining, build returns a promise + */ +expectType<(Promise)>(build((source) => source, { enablePipelining: true, expectPinoConfig: true })); + /** * If enablePipelining is not set the unknown event can be listened to on * the returned stream. */ expectType(build((source) => {})); +/** + * If expectPinoConfig is set, build returns a promise + */ +expectType<(Promise)>(build((source) => {}, { expectPinoConfig: true })); + /** * build also accepts an async function */ diff --git a/test/worker.test.js b/test/worker.test.js new file mode 100644 index 0000000..e49976e --- /dev/null +++ b/test/worker.test.js @@ -0,0 +1,364 @@ +'use strict' + +const { once } = require('events') +const { join } = require('path') +const ThreadStream = require('thread-stream') +const { MessageChannel } = require('worker_threads') +const { test } = require('tap') + +workerTest('transport-on-data.js') +workerTest('transport-async-iteration.js', ' when using async iteration') + +function workerTest (filename, description = '') { + test(`does not wait for pino to send config by default${description}`, function ({ same, plan }) { + plan(4) + const { port1, port2 } = new MessageChannel() + const stream = new ThreadStream({ + filename: join(__dirname, 'fixtures', filename), + workerData: { port: port1 }, + workerOpts: { + transferList: [port1] + } + }) + + const expected = [{ + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'hello world' + }, { + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'another message', + prop: 42 + }] + + const emptyPinoConfig = { + levels: undefined, + messageKey: undefined, + errorKey: undefined + } + + port2.on('message', function (message) { + same(expected.shift(), message.data) + same(emptyPinoConfig, message.pinoConfig) + }) + + const lines = expected.map(JSON.stringify).join('\n') + stream.write(lines) + stream.end() + }) + + test(`does not wait for pino to send config if transport is not expecting it${description}`, function ({ same, plan }) { + plan(4) + const { port1, port2 } = new MessageChannel() + const stream = new ThreadStream({ + filename: join(__dirname, 'fixtures', filename), + workerData: { + port: port1, + pinoWillSendConfig: true + }, + workerOpts: { + transferList: [port1] + } + }) + + const expected = [{ + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'hello world' + }, { + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'another message', + prop: 42 + }] + + const emptyPinoConfig = { + levels: undefined, + messageKey: undefined, + errorKey: undefined + } + + const pinoConfig = { + levels: { + labels: { 30: 'info' }, + values: { info: 30 } + }, + messageKey: 'msg', + errorKey: 'err' + } + + stream.worker.postMessage({ code: 'PINO_CONFIG', config: pinoConfig }) + // stream.emit('message', { code: 'PINO_CONFIG', config: pinoConfig }) + + port2.on('message', function (message) { + same(expected.shift(), message.data) + same(emptyPinoConfig, message.pinoConfig) + }) + + const lines = expected.map(JSON.stringify).join('\n') + stream.write(lines) + stream.end() + }) + + test(`waits for the pino config when pino intends to send it and the transport requests it${description}`, function ({ same, plan }) { + plan(4) + const { port1, port2 } = new MessageChannel() + const stream = new ThreadStream({ + filename: join(__dirname, 'fixtures', filename), + workerData: { + port: port1, + pinoWillSendConfig: true, + opts: { + expectPinoConfig: true + } + }, + workerOpts: { + transferList: [port1] + } + }) + + const expected = [{ + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'hello world' + }, { + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'another message', + prop: 42 + }] + + const pinoConfig = { + levels: { + labels: { 30: 'info' }, + values: { info: 30 } + }, + messageKey: 'msg', + errorKey: 'err' + } + + port2.on('message', function (message) { + same(expected.shift(), message.data) + same(pinoConfig, message.pinoConfig) + }) + + const lines = expected.map(JSON.stringify).join('\n') + stream.worker.postMessage({ code: 'PINO_CONFIG', config: pinoConfig }) + // stream.emit('message', { code: 'PINO_CONFIG', config: pinoConfig }) + stream.write(lines) + stream.end() + }) + + test(`continues to listen if it receives a message that is not PINO_CONFIG${description}`, function ({ same, plan }) { + plan(4) + const { port1, port2 } = new MessageChannel() + const stream = new ThreadStream({ + filename: join(__dirname, 'fixtures', 'transport-on-data.js'), + workerData: { + port: port1, + pinoWillSendConfig: true, + opts: { + expectPinoConfig: true + } + }, + workerOpts: { + transferList: [port1] + } + }) + + const expected = [{ + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'hello world' + }, { + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'another message', + prop: 42 + }] + + const pinoConfig = { + levels: { + labels: { 30: 'info' }, + values: { info: 30 } + }, + messageKey: 'msg', + errorKey: 'err' + } + + port2.on('message', function (message) { + same(expected.shift(), message.data) + same(pinoConfig, message.pinoConfig) + }) + + const lines = expected.map(JSON.stringify).join('\n') + stream.worker.postMessage('not a PINO_CONFIG') + // stream.emit('message', 'not a PINO_CONFIG') + stream.worker.postMessage({ code: 'NOT_PINO_CONFIG', config: { levels: 'foo', messageKey: 'bar', errorKey: 'baz' } }) + // stream.emit('message', { code: 'NOT_PINO_CONFIG', config: { levels: 'foo', messageKey: 'bar', errorKey: 'baz' } }) + stream.worker.postMessage({ code: 'PINO_CONFIG', config: pinoConfig }) + // stream.emit('message', { code: 'PINO_CONFIG', config: pinoConfig }) + stream.write(lines) + stream.end() + }) + + test(`waits for the pino config even if it is sent after write${description}`, function ({ same, plan }) { + plan(4) + const { port1, port2 } = new MessageChannel() + const stream = new ThreadStream({ + filename: join(__dirname, 'fixtures', filename), + workerData: { + port: port1, + pinoWillSendConfig: true, + opts: { + expectPinoConfig: true + } + }, + workerOpts: { + transferList: [port1] + } + }) + + const expected = [{ + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'hello world' + }, { + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'another message', + prop: 42 + }] + + const pinoConfig = { + levels: { + labels: { 30: 'info' }, + values: { info: 30 } + }, + messageKey: 'msg', + errorKey: 'err' + } + + port2.on('message', function (message) { + same(expected.shift(), message.data) + same(pinoConfig, message.pinoConfig) + }) + + const lines = expected.map(JSON.stringify).join('\n') + stream.write(lines) + stream.worker.postMessage({ code: 'PINO_CONFIG', config: pinoConfig }) + // stream.emit('message', { code: 'PINO_CONFIG', config: pinoConfig }) + stream.end() + }) + + test(`emits an error if the transport expects pino to send the config, but pino is not going to${description}`, async function ({ plan, same, ok }) { + plan(2) + const stream = new ThreadStream({ + filename: join(__dirname, 'fixtures', filename), + workerData: { + opts: { + expectPinoConfig: true + } + } + }) + const [err] = await once(stream, 'error') + same(err.message, 'This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.') + ok(stream.destroyed) + }) +} + +test('waits for the pino config when pipelining', function ({ same, plan }) { + plan(2) + const { port1, port2 } = new MessageChannel() + const stream = new ThreadStream({ + filename: join(__dirname, 'fixtures', 'worker-pipeline.js'), + workerData: { + pinoWillSendConfig: true, + targets: [{ + target: './transport-transform.js', + options: { + opts: { expectPinoConfig: true } + } + }, { + target: './transport-on-data.js', + options: { + port: port1 + } + }] + }, + workerOpts: { + transferList: [port1] + } + }) + + const expected = [{ + level: 'info(30)', + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'HELLO WORLD', + service: 'from transform' + }, { + level: 'info(30)', + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'ANOTHER MESSAGE', + prop: 42, + service: 'from transform' + }] + + const lines = [{ + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'hello world' + }, { + level: 30, + time: 1617955768092, + pid: 2942, + hostname: 'MacBook-Pro.local', + msg: 'another message', + prop: 42 + }].map(JSON.stringify).join('\n') + + const pinoConfig = { + levels: { + labels: { 30: 'info' }, + values: { info: 30 } + }, + messageKey: 'msg', + errorKey: 'err' + } + + port2.on('message', function (message) { + same(expected.shift(), message.data) + }) + + stream.worker.postMessage({ code: 'PINO_CONFIG', config: pinoConfig }) + // stream.emit('message', { code: 'PINO_CONFIG', config: pinoConfig }) + stream.write(lines) + stream.end() +})