Skip to content

Commit

Permalink
allow passing callback to flush (#1827)
Browse files Browse the repository at this point in the history
* allow passing callback to flush

* Update api.md

* Update docs/api.md

Co-authored-by: James Sumners <321201+jsumners@users.noreply.github.com>

* update sonic boom with the latest fix

---------

Co-authored-by: James Sumners <321201+jsumners@users.noreply.github.com>
  • Loading branch information
rluvaton and jsumners committed Oct 9, 2023
1 parent 7837851 commit 19ab286
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 11 deletions.
6 changes: 4 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -964,12 +964,12 @@ console.log(anotherChild.bindings())
```
<a id="flush"></a>
### `logger.flush()`
### `logger.flush([cb])`
Flushes the content of the buffer when using `pino.destination({
sync: false })`.
This is an asynchronous, fire and forget, operation.
This is an asynchronous, best used as fire and forget, operation.
The use case is primarily for asynchronous logging, which may buffer
log lines while others are being written. The `logger.flush` method can be
Expand All @@ -978,6 +978,8 @@ on a long interval, say ten seconds. Such a strategy can provide an
optimum balance between extremely efficient logging at high demand periods
and safer logging at low demand periods.
If there is a need to wait for the logs to be flushed, a callback should be used.
* See [`destination` parameter](#destination)
* See [Asynchronous Logging ⇗](/docs/asynchronous.md)
Expand Down
11 changes: 9 additions & 2 deletions lib/proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,14 @@ function write (_obj, msg, num) {

function noop () {}

function flush () {
function flush (cb) {
if (cb != null && typeof cb !== 'function') {
throw Error('callback must be a function')
}

const stream = this[streamSym]
if ('flush' in stream) stream.flush(noop)

if ('flush' in stream) {
stream.flush(cb || noop)
} else if (cb) cb()
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
"quick-format-unescaped": "^4.0.3",
"real-require": "^0.2.0",
"safe-stable-stringify": "^2.3.1",
"sonic-boom": "^3.1.0",
"sonic-boom": "^3.7.0",
"thread-stream": "^2.0.0"
},
"tsd": {
Expand Down
3 changes: 2 additions & 1 deletion pino.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ export interface LoggerExtras<Options = LoggerOptions> extends EventEmitter {

/**
* Flushes the content of the buffer when using pino.destination({ sync: false }).
* call the callback when finished
*/
flush(): void;
flush(cb?: (err?: Error) => void): void;
}


Expand Down
78 changes: 73 additions & 5 deletions test/syncfalse.test.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
'use strict'

const os = require('os')
const { createWriteStream } = require('fs')
const {
createWriteStream
} = require('fs')
const { readFile } = require('fs').promises
const { join } = require('path')
const { test } = require('tap')
const { fork } = require('child_process')
const writer = require('flush-write-stream')
const { once, getPathToNull } = require('./helper')
const {
once,
getPathToNull,
file,
watchFileCreated
} = require('./helper')
const { promisify } = require('util')

const sleep = promisify(setTimeout)

test('asynchronous logging', async ({ equal, teardown }) => {
test('asynchronous logging', async ({
equal,
teardown
}) => {
const now = Date.now
const hostname = os.hostname
const proc = process
Expand Down Expand Up @@ -63,7 +74,10 @@ test('asynchronous logging', async ({ equal, teardown }) => {
})
})

test('sync false with child', async ({ equal, teardown }) => {
test('sync false with child', async ({
equal,
teardown
}) => {
const now = Date.now
const hostname = os.hostname
const proc = process
Expand All @@ -87,7 +101,9 @@ test('sync false with child', async ({ equal, teardown }) => {
})).child({ hello: 'world' })

const dest = createWriteStream(getPathToNull())
dest.write = function (s) { actual += s }
dest.write = function (s) {
actual += s
}
const asyncLogger = pino(dest).child({ hello: 'world' })

let i = 500
Expand Down Expand Up @@ -121,3 +137,55 @@ test('flush does nothing with sync true (default)', async ({ equal }) => {
const instance = require('..')()
equal(instance.flush(), undefined)
})

test('should still call flush callback even when does nothing with sync true (default)', (t) => {
t.plan(3)
const instance = require('..')()
instance.flush((...args) => {
t.ok('flush called')
t.same(args, [])

// next tick to make flush not called more than once
process.nextTick(() => {
t.ok('flush next tick called')
})
})
})

test('should call the flush callback when flushed the data for async logger', async (t) => {
const outputPath = file()
async function getOutputLogLines () {
return (await readFile(outputPath)).toString().trim().split('\n').map(JSON.parse)
}

const pino = require('../')

const instance = pino({}, pino.destination({
dest: outputPath,

// to make sure it does not flush on its own
minLength: 4096
}))
const flushPromise = promisify(instance.flush).bind(instance)

instance.info('hello')
await flushPromise()
await watchFileCreated(outputPath)

const [firstFlushData] = await getOutputLogLines()

t.equal(firstFlushData.msg, 'hello')

// should not flush this as no data accumulated that's bigger than min length
instance.info('world')

// Making sure data is not flushed yet
const afterLogData = await getOutputLogLines()
t.equal(afterLogData.length, 1)

await flushPromise()

// Making sure data is not flushed yet
const afterSecondFlush = (await getOutputLogLines())[1]
t.equal(afterSecondFlush.msg, 'world')
})
35 changes: 35 additions & 0 deletions test/transport/syncfalse.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const { join } = require('path')
const { test } = require('tap')
const { readFile } = require('fs').promises
const { watchFileCreated, file } = require('../helper')
const { promisify } = require('util')

const { pid } = process
const hostname = os.hostname()
Expand All @@ -31,3 +32,37 @@ test('thread-stream async flush', async ({ equal, same }) => {
msg: 'hello'
})
})

test('thread-stream async flush should call the passed callback', async (t) => {
const outputPath = file()
async function getOutputLogLines () {
return (await readFile(outputPath)).toString().trim().split('\n').map(JSON.parse)
}
const transport = pino.transport({
target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'),
options: { destination: outputPath }
})
const instance = pino(transport)
const flushPromise = promisify(instance.flush).bind(instance)

instance.info('hello')
await flushPromise()
await watchFileCreated(outputPath)

const [firstFlushData] = await getOutputLogLines()

t.equal(firstFlushData.msg, 'hello')

// should not flush this as no data accumulated that's bigger than min length
instance.info('world')

// Making sure data is not flushed yet
const afterLogData = await getOutputLogLines()
t.equal(afterLogData.length, 1)

await flushPromise()

// Making sure data is not flushed yet
const afterSecondFlush = (await getOutputLogLines())[1]
t.equal(afterSecondFlush.msg, 'world')
})
1 change: 1 addition & 0 deletions test/types/pino.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pino({ base: null });
if ("pino" in log) console.log(`pino version: ${log.pino}`);

expectType<void>(log.flush());
log.flush((err?: Error) => undefined);
log.child({ a: "property" }).info("hello child!");
log.level = "error";
log.info("nope");
Expand Down

0 comments on commit 19ab286

Please sign in to comment.