Skip to content

Commit 54e5777

Browse files
committed
updated relational sources to use ns
1 parent 7cb2344 commit 54e5777

File tree

14 files changed

+128
-66
lines changed

14 files changed

+128
-66
lines changed

query-container/query-host/src/api.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ pub struct ChangeEvent {
162162
#[derive(Serialize, Deserialize, Debug)]
163163
struct ChangeEventTime {
164164
seq: u64,
165-
ms: u64,
165+
ns: u64,
166166
}
167167

168168
#[derive(Serialize, Deserialize, Debug)]
@@ -246,7 +246,7 @@ enum ElementType {
246246

247247
impl ChangeEvent {
248248
pub fn get_timestamp(&self) -> u64 {
249-
self.time.ms
249+
self.time.ns
250250
}
251251

252252
pub fn has_query(&self, query_id: &str) -> bool {
@@ -266,7 +266,7 @@ impl ChangeEvent {
266266
source_id: future_ref.element_ref.source_id.to_string(),
267267
time: ChangeEventTime {
268268
seq: 0,
269-
ms: future_ref.original_time,
269+
ns: future_ref.original_time,
270270
},
271271
queries: vec![query_id.to_string()],
272272
op: ChangeType::Future,
@@ -293,7 +293,7 @@ impl TryInto<SourceChange> for ChangeEvent {
293293
&self.source_id,
294294
self.element_type
295295
.ok_or(ApiError::BadRequest("missing element type".into()))?,
296-
self.time.ms,
296+
self.time.ns,
297297
)?,
298298
},
299299
ChangeType::Update => SourceChange::Update {
@@ -304,12 +304,12 @@ impl TryInto<SourceChange> for ChangeEvent {
304304
&self.source_id,
305305
self.element_type
306306
.ok_or(ApiError::BadRequest("missing element type".into()))?,
307-
self.time.ms,
307+
self.time.ns,
308308
)?,
309309
},
310310
ChangeType::Delete => SourceChange::Delete {
311311
metadata: ElementMetadata {
312-
effective_from: self.time.ms,
312+
effective_from: self.time.ns,
313313
labels: Arc::from(match &self.before {
314314
Some(before) => match &before.labels {
315315
Some(labels) => labels.iter().map(|l| Arc::from(l.as_str())).collect(),
@@ -326,7 +326,7 @@ impl TryInto<SourceChange> for ChangeEvent {
326326
ChangeType::Future => SourceChange::Future {
327327
future_ref: FutureElementRef {
328328
element_ref: ElementReference::new(&self.source_id, &self.id),
329-
original_time: self.time.ms,
329+
original_time: self.time.ns,
330330
due_time: self
331331
.future_due_time
332332
.ok_or(ApiError::BadRequest("Missing future due time".to_string()))?,

sources/cosmosdb/cosmosdb-ffcf-reactivator/Program.cs

+15-15
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
// Copyright 2024 The Drasi Authors.
2-
//
3-
// Licensed under the Apache License, Version 2.0 (the "License");
4-
// you may not use this file except in compliance with the License.
5-
// You may obtain a copy of the License at
6-
//
7-
// http://www.apache.org/licenses/LICENSE-2.0
8-
//
9-
// Unless required by applicable law or agreed to in writing, software
10-
// distributed under the License is distributed on an "AS IS" BASIS,
11-
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12-
// See the License for the specific language governing permissions and
13-
// limitations under the License.
14-
1+
// Copyright 2024 The Drasi Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
1515
using System.Net;
1616
using System.Text;
1717
using Microsoft.Azure.Cosmos;
@@ -131,7 +131,7 @@ private static JObject FormatDebeziumEvent(JObject feedEvent, string changeEvent
131131
throw new NotSupportedException();
132132
}
133133

134-
result["ts_ms"] = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
134+
result["ts_ns"] = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000.0;
135135
result["schema"] = "";
136136

137137
var isRelation = (bool)(source["_isEdge"] ?? false);

sources/relational/debezium-reactivator/Dockerfile

+41-3
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,52 @@
1616
# Build stage
1717
#
1818
FROM --platform=$BUILDPLATFORM maven:3.9.9-amazoncorretto-21 AS build
19+
WORKDIR /home/app
20+
21+
# Copy the source code, pom.xml, and lib folder
1922
COPY src /home/app/src
20-
COPY pom.xml /home/app
23+
COPY pom.xml /home/app/
24+
COPY lib /home/app/lib
25+
26+
# Run the Maven build
2127
RUN mvn -f /home/app/pom.xml clean package
2228

23-
RUN cd /home/app && mvn dependency:tree
29+
# Optional: Uncomment to debug dependency tree
30+
# RUN mvn dependency:tree
31+
2432
#
2533
# Package stage
2634
#
2735
FROM --platform=$TARGETPLATFORM mcr.microsoft.com/openjdk/jdk:21-distroless
2836
COPY --from=build /home/app/target /home/app
29-
CMD ["-jar","/home/app/debezium-reactivator-1.0.jar"]
37+
CMD ["-jar", "/home/app/debezium-reactivator-1.0.jar"]
38+
39+
# # Copyright 2024 The Drasi Authors.
40+
# #
41+
# # Licensed under the Apache License, Version 2.0 (the "License");
42+
# # you may not use this file except in compliance with the License.
43+
# # You may obtain a copy of the License at
44+
# #
45+
# # http://www.apache.org/licenses/LICENSE-2.0
46+
# #
47+
# # Unless required by applicable law or agreed to in writing, software
48+
# # distributed under the License is distributed on an "AS IS" BASIS,
49+
# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
50+
# # See the License for the specific language governing permissions and
51+
# # limitations under the License.
52+
53+
# #
54+
# # Build stage
55+
# #
56+
# FROM --platform=$BUILDPLATFORM maven:3.9.9-amazoncorretto-21 AS build
57+
# COPY src /home/app/src
58+
# COPY pom.xml /home/app
59+
# RUN mvn -f /home/app/pom.xml clean package
60+
61+
# RUN cd /home/app && mvn dependency:tree
62+
# #
63+
# # Package stage
64+
# #
65+
# FROM --platform=$TARGETPLATFORM mcr.microsoft.com/openjdk/jdk:21-distroless
66+
# COPY --from=build /home/app/target /home/app
67+
# CMD ["-jar","/home/app/debezium-reactivator-1.0.jar"]
Binary file not shown.

sources/relational/debezium-reactivator/pom.xml

+22-1
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,30 @@
128128
</configuration>
129129
</plugin>
130130
<plugin>
131+
<groupId>org.apache.maven.plugins</groupId>
132+
<artifactId>maven-install-plugin</artifactId>
133+
<version>3.1.1</version>
134+
<executions>
135+
<execution>
136+
<id>install-local-jar</id>
137+
<phase>initialize</phase>
138+
<goals>
139+
<goal>install-file</goal>
140+
</goals>
141+
<configuration>
142+
<file>lib/source.sdk-0.1.4.jar</file>
143+
<groupId>io.drasi</groupId>
144+
<artifactId>source.sdk</artifactId>
145+
<version>0.1.4</version>
146+
<packaging>jar</packaging>
147+
</configuration>
148+
</execution>
149+
</executions>
150+
</plugin>
151+
<!-- <plugin>
131152
<artifactId>maven-install-plugin</artifactId>
132153
<version>2.5.2</version>
133-
</plugin>
154+
</plugin> -->
134155
<plugin>
135156
<artifactId>maven-deploy-plugin</artifactId>
136157
<version>2.8.2</version>

sources/relational/debezium-reactivator/src/main/java/io/drasi/RelationalChangeConsumer.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,11 @@ private SourceChange ExtractDrasiChange(JsonNode sourceChange) {
124124
}
125125

126126
var nodeId = createNodeId(mapping.tableName, item.path(mapping.keyField).asText());
127-
var timestamp = payload.path("ts_ms").asLong();
127+
System.out.println("Payload: " + payload.toPrettyString()); // or payload.toString() for compact format
128+
129+
var timestamp = payload.path("ts_ns").asLong();
130+
System.out.println("Timestamp: " + timestamp);
131+
128132
var lsn = dbStrategy.extractLsn(source);
129133

130134
return switch (changeType) {

sources/sdk/java/src/main/java/io/drasi/source/sdk/models/SourceChange.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public abstract class SourceChange {
2727
protected abstract Op getOp();
2828

2929
private String id;
30-
private long tsMS;
30+
private long tsNS;
3131
private JsonNode properties;
3232
private Map<String, Object> metadata;
3333
private List<String> labels;
@@ -38,9 +38,9 @@ public abstract class SourceChange {
3838
private String sourceTable;
3939

4040

41-
protected SourceChange(String id, long tsMS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long lsn) {
41+
protected SourceChange(String id, long tsNS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long lsn) {
4242
this.id = id;
43-
this.tsMS = tsMS;
43+
this.tsNS = tsNS;
4444
this.properties = properties;
4545
this.metadata = metadata;
4646
this.labels = labels;
@@ -49,9 +49,9 @@ protected SourceChange(String id, long tsMS, JsonNode properties, Map<String, Ob
4949
this.sourceTable = "node";
5050
}
5151

52-
protected SourceChange(String id, long tsMS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long lsn, String startId, String endId) {
52+
protected SourceChange(String id, long tsNS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long lsn, String startId, String endId) {
5353
this.id = id;
54-
this.tsMS = tsMS;
54+
this.tsNS = tsNS;
5555
this.properties = properties;
5656
this.metadata = metadata;
5757
this.labels = labels;
@@ -67,7 +67,7 @@ public String toJson() {
6767
rgSource.put("db", Reactivator.SourceId());
6868
rgSource.put("table", sourceTable);
6969
rgSource.put("lsn", lsn);
70-
rgSource.put("ts_ms", sourceTsMS);
70+
rgSource.put("ts_ns", sourceTsMS);
7171

7272
var payload = JsonNodeFactory.instance.objectNode();
7373
payload.set("source", rgSource);
@@ -93,7 +93,7 @@ public String toJson() {
9393
result.put("op", "d");
9494
break;
9595
}
96-
result.put("ts_ms", tsMS);
96+
result.put("ts_ns", tsNS);
9797
result.set("payload", payload);
9898

9999
return result.toString();

sources/sdk/java/src/main/java/io/drasi/source/sdk/models/SourceDelete.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -35,30 +35,30 @@ protected Op getOp() {
3535
* Create a delete operation of a node with the given id, timestamp, metadata and labels.
3636
*
3737
* @param id The id of the element.
38-
* @param tsMS The timestamp of the event in milliseconds.
38+
* @param tsNS The timestamp of the event in nanoseconds.
3939
* @param metadata The metadata of the element.
4040
* @param labels The labels of the element.
4141
* @param sourceTsMS The timestamp of the event in the source database in milliseconds.
4242
* @param sequenceNumber The sequence number of the event in the source database.
4343
*/
44-
public SourceDelete(String id, long tsMS, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber) {
45-
super(id, tsMS, null, metadata, labels, sourceTsMS, sequenceNumber);
44+
public SourceDelete(String id, long tsNS, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber) {
45+
super(id, tsNS, null, metadata, labels, sourceTsMS, sequenceNumber);
4646

4747
}
4848

4949
/**
5050
* Create a delete operation of a relation with the given id, timestamp, metadata, labels, startId and endId.
5151
*
5252
* @param id The id of the element.
53-
* @param tsMS The timestamp of the event in milliseconds.
53+
* @param tsNS The timestamp of the event in nanoseconds.
5454
* @param metadata The metadata of the element.
5555
* @param labels The labels of the element.
5656
* @param sourceTsMS The timestamp of the event in the source database in milliseconds.
5757
* @param sequenceNumber The sequence number of the event in the source database.
5858
* @param startId The id of the start node.
5959
* @param endId The id of the end node.
6060
*/
61-
public SourceDelete(String id, long tsMS, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber, String startId, String endId) {
62-
super(id, tsMS, null, metadata, labels, sourceTsMS, sequenceNumber, startId, endId);
61+
public SourceDelete(String id, long tsNS, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber, String startId, String endId) {
62+
super(id, tsNS, null, metadata, labels, sourceTsMS, sequenceNumber, startId, endId);
6363
}
6464
}

sources/sdk/java/src/main/java/io/drasi/source/sdk/models/SourceInsert.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,23 @@ protected Op getOp() {
3535
* Create a new insert operation of a node with the given id, timestamp, properties, metadata and labels.
3636
*
3737
* @param id The id of the element.
38-
* @param tsMS The timestamp of the event in milliseconds.
38+
* @param tsNS The timestamp of the event in nanoseconds.
3939
* @param properties The properties of the element.
4040
* @param metadata The metadata of the element.
4141
* @param labels The labels of the element.
4242
* @param sourceTsMS The timestamp of the event in the source database in milliseconds.
4343
* @param sequenceNumber The sequence number of the event in the source database.
4444
*/
45-
public SourceInsert(String id, long tsMS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber) {
46-
super(id, tsMS, properties, metadata, labels, sourceTsMS, sequenceNumber);
45+
public SourceInsert(String id, long tsNS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber) {
46+
super(id, tsNS, properties, metadata, labels, sourceTsMS, sequenceNumber);
4747

4848
}
4949

5050
/**
5151
* Create a new insert operation of a relation with the given id, timestamp, properties, metadata, labels, startId and endId.
5252
*
5353
* @param id The id of the element.
54-
* @param tsMS The timestamp of the event in milliseconds.
54+
* @param tsNS The timestamp of the event in nanoseconds.
5555
* @param properties The properties of the element.
5656
* @param metadata The metadata of the element.
5757
* @param labels The labels of the element.
@@ -60,7 +60,7 @@ public SourceInsert(String id, long tsMS, JsonNode properties, Map<String, Objec
6060
* @param startId The id of the start node.
6161
* @param endId The id of the end node.
6262
*/
63-
public SourceInsert(String id, long tsMS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber, String startId, String endId) {
64-
super(id, tsMS, properties, metadata, labels, sourceTsMS, sequenceNumber, startId, endId);
63+
public SourceInsert(String id, long tsNS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber, String startId, String endId) {
64+
super(id, tsNS, properties, metadata, labels, sourceTsMS, sequenceNumber, startId, endId);
6565
}
6666
}

sources/sdk/java/src/main/java/io/drasi/source/sdk/models/SourceUpdate.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,23 @@ protected Op getOp() {
3535
* Create an update operation of a node with the given id, timestamp, properties, metadata and labels.
3636
*
3737
* @param id The id of the element.
38-
* @param tsMS The timestamp of the event in milliseconds.
38+
* @param tsNS The timestamp of the event in nanoseconds.
3939
* @param properties The properties of the element.
4040
* @param metadata The metadata of the element.
4141
* @param labels The labels of the element.
4242
* @param sourceTsMS The timestamp of the event in the source database in milliseconds.
4343
* @param sequenceNumber The sequence number of the event in the source database.
4444
*/
45-
public SourceUpdate(String id, long tsMS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber) {
46-
super(id, tsMS, properties, metadata, labels, sourceTsMS, sequenceNumber);
45+
public SourceUpdate(String id, long tsNS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber) {
46+
super(id, tsNS, properties, metadata, labels, sourceTsMS, sequenceNumber);
4747

4848
}
4949

5050
/**
5151
* Create an update operation of a relation with the given id, timestamp, properties, metadata, labels, startId and endId.
5252
*
5353
* @param id The id of the element.
54-
* @param tsMS The timestamp of the event in milliseconds.
54+
* @param tsNS The timestamp of the event in nanoseconds.
5555
* @param properties The properties of the element.
5656
* @param metadata The metadata of the element.
5757
* @param labels The labels of the element.
@@ -60,7 +60,7 @@ public SourceUpdate(String id, long tsMS, JsonNode properties, Map<String, Objec
6060
* @param startId The id of the start node.
6161
* @param endId The id of the end node.
6262
*/
63-
public SourceUpdate(String id, long tsMS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber, String startId, String endId) {
64-
super(id, tsMS, properties, metadata, labels, sourceTsMS, sequenceNumber, startId, endId);
63+
public SourceUpdate(String id, long tsNS, JsonNode properties, Map<String, Object> metadata, List<String> labels, long sourceTsMS, long sequenceNumber, String startId, String endId) {
64+
super(id, tsNS, properties, metadata, labels, sourceTsMS, sequenceNumber, startId, endId);
6565
}
6666
}

sources/shared/change-dispatcher/src/main.rs

-7
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,6 @@ async fn process_changes(
162162
));
163163
}
164164
};
165-
// dispatch_event["metadata"]["tracking"]["source"]["changeDispatcherEnd_ms"] =
166-
// match serde_json::to_value(0) {
167-
// Ok(val) => val,
168-
// Err(_) => {
169-
// unreachable!();
170-
// }
171-
// };
172165

173166
let subscriptions = match change_event["subscriptions"].as_array() {
174167
Some(subs) => subs.clone(),

0 commit comments

Comments
 (0)