From 27a505a4c6c83d4c2eee47e325c5431ee6d824c6 Mon Sep 17 00:00:00 2001 From: Benjamin Chrobot Date: Tue, 22 Sep 2020 16:15:49 -0400 Subject: [PATCH 1/2] fix: handle empty message payload safely --- src/lib/acker.ts | 25 +++++++++++++++++++++---- src/lib/publisher.ts | 31 ++++++++++++++++++++++++------- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/lib/acker.ts b/src/lib/acker.ts index 1b83347..e59105a 100644 --- a/src/lib/acker.ts +++ b/src/lib/acker.ts @@ -7,7 +7,10 @@ export const createAcker = () => { const pool = new Pool({ connectionString: config.databaseUrl }); setTimeout(async () => { - console.log(`Renotifying unacked jobs older than ${REQUEUE_CHECK}`); + const now = new Date().toISOString(); + console.log( + `${now} INFO Renotifying unacked jobs older than ${REQUEUE_CHECK}` + ); try { await pool.query( 'call assemble_worker.renotify_unacked_jobs_queued_for_more_than_30_seconds()' @@ -18,6 +21,16 @@ export const createAcker = () => { }, REQUEUE_CHECK); return async (message: Notification) => { + const messageStr = JSON.stringify(message); + + if (!message.payload) { + const now = new Date().toISOString(); + console.log( + `${now} WARN encountered empty payload acking message: ${messageStr}` + ); + return; + } + try { const splitByBar = message.payload.split('|'); const stringContents = splitByBar.slice(1).join('|'); @@ -26,14 +39,18 @@ export const createAcker = () => { const job_id = json.job_id; if (config.verbose) { - console.log(`Acking job ${job_id}`); + const now = new Date().toISOString(); + console.log(`${now} INFO Acking job ${job_id}`); await pool.query( 'update assemble_worker.jobs set last_acked_at = $1 where id = $2', [new Date(), job_id] ); } - } catch (ex) { - console.error('Could not ack job', ex); + } catch (err) { + const now = new Date().toISOString(); + console.log( + `${now} ERROR encountered error acking job: ${err.message}. Message: ${messageStr}` + ); } }; }; diff --git a/src/lib/publisher.ts b/src/lib/publisher.ts index 3ecd95e..6ea194b 100644 --- a/src/lib/publisher.ts +++ b/src/lib/publisher.ts @@ -7,15 +7,32 @@ export const createPublisher = async exchange => { const channel = await connection.createChannel(); return (message: Notification) => { - const splitByBar = message.payload.split('|'); - const routingKey = splitByBar[0]; - const stringContents = splitByBar.slice(1).join('|'); - const contents = Buffer.from(stringContents); + const messageStr = JSON.stringify(message); - if (config.verbose) { - console.log(`Forwarding message to ${routingKey}: ${stringContents}`); + if (!message.payload) { + const now = new Date().toISOString(); + console.log( + `${now} WARN encountered empty payload for message: ${messageStr}` + ); + return; } - channel.publish(exchange, routingKey, contents); + try { + const splitByBar = message.payload.split('|'); + const routingKey = splitByBar[0]; + const stringContents = splitByBar.slice(1).join('|'); + const contents = Buffer.from(stringContents); + + if (config.verbose) { + console.log(`Forwarding message to ${routingKey}: ${stringContents}`); + } + + channel.publish(exchange, routingKey, contents); + } catch (err) { + const now = new Date().toISOString(); + console.log( + `${now} ERROR encountered error publishing message: ${err.message}. Message: ${messageStr}` + ); + } }; }; From 8413099de8298d94ef1cd3be98d88161ad5f60d9 Mon Sep 17 00:00:00 2001 From: Benjamin Chrobot Date: Tue, 22 Sep 2020 16:18:57 -0400 Subject: [PATCH 2/2] chore: add timestamp --- src/lib/publisher.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/lib/publisher.ts b/src/lib/publisher.ts index 6ea194b..72306b4 100644 --- a/src/lib/publisher.ts +++ b/src/lib/publisher.ts @@ -24,7 +24,10 @@ export const createPublisher = async exchange => { const contents = Buffer.from(stringContents); if (config.verbose) { - console.log(`Forwarding message to ${routingKey}: ${stringContents}`); + const now = new Date().toISOString(); + console.log( + `${now} INFO Forwarding message to ${routingKey}: ${stringContents}` + ); } channel.publish(exchange, routingKey, contents);