diff --git a/cassandra-3/src/main/java/io/debezium/connector/cassandra/Cassandra3CommitLogReadHandlerImpl.java b/cassandra-3/src/main/java/io/debezium/connector/cassandra/Cassandra3CommitLogReadHandlerImpl.java
index 9cc5aad..01fbb11 100644
--- a/cassandra-3/src/main/java/io/debezium/connector/cassandra/Cassandra3CommitLogReadHandlerImpl.java
+++ b/cassandra-3/src/main/java/io/debezium/connector/cassandra/Cassandra3CommitLogReadHandlerImpl.java
@@ -81,6 +81,7 @@ public class Cassandra3CommitLogReadHandlerImpl implements CommitLogReadHandler
     private final CommitLogProcessorMetrics metrics;
     private final RangeTombstoneContext rangeTombstoneContext = new RangeTombstoneContext<>();
     private final CassandraSchemaFactory schemaFactory;
+    private final CassandraConnectorConfig.EventOrderGuaranteeMode eventOrderGuaranteeMode;
 
     Cassandra3CommitLogReadHandlerImpl(CassandraConnectorContext context, CommitLogProcessorMetrics metrics) {
         this.queues = context.getQueues();
@@ -91,6 +92,7 @@ public class Cassandra3CommitLogReadHandlerImpl implements CommitLogReadHandler
         this.schemaHolder = context.getSchemaHolder();
         this.metrics = metrics;
         this.schemaFactory = CassandraSchemaFactory.get();
+        this.eventOrderGuaranteeMode = context.getCassandraConnectorConfig().getEventOrderGuaranteeMode();
     }
 
     /**
@@ -391,7 +393,7 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
         recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                 Conversions.toInstantFromMicros(pu.maxTimestamp()), after, keySchema, valueSchema,
-                MARK_OFFSET, queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                MARK_OFFSET, queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
     }
 
     /**
@@ -428,25 +430,25 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
             case INSERT:
                 recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             case UPDATE:
                 recordMaker.update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             case DELETE:
                 recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             case RANGE_TOMBSTONE:
                 recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             default:
@@ -485,7 +487,7 @@ private void handleRangeTombstoneBoundMarker(RangeTombstoneBoundMarker rangeTomb
             recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                     Conversions.toInstantFromMicros(ts), after,
                    keyValueSchema.keySchema(), keyValueSchema.valueSchema(), MARK_OFFSET,
-                    queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                    queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
         }
         finally {
             rangeTombstoneContext.remove(pu.metadata());
@@ -665,4 +667,19 @@ private List getPartitionKeys(PartitionUpdate pu) {
         return values;
     }
 
+    private int getPartitionQueueIndex(PartitionUpdate partitionUpdate, OffsetPosition offsetPosition) {
+        int hash;
+        switch (eventOrderGuaranteeMode) {
+            case COMMITLOG_FILE:
+                hash = offsetPosition.fileName.hashCode();
+                break;
+            case PARTITION_VALUES:
+                hash = partitionUpdate.partitionKey().hashCode();
+                break;
+            default:
+                throw new IllegalStateException("Unexpected event order guarantee mode: " + eventOrderGuaranteeMode);
+        }
+        return ((hash % queues.size()) + queues.size()) % queues.size();
+    }
+
 }
diff --git a/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4CommitLogReadHandlerImpl.java b/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4CommitLogReadHandlerImpl.java
index 860025c..d289e69 100644
--- a/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4CommitLogReadHandlerImpl.java
+++ b/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4CommitLogReadHandlerImpl.java
@@ -79,6 +79,7 @@ public class Cassandra4CommitLogReadHandlerImpl implements CommitLogReadHandler
     private final CommitLogProcessorMetrics metrics;
     private final RangeTombstoneContext rangeTombstoneContext = new RangeTombstoneContext<>();
     private final CassandraSchemaFactory schemaFactory;
+    private final CassandraConnectorConfig.EventOrderGuaranteeMode eventOrderGuaranteeMode;
 
     Cassandra4CommitLogReadHandlerImpl(CassandraConnectorContext context, CommitLogProcessorMetrics metrics) {
         this.queues = context.getQueues();
@@ -89,6 +90,7 @@ public class Cassandra4CommitLogReadHandlerImpl implements CommitLogReadHandler
         this.schemaHolder = context.getSchemaHolder();
         this.metrics = metrics;
         this.schemaFactory = CassandraSchemaFactory.get();
+        this.eventOrderGuaranteeMode = context.getCassandraConnectorConfig().getEventOrderGuaranteeMode();
     }
 
     /**
@@ -388,7 +390,7 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
         recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                 Conversions.toInstantFromMicros(pu.maxTimestamp()), after, keySchema, valueSchema,
-                MARK_OFFSET, queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                MARK_OFFSET, queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
     }
 
     /**
@@ -426,25 +428,25 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
             case INSERT:
                 recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             case UPDATE:
                 recordMaker.update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             case DELETE:
                 recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             case RANGE_TOMBSTONE:
                 recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             default:
@@ -483,7 +485,7 @@ private void handleRangeTombstoneBoundMarker(RangeTombstoneBoundMarker rangeTomb
             recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                     Conversions.toInstantFromMicros(ts), after,
                     keyValueSchema.keySchema(), keyValueSchema.valueSchema(), MARK_OFFSET,
-                    queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                    queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
         }
         finally {
             rangeTombstoneContext.remove(pu.metadata());
@@ -663,4 +665,18 @@ private List getPartitionKeys(PartitionUpdate pu) {
         return values;
     }
 
+    private int getPartitionQueueIndex(PartitionUpdate partitionUpdate, OffsetPosition offsetPosition) {
+        int hash;
+        switch (eventOrderGuaranteeMode) {
+            case COMMITLOG_FILE:
+                hash = offsetPosition.fileName.hashCode();
+                break;
+            case PARTITION_VALUES:
+                hash = partitionUpdate.partitionKey().hashCode();
+                break;
+            default:
+                throw new IllegalStateException("Unexpected event order guarantee mode: " + eventOrderGuaranteeMode);
+        }
+        return ((hash % queues.size()) + queues.size()) % queues.size();
+    }
 }
diff --git a/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java b/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java
index acb4f67..5a96e2d 100644
--- a/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java
+++ b/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java
@@ -226,6 +226,56 @@ public static VarIntHandlingMode parse(String value, String defaultValue) {
         }
     }
 
+    /**
+     * The set of predefined EventOrderGuaranteeMode options.
+     * Each option determines which property is used for hashing.
+     * Events with the same hash value maintain their relative order.
+     * PARTITION_VALUES is preferred, as it matches the hashing strategy used for Kafka messages.
+     */
+    public enum EventOrderGuaranteeMode implements EnumeratedValue {
+
+        /**
+         * Use the commit log file name to calculate the hash of the event to determine the queue index.
+         */
+        COMMITLOG_FILE("commitlog_file"),
+
+        /**
+         * Use partition column values to calculate the hash of the event to determine the queue index.
+         */
+        PARTITION_VALUES("partition_values");
+
+        private final String value;
+
+        EventOrderGuaranteeMode(String value) {
+            this.value = value;
+        }
+
+        @Override
+        public String getValue() {
+            return value;
+        }
+
+        /**
+         * Determine if the supplied value is one of the predefined options.
+         *
+         * @param value the configuration property value; may be null
+         * @return the matching option, or null if no match is found
+         */
+        public static EventOrderGuaranteeMode parse(String value) {
+            if (value == null) {
+                return null;
+            }
+            value = value.trim();
+            for (EventOrderGuaranteeMode option : values()) {
+                if (option.getValue().equalsIgnoreCase(value)) {
+                    return option;
+                }
+            }
+            return null;
+        }
+
+    }
+
     /**
      * The prefix prepended to all Kafka producer configurations, including schema registry
      */
@@ -468,6 +518,16 @@ public int getCommitLogMarkedCompletePollInterval() {
     public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
             .withDefault(CassandraSourceInfoStructMaker.class.getName());
 
+    /**
+     * Must be one of 'COMMITLOG_FILE' or 'PARTITION_VALUES'. The default order guarantee mode is 'COMMITLOG_FILE'.
+     * See {@link EventOrderGuaranteeMode} for details.
+     */
+    public static final Field EVENT_ORDER_GUARANTEE_MODE = Field.create("event.order.guarantee.mode")
+            .withDisplayName("Event order guarantee mode")
+            .withEnum(EventOrderGuaranteeMode.class, EventOrderGuaranteeMode.COMMITLOG_FILE)
+            .withImportance(Importance.MEDIUM)
+            .withDescription("Specifies how to guarantee the order of change events.");
+
     private static List validationFieldList = new ArrayList<>(
             Arrays.asList(OFFSET_BACKING_STORE_DIR, COMMIT_LOG_RELOCATION_DIR, SCHEMA_POLL_INTERVAL_MS, SNAPSHOT_POLL_INTERVAL_MS));
@@ -707,4 +767,8 @@ public void setValidationFieldList(List validationFieldList) {
     public String getNodeId() {
         return this.getConfig().getString(CASSANDRA_NODE_ID);
     }
+
+    public EventOrderGuaranteeMode getEventOrderGuaranteeMode() {
+        return EventOrderGuaranteeMode.parse(this.getConfig().getString(EVENT_ORDER_GUARANTEE_MODE));
+    }
 }
diff --git a/dse/src/main/java/io/debezium/connector/dse/DseCommitLogReadHandlerImpl.java b/dse/src/main/java/io/debezium/connector/dse/DseCommitLogReadHandlerImpl.java
index e190ead..02576a1 100644
--- a/dse/src/main/java/io/debezium/connector/dse/DseCommitLogReadHandlerImpl.java
+++ b/dse/src/main/java/io/debezium/connector/dse/DseCommitLogReadHandlerImpl.java
@@ -53,6 +53,7 @@
 import io.debezium.DebeziumException;
 import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.connector.cassandra.CassandraConnectorConfig;
 import io.debezium.connector.cassandra.CassandraConnectorContext;
 import io.debezium.connector.cassandra.CassandraSchemaFactory;
 import io.debezium.connector.cassandra.CassandraSchemaFactory.CellData;
@@ -92,6 +93,7 @@ public class DseCommitLogReadHandlerImpl implements CommitLogReadHandler {
     private final CommitLogProcessorMetrics metrics;
     private final RangeTombstoneContext rangeTombstoneContext = new RangeTombstoneContext<>();
     private final CassandraSchemaFactory schemaFactory;
+    private final CassandraConnectorConfig.EventOrderGuaranteeMode eventOrderGuaranteeMode;
 
     DseCommitLogReadHandlerImpl(CassandraConnectorContext context, CommitLogProcessorMetrics metrics) {
         this.queues = context.getQueues();
@@ -102,6 +104,7 @@ public class DseCommitLogReadHandlerImpl implements CommitLogReadHandler {
         this.schemaHolder = context.getSchemaHolder();
         this.metrics = metrics;
         this.schemaFactory = CassandraSchemaFactory.get();
+        this.eventOrderGuaranteeMode = context.getCassandraConnectorConfig().getEventOrderGuaranteeMode();
     }
 
     /**
@@ -401,7 +404,7 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
         recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                Conversions.toInstantFromMicros(pu.maxTimestamp()), after, keySchema, valueSchema,
-                MARK_OFFSET, queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                MARK_OFFSET, queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
     }
 
     /**
@@ -439,25 +442,25 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
             case INSERT:
                 recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             case UPDATE:
                 recordMaker.update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             case DELETE:
                 recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             case RANGE_TOMBSTONE:
                 recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                         Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
-                        queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                        queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
                 break;
 
             default:
@@ -496,7 +499,7 @@ private void handleRangeTombstoneBoundMarker(RangeTombstoneBoundMarker rangeTomb
             recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                     Conversions.toInstantFromMicros(ts), after,
                     keyValueSchema.keySchema(), keyValueSchema.valueSchema(), MARK_OFFSET,
-                    queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
+                    queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
         }
         finally {
             rangeTombstoneContext.remove(pu.metadata());
@@ -676,4 +679,19 @@ private List getPartitionKeys(PartitionUpdate pu) {
         return values;
     }
 
+    private int getPartitionQueueIndex(PartitionUpdate partitionUpdate, OffsetPosition offsetPosition) {
+        int hash;
+        switch (eventOrderGuaranteeMode) {
+            case COMMITLOG_FILE:
+                hash = offsetPosition.fileName.hashCode();
+                break;
+            case PARTITION_VALUES:
+                hash = partitionUpdate.partitionKey().hashCode();
+                break;
+            default:
+                throw new IllegalStateException("Unexpected event order guarantee mode: " + eventOrderGuaranteeMode);
+        }
+        return ((hash % queues.size()) + queues.size()) % queues.size();
+    }
+
 }
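
A note on the new getPartitionQueueIndex helper: it replaces Math.abs(hash % queues.size()) with ((hash % queues.size()) + queues.size()) % queues.size(), the standard non-negative modulo. The following minimal, self-contained sketch (not part of the patch; the queue count and sample file name are made up) shows why the normalization is needed and that it matches Math.floorMod:

public class QueueIndexSketch {
    public static void main(String[] args) {
        int n = 4; // hypothetical number of change-event queues
        // String.hashCode() may be negative, so a plain (hash % n) can be negative
        // too, which would be an invalid List index.
        int hash = "CommitLog-7-1700000000000.log".hashCode();
        int index = ((hash % n) + n) % n;           // the normalization used by the helper
        System.out.println(index);                  // always in [0, n)
        System.out.println(Math.floorMod(hash, n)); // equivalent JDK built-in
    }
}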
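
For completeness, a hypothetical usage sketch of the new option. The property name event.order.guarantee.mode and the values commitlog_file / partition_values come from the patch; having the patched CassandraConnectorConfig on the classpath is assumed. With partition_values, every event for the same Cassandra partition key hashes to the same queue, mirroring Kafka's key-based partitioning; the default commitlog_file preserves per-commit-log-file ordering instead.

import io.debezium.connector.cassandra.CassandraConnectorConfig.EventOrderGuaranteeMode;

public class EventOrderConfigSketch {
    public static void main(String[] args) {
        // Values are trimmed and matched case-insensitively by parse().
        EventOrderGuaranteeMode mode = EventOrderGuaranteeMode.parse(" Partition_Values ");
        System.out.println(mode); // PARTITION_VALUES
        // An unrecognized (or null) value yields null rather than an exception.
        System.out.println(EventOrderGuaranteeMode.parse("bogus")); // null
    }
}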