Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Write Pino transports easily.

## Install

```
```sh
npm i pino-abstract-transport
```

Expand Down Expand Up @@ -43,9 +43,12 @@ module.exports = function (opts) {
```

## Typescript usage

Install the type definitions for node. Make sure the major version of the type definitions matches the node version you are using.

#### Node 16
```

```sh
npm i -D @types/node@16
```

Expand Down Expand Up @@ -78,6 +81,8 @@ stream, it emits the following events:

* `parseLine(line)` a function that is used to parse line received from `pino`.

* `expectPinoConfig` a boolean that indicates if the transport expects Pino to add some of its configuration to the stream. Default: `false`.

## Example

### custom parseLine
Expand Down Expand Up @@ -142,6 +147,26 @@ pipeline(process.stdin, buildTransform(), buildDestination(), function (err) {
})
```

### Using pino config

Setting `expectPinoConfig` to `true` will make the transport wait for pino to send its configuration before starting to process logs. It will add `levels`, `messageKey` and `errorKey` to the stream.

When used with an incompatible version of pino, the stream will immediately error.

```js
import build from 'pino-abstract-transport'

export default function (opts) {
return build(async function (source) {
for await (const obj of source) {
console.log(`[${source.levels.labels[obj.level]}]: ${obj[source.messageKey]}`)
}
}, {
expectPinoConfig: true
})
}
```

## License

MIT
30 changes: 30 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type BuildOptions = {
* `metadata` If set to false, do not add metadata properties to the returned stream
*/
metadata?: false;

/**
* `expectPinoConfig` If set to true, the transport will wait for pino to send its
* configuration before starting to process logs.
*/
expectPinoConfig?: boolean;
};

/**
Expand All @@ -50,6 +56,17 @@ type EnablePipelining = BuildOptions & {
enablePipelining: true;
};

/**
* Create a split2 instance and returns it. This same instance is also passed
* to the given function, which is called after pino has sent its configuration.
*
* @returns {Promise<Transform>} the split2 instance
*/
declare function build(
fn: (transform: Transform & build.OnUnknown) => void | Promise<void>,
opts: BuildOptions & { expectPinoConfig: true }
): Promise<Transform & build.OnUnknown>;

/**
* Create a split2 instance and returns it. This same instance is also passed
* to the given function, which is called synchronously.
Expand All @@ -61,6 +78,19 @@ declare function build(
opts?: BuildOptions
): Transform & build.OnUnknown;

/**
* Creates a split2 instance and passes it to the given function, which is called
* after pino has sent its configuration. Then wraps the split2 instance and
* the returned stream into a Duplex, so they can be concatenated into multiple
* transports.
*
* @returns {Promise<Transform>} the wrapped split2 instance
*/
declare function build(
fn: (transform: Transform & build.OnUnknown) => Transform & build.OnUnknown,
opts: EnablePipelining & { expectPinoConfig: true }
): Promise<Transform>;

/**
* Creates a split2 instance and passes it to the given function, which is called
* synchronously. Then wraps the split2 instance and the returned stream into a
Expand Down
68 changes: 59 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,22 @@
const metadata = Symbol.for('pino.metadata')
const split = require('split2')
const { Duplex } = require('readable-stream')
const { parentPort, workerData } = require('worker_threads')

function createDeferred () {
let resolve
let reject
const promise = new Promise((_resolve, _reject) => {
resolve = _resolve
reject = _reject
})
promise.resolve = resolve
promise.reject = reject
return promise
}

module.exports = function build (fn, opts = {}) {
const waitForConfig = opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig === true
const parseLines = opts.parse === 'lines'
const parseLine = typeof opts.parseLine === 'function' ? opts.parseLine : JSON.parse
const close = opts.close || defaultClose
Expand Down Expand Up @@ -50,27 +64,63 @@ module.exports = function build (fn, opts = {}) {
}
}

if (opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig !== true) {
setImmediate(() => {
stream.emit('error', new Error('This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.'))
})
}

if (opts.metadata !== false) {
stream[metadata] = true
stream.lastTime = 0
stream.lastLevel = 0
stream.lastObj = null
}

let res = fn(stream)
if (waitForConfig) {
let pinoConfig = {}
const configReceived = createDeferred()
parentPort.on('message', function handleMessage (message) {
if (message.code === 'PINO_CONFIG') {
pinoConfig = message.config
configReceived.resolve()
parentPort.off('message', handleMessage)
}
})

if (res && typeof res.catch === 'function') {
res.catch((err) => {
stream.destroy(err)
Object.defineProperties(stream, {
levels: {
get () { return pinoConfig.levels }
},
messageKey: {
get () { return pinoConfig.messageKey }
},
errorKey: {
get () { return pinoConfig.errorKey }
}
})

// set it to null to not retain a reference to the promise
res = null
} else if (opts.enablePipelining && res) {
return Duplex.from({ writable: stream, readable: res })
return configReceived.then(finish)
}

return stream
return finish()

function finish () {
let res = fn(stream)

if (res && typeof res.catch === 'function') {
res.catch((err) => {
stream.destroy(err)
})

// set it to null to not retain a reference to the promise
res = null
} else if (opts.enablePipelining && res) {
return Duplex.from({ writable: stream, readable: res })
}

return stream
}
}

function defaultClose (err, cb) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"snazzy": "^9.0.0",
"standard": "^17.0.0",
"tap": "^16.0.0",
"thread-stream": "^2.4.1",
"tsd": "^0.31.0"
},
"tsd": {
Expand Down
7 changes: 7 additions & 0 deletions test/base.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ test('rejecting errors the stream', async ({ same, plan }) => {
same(err.message, 'kaboom')
})

test('emits an error if the transport expects pino to send the config, but pino is not going to', async function ({ plan, same }) {
plan(1)
const stream = build(() => {}, { expectPinoConfig: true })
const [err] = await once(stream, 'error')
same(err.message, 'This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.')
})

test('set metadata', ({ same, plan, equal }) => {
plan(9)

Expand Down
22 changes: 22 additions & 0 deletions test/fixtures/transport-async-iteration.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const build = require('../..')

module.exports = async function (threadStreamOpts) {
const { port, opts = {} } = threadStreamOpts
return build(
async function (source) {
for await (const obj of source) {
port.postMessage({
data: obj,
pinoConfig: {
levels: source.levels,
messageKey: source.messageKey,
errorKey: source.errorKey
}
})
}
},
opts
)
}
22 changes: 22 additions & 0 deletions test/fixtures/transport-on-data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const build = require('../..')

module.exports = async function (threadStreamOpts) {
const { port, opts = {} } = threadStreamOpts
return build(
function (source) {
source.on('data', function (line) {
port.postMessage({
data: line,
pinoConfig: {
levels: source.levels,
messageKey: source.messageKey,
errorKey: source.errorKey
}
})
})
},
opts
)
}
24 changes: 24 additions & 0 deletions test/fixtures/transport-transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const { Transform, pipeline } = require('stream')
const build = require('../..')

module.exports = function (threadStreamOpts) {
const { opts = {} } = threadStreamOpts
return build(function (source) {
const transform = new Transform({
objectMode: true,
autoDestroy: true,
transform (chunk, enc, cb) {
chunk.service = 'from transform'
chunk.level = `${source.levels.labels[chunk.level]}(${chunk.level})`
chunk[source.messageKey] = chunk[source.messageKey].toUpperCase()
cb(null, JSON.stringify(chunk) + '\n')
}
})

pipeline(source, transform, () => {})

return transform
}, { ...opts, enablePipelining: true })
}
15 changes: 15 additions & 0 deletions test/fixtures/worker-pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict'

const { pipeline, PassThrough } = require('stream')

module.exports = async function ({ targets }) {
const streams = await Promise.all(targets.map(async (t) => {
const fn = require(t.target)
const stream = await fn(t.options)
return stream
}))

const stream = new PassThrough()
pipeline(stream, ...streams, () => {})
return stream
}
10 changes: 10 additions & 0 deletions test/types/index.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,22 @@ import { Transform } from "stream";
*/
expectType<Transform>(build((source) => source, { enablePipelining: true }));

/**
* If expectPinoConfig is set with enablePipelining, build returns a promise
*/
expectType<(Promise<Transform>)>(build((source) => source, { enablePipelining: true, expectPinoConfig: true }));

/**
* If enablePipelining is not set the unknown event can be listened to on
* the returned stream.
*/
expectType<Transform & OnUnknown>(build((source) => {}));

/**
* If expectPinoConfig is set, build returns a promise
*/
expectType<(Promise<Transform & OnUnknown>)>(build((source) => {}, { expectPinoConfig: true }));

/**
* build also accepts an async function
*/
Expand Down
Loading