1
1
/*
2
- * Copyright 2024 The Drasi Authors.
3
- *
4
- * Licensed under the Apache License, Version 2.0 (the "License");
5
- * you may not use this file except in compliance with the License.
6
- * You may obtain a copy of the License at
7
- *
8
- * http://www.apache.org/licenses/LICENSE-2.0
9
- *
10
- * Unless required by applicable law or agreed to in writing, software
11
- * distributed under the License is distributed on an "AS IS" BASIS,
12
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
- * See the License for the specific language governing permissions and
14
- * limitations under the License.
15
- */
2
+ * Copyright 2024 The Drasi Authors.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
16
17
17
package io .drasi ;
18
18
19
19
import com .fasterxml .jackson .databind .JsonNode ;
20
20
import com .fasterxml .jackson .databind .ObjectMapper ;
21
- import com .fasterxml .jackson .databind .node .JsonNodeFactory ;
22
- import com .fasterxml .jackson .databind .node .ObjectNode ;
23
21
import io .drasi .models .NodeMapping ;
24
22
import io .drasi .models .RelationalGraphMapping ;
25
23
import io .drasi .models .RelationshipMapping ;
26
24
import io .debezium .engine .ChangeEvent ;
27
25
import io .debezium .engine .DebeziumEngine ;
28
26
import io .drasi .source .sdk .ChangePublisher ;
29
- import io .drasi .source .sdk .Reactivator ;
30
27
import io .drasi .source .sdk .models .*;
28
+ import org .slf4j .Logger ;
29
+ import org .slf4j .LoggerFactory ;
31
30
32
31
import java .io .IOException ;
33
32
import java .util .HashMap ;
34
33
import java .util .List ;
35
34
import java .util .Map ;
36
35
37
- abstract class RelationalChangeConsumer implements DebeziumEngine .ChangeConsumer <ChangeEvent <String , String >> {
38
-
39
- private ObjectMapper objectMapper = new ObjectMapper ();
40
- private Map <String , NodeMapping > tableToNodeMap ;
41
- private Map <String , RelationshipMapping > tableToRelMap ;
42
- private ChangePublisher changePublisher ;
43
-
44
- public RelationalChangeConsumer (RelationalGraphMapping mappings , ChangePublisher changePublisher ) {
36
+ /**
37
+ * Processes change events from relational databases and publishes them using a change publisher.
38
+ */
39
+ public class RelationalChangeConsumer implements DebeziumEngine .ChangeConsumer <ChangeEvent <String , String >> {
40
+ private static final Logger log = LoggerFactory .getLogger (RelationalChangeConsumer .class );
41
+ private final ObjectMapper objectMapper = new ObjectMapper ();
42
+ private final Map <String , NodeMapping > tableToNodeMap = new HashMap <>();
43
+ private Map <String , RelationshipMapping > tableToRelMap = new HashMap <>();
44
+ private final ChangePublisher changePublisher ;
45
+ private final DatabaseStrategy dbStrategy ;
46
+
47
+ /**
48
+ * Creates a new RelationalChangeConsumer instance.
49
+ *
50
+ * @param mappings Mappings between database tables and graph nodes.
51
+ * @param changePublisher Publisher from Drasi Source SDK.
52
+ * @param dbStrategy Strategy for the specific database in use.
53
+ */
54
+ public RelationalChangeConsumer (RelationalGraphMapping mappings , ChangePublisher changePublisher , DatabaseStrategy dbStrategy ) {
45
55
this .changePublisher = changePublisher ;
46
- tableToNodeMap = new HashMap <>();
47
- tableToRelMap = new HashMap <>();
48
-
56
+ this .dbStrategy = dbStrategy ;
57
+
49
58
if (mappings .nodes != null )
50
59
for (var nodeMapping : mappings .nodes ) {
51
60
tableToNodeMap .putIfAbsent (nodeMapping .tableName , nodeMapping );
@@ -58,72 +67,83 @@ public RelationalChangeConsumer(RelationalGraphMapping mappings, ChangePublisher
58
67
}
59
68
60
69
@ Override
61
- public void handleBatch (List <ChangeEvent <String , String >> records , DebeziumEngine .RecordCommitter <ChangeEvent <String , String >> committer ) throws InterruptedException {
62
- for (var record : records ) {
63
-
64
- if (record .value () == null )
65
- return ;
70
+ public void handleBatch (List <ChangeEvent <String , String >> records ,
71
+ DebeziumEngine .RecordCommitter <ChangeEvent <String , String >> committer )
72
+ throws InterruptedException {
73
+ for (var record : records ) {
74
+ if (record .value () == null ) {
75
+ continue ;
76
+ }
66
77
67
78
try {
68
- var pgChange = objectMapper .readTree (record .value ());
69
- var drasiChange = ExtractNodeChange ( pgChange );
79
+ var sourceChange = objectMapper .readTree (record .value ());
80
+ var drasiChange = ExtractDrasiChange ( sourceChange );
70
81
if (drasiChange != null ) {
71
82
changePublisher .Publish (drasiChange );
83
+ } else {
84
+ log .warn ("Change not processed: {}" , sourceChange );
72
85
}
73
86
} catch (IOException e ) {
87
+ log .error ("Error processing change record: {}" , e .getMessage ());
74
88
throw new InterruptedException (e .getMessage ());
75
89
}
90
+
76
91
committer .markProcessed (record );
77
92
}
93
+
78
94
committer .markBatchFinished ();
79
95
}
80
96
81
- private SourceChange ExtractNodeChange (JsonNode sourceChange ) {
82
- var pgPayload = sourceChange .path ("payload" );
83
-
84
- if (!pgPayload .has ("op" ))
97
+ private SourceChange ExtractDrasiChange (JsonNode sourceChange ) {
98
+ var payload = sourceChange .path ("payload" );
99
+ if (!payload .has ("op" )) {
85
100
return null ;
101
+ }
86
102
87
- var pgSource = pgPayload .path ("source" );
88
- var tableName = pgSource .path ("schema" ).asText () + "." + pgSource .path ("table" ).asText ();
89
-
90
- if (!tableToNodeMap .containsKey (tableName ))
91
- return null ;
103
+ var source = payload .path ("source" );
104
+ var tableName = dbStrategy .extractTableName (source );
92
105
93
106
var mapping = tableToNodeMap .get (tableName );
107
+
108
+ if (mapping == null ) {
109
+ log .warn ("Table {} not found in mappings" , tableName );
110
+ return null ;
111
+ }
94
112
95
- JsonNode item ;
96
- switch (pgPayload .path ("op" ).asText ()) {
97
- case "c" , "u" :
98
- item = pgPayload .path ("after" );
99
- break ;
100
- case "d" :
101
- item = pgPayload .path ("before" );
102
- break ;
103
- default :
104
- return null ;
113
+ var changeType = payload .path ("op" ).asText ();
114
+ var item = getChangeData (payload , changeType );
115
+
116
+ if (item == null ) {
117
+ log .warn ("No change data found for type: {}" , changeType );
118
+ return null ;
105
119
}
106
- var nodeId = SanitizeNodeId ( mapping . tableName + ":" + item . path ( mapping . keyField ). asText ());
120
+
107
121
if (!item .has (mapping .keyField )) {
122
+ log .warn ("Key field {} not found in change data" , mapping .keyField );
108
123
return null ;
109
124
}
110
- var tsMs = pgPayload .path ("ts_ms" ).asLong ();
111
-
112
- switch (pgPayload .path ("op" ).asText ()) {
113
- case "c" :
114
- return new SourceInsert (nodeId , tsMs , item , null , mapping .labels .stream ().toList (), tsMs , ExtractLsn (pgSource ));
115
- case "u" :
116
- return new SourceUpdate (nodeId , tsMs , item , null , mapping .labels .stream ().toList (), tsMs , ExtractLsn (pgSource ));
117
- case "d" :
118
- return new SourceDelete (nodeId , tsMs , null , mapping .labels .stream ().toList (), tsMs , ExtractLsn (pgSource ));
119
- }
120
125
121
- return null ;
126
+ var nodeId = createNodeId (mapping .tableName , item .path (mapping .keyField ).asText ());
127
+ var timestamp = payload .path ("ts_ms" ).asLong ();
128
+ var lsn = dbStrategy .extractLsn (source );
129
+
130
+ return switch (changeType ) {
131
+ case "c" -> new SourceInsert (nodeId , timestamp , item , null , mapping .labels .stream ().toList (), timestamp , lsn );
132
+ case "u" -> new SourceUpdate (nodeId , timestamp , item , null , mapping .labels .stream ().toList (), timestamp , lsn );
133
+ case "d" -> new SourceDelete (nodeId , timestamp , null , mapping .labels .stream ().toList (), timestamp , lsn );
134
+ default -> null ;
135
+ };
122
136
}
123
137
124
- protected abstract long ExtractLsn (JsonNode sourceChange );
138
+ private JsonNode getChangeData (JsonNode payload , String changeType ) {
139
+ return switch (changeType ) {
140
+ case "c" , "u" -> payload .path ("after" );
141
+ case "d" -> payload .path ("before" );
142
+ default -> null ;
143
+ };
144
+ }
125
145
126
- private String SanitizeNodeId (String nodeId ) {
127
- return nodeId .replace ('.' , ':' );
146
+ private String createNodeId (String tableName , String keyFieldValue ) {
147
+ return ( tableName + ":" + keyFieldValue ) .replace ('.' , ':' );
128
148
}
129
- }
149
+ }
0 commit comments