Skip to content

Commit 7cb2344

Browse files
committed
updated timestamp to nanoseconds instead of millis
1 parent e7575d7 commit 7cb2344

File tree

2 files changed

+29
-21
lines changed

2 files changed

+29
-21
lines changed

sources/shared/change-dispatcher/src/main.rs

+23-18
Original file line numberDiff line numberDiff line change
@@ -151,22 +151,24 @@ async fn process_changes(
151151
);
152152

153153
let mut dispatch_event = change_event.clone();
154-
dispatch_event["metadata"]["tracking"]["source"]["changeDispatcherStart_ms"] =
155-
match serde_json::to_value(chrono::Utc::now().timestamp_millis()) {
154+
155+
// Start time, measured in nanoseconds
156+
dispatch_event["metadata"]["tracking"]["source"]["changeDispatcherStart_ns"] =
157+
match serde_json::to_value(chrono::Utc::now().timestamp_nanos()) {
156158
Ok(val) => val,
157159
Err(_) => {
158160
return Err(Box::<dyn std::error::Error>::from(
159161
"Error serializing timestamp into json value",
160162
));
161163
}
162164
};
163-
dispatch_event["metadata"]["tracking"]["source"]["changeDispatcherEnd_ms"] =
164-
match serde_json::to_value(0) {
165-
Ok(val) => val,
166-
Err(_) => {
167-
unreachable!();
168-
}
169-
};
165+
// dispatch_event["metadata"]["tracking"]["source"]["changeDispatcherEnd_ms"] =
166+
// match serde_json::to_value(0) {
167+
// Ok(val) => val,
168+
// Err(_) => {
169+
// unreachable!();
170+
// }
171+
// };
170172

171173
let subscriptions = match change_event["subscriptions"].as_array() {
172174
Some(subs) => subs.clone(),
@@ -185,15 +187,7 @@ async fn process_changes(
185187
for query_node_id in query_nodes {
186188
let app_id = format!("{}-publish-api", query_node_id);
187189

188-
dispatch_event["metadata"]["tracking"]["source"]["changeDispatcherEnd_ms"] =
189-
match serde_json::to_value(chrono::Utc::now().timestamp_millis()) {
190-
Ok(val) => val,
191-
Err(_) => {
192-
return Err(Box::<dyn std::error::Error>::from(
193-
"Error serializing timestamp into json value",
194-
));
195-
}
196-
};
190+
197191
let queries: Vec<_> = subscriptions
198192
.iter()
199193
.filter(|x| x["queryNodeId"] == query_node_id)
@@ -213,6 +207,17 @@ async fn process_changes(
213207
headers.insert("traceparent".to_string(), traceparent.clone());
214208
}
215209
let headers = Headers::new(headers);
210+
211+
// End time, measured in nanoseconds
212+
dispatch_event["metadata"]["tracking"]["source"]["changeDispatcherEnd_ns"] =
213+
match serde_json::to_value(chrono::Utc::now().timestamp_nanos()) {
214+
Ok(val) => val,
215+
Err(_) => {
216+
return Err(Box::<dyn std::error::Error>::from(
217+
"Error serializing timestamp into json value",
218+
));
219+
}
220+
};
216221
match invoker
217222
.invoke(
218223
Payload::Json(dispatch_event.clone()),

sources/shared/change-router/src/main.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,11 @@ async fn process_changes(
266266
}
267267
if let Some(changes) = changes.as_array() {
268268
for change in changes {
269-
let change_router_start = chrono::Utc::now().timestamp_millis();
270269
let change_id = Uuid::new_v4().to_string();
270+
info!("Begin processing change with id: {}", change_id);
271+
// Capture the start time of this change
272+
let change_router_start = chrono::Utc::now().timestamp_nanos();
273+
271274

272275
info!(
273276
"Processing change - db:{}, type:{}, id:{}",
@@ -523,8 +526,8 @@ async fn process_changes(
523526
"source": {
524527
"seq": change["payload"]["source"]["lsn"],
525528
"reactivator_ms": change["ts_ms"],
526-
"changeSvcStart_ms": change_router_start,
527-
"changeSvcEnd_ms": chrono::Utc::now().timestamp_millis()
529+
"changeSvcStart_ns": change_router_start,
530+
"changeSvcEnd_ns": chrono::Utc::now().timestamp_nanos(),
528531
}
529532
}
530533
}

0 commit comments

Comments
 (0)