Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ stream, it emits the following events:

* `parseLine(line)` a function that is used to parse line received from `pino`.

* `expectPinoConfig` a boolean that indicates if the transport expects a pino to add some of its configuration to the stream. Default: `false`.

## Example

### custom parseLine
Expand Down
68 changes: 59 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,22 @@
const metadata = Symbol.for('pino.metadata')
const split = require('split2')
const { Duplex } = require('readable-stream')
const { parentPort, workerData } = require('worker_threads')

function createDeferred () {
let resolve
let reject
const promise = new Promise((_resolve, _reject) => {
resolve = _resolve
reject = _reject
})
promise.resolve = resolve
promise.reject = reject
return promise
}

module.exports = function build (fn, opts = {}) {
const waitForConfig = opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig === true
const parseLines = opts.parse === 'lines'
const parseLine = typeof opts.parseLine === 'function' ? opts.parseLine : JSON.parse
const close = opts.close || defaultClose
Expand Down Expand Up @@ -50,27 +64,63 @@ module.exports = function build (fn, opts = {}) {
}
}

if (opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig !== true) {
setImmediate(() => {
stream.emit('error', new Error('This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.'))
})
}

if (opts.metadata !== false) {
stream[metadata] = true
stream.lastTime = 0
stream.lastLevel = 0
stream.lastObj = null
}

let res = fn(stream)
if (waitForConfig) {
let pinoConfig = {}
const configReceived = createDeferred()
parentPort.on('message', function handleMessage (message) {
if (message.code === 'PINO_CONFIG') {
pinoConfig = message.config
configReceived.resolve()
parentPort.off('message', handleMessage)
}
})

if (res && typeof res.catch === 'function') {
res.catch((err) => {
stream.destroy(err)
Object.defineProperties(stream, {
levels: {
get () { return pinoConfig.levels }
},
messageKey: {
get () { return pinoConfig.messageKey }
},
errorKey: {
get () { return pinoConfig.errorKey }
}
})

// set it to null to not retain a reference to the promise
res = null
} else if (opts.enablePipelining && res) {
return Duplex.from({ writable: stream, readable: res })
return configReceived.then(finish)
}

return stream
return finish()

function finish () {
let res = fn(stream)

if (res && typeof res.catch === 'function') {
res.catch((err) => {
stream.destroy(err)
})

// set it to null to not retain a reference to the promise
res = null
} else if (opts.enablePipelining && res) {
return Duplex.from({ writable: stream, readable: res })
}

return stream
}
}

function defaultClose (err, cb) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"snazzy": "^9.0.0",
"standard": "^17.0.0",
"tap": "^16.0.0",
"thread-stream": "^2.4.1",
"tsd": "^0.31.0"
},
"tsd": {
Expand Down
7 changes: 7 additions & 0 deletions test/base.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ test('rejecting errors the stream', async ({ same, plan }) => {
same(err.message, 'kaboom')
})

test('emits an error if the transport expects pino to send the config, but pino is not going to', async function ({ plan, same }) {
plan(1)
const stream = build(() => {}, { expectPinoConfig: true })
const [err] = await once(stream, 'error')
same(err.message, 'This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.')
})

test('set metadata', ({ same, plan, equal }) => {
plan(9)

Expand Down
22 changes: 22 additions & 0 deletions test/fixtures/transport-async-iteration.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const build = require('../..')

module.exports = async function (threadStreamOpts) {
const { port, opts = {} } = threadStreamOpts
return build(
async function (source) {
for await (const obj of source) {
port.postMessage({
data: obj,
pinoConfig: {
levels: source.levels,
messageKey: source.messageKey,
errorKey: source.errorKey
}
})
}
},
opts
)
}
22 changes: 22 additions & 0 deletions test/fixtures/transport-on-data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const build = require('../..')

module.exports = async function (threadStreamOpts) {
const { port, opts = {} } = threadStreamOpts
return build(
function (source) {
source.on('data', function (line) {
port.postMessage({
data: line,
pinoConfig: {
levels: source.levels,
messageKey: source.messageKey,
errorKey: source.errorKey
}
})
})
},
opts
)
}
24 changes: 24 additions & 0 deletions test/fixtures/transport-transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const { Transform, pipeline } = require('stream')
const build = require('../..')

module.exports = function (threadStreamOpts) {
const { opts = {} } = threadStreamOpts
return build(function (source) {
const transform = new Transform({
objectMode: true,
autoDestroy: true,
transform (chunk, enc, cb) {
chunk.service = 'from transform'
chunk.level = `${source.levels.labels[chunk.level]}(${chunk.level})`
chunk[source.messageKey] = chunk[source.messageKey].toUpperCase()
cb(null, JSON.stringify(chunk) + '\n')
}
})

pipeline(source, transform, () => {})

return transform
}, { ...opts, enablePipelining: true })
}
15 changes: 15 additions & 0 deletions test/fixtures/worker-pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict'

const { pipeline, PassThrough } = require('stream')

module.exports = async function ({ targets }) {
const streams = await Promise.all(targets.map(async (t) => {
const fn = require(t.target)
const stream = await fn(t.options)
return stream
}))

const stream = new PassThrough()
pipeline(stream, ...streams, () => {})
return stream
}
Loading