From e0ff088fb2dcf2988627c0c4f15f9248fc246928 Mon Sep 17 00:00:00 2001 From: Stephen Parente Date: Sat, 16 Jun 2018 20:15:15 -0700 Subject: [PATCH] Add support for timestamp in delivery report --- src/callbacks.cc | 13 +++++++++++++ src/callbacks.h | 1 + 2 files changed, 14 insertions(+) diff --git a/src/callbacks.cc b/src/callbacks.cc index 361e87a0..f6a1ec1c 100644 --- a/src/callbacks.cc +++ b/src/callbacks.cc @@ -317,6 +317,11 @@ void DeliveryReportDispatcher::Flush() { delete persistent; } + if (event.timestamp > -1) { + Nan::Set(jsobj, Nan::New("timestamp").ToLocalChecked(), + Nan::New(event.timestamp)); + } + if (event.m_include_payload) { if (event.payload) { Nan::MaybeLocal buff = Nan::NewBuffer( @@ -359,6 +364,14 @@ DeliveryReport::DeliveryReport(RdKafka::Message &message, bool include_payload) partition = message.partition(); offset = message.offset(); + if (message.timestamp().type != + RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) { + timestamp = message.timestamp().timestamp; + } else { + timestamp = -1; + } + + // Key length. key_len = message.key_len(); diff --git a/src/callbacks.h b/src/callbacks.h index 2b877fb4..405c767e 100644 --- a/src/callbacks.h +++ b/src/callbacks.h @@ -110,6 +110,7 @@ class DeliveryReport { std::string topic_name; int32_t partition; int64_t offset; + int64_t timestamp; // Opaque token used. Local value void* opaque;