DBZ-7631 Add new hashing strategy to use partition columns with configuration property.

To ensure proper ordering of events within the same partition, the partition columns must be used when determining the queue index for event insertion.
To keep the previous behavior available, a configuration property has been added that selects the hashing strategy; a minimal configuration sketch follows.
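For illustration only, a minimal sketch of selecting the new mode programmatically; it assumes the io.debezium.config.Configuration builder API, and the example class name is hypothetical. The property name and values come from the diff below.

import io.debezium.config.Configuration;

public class EventOrderModeExample { // hypothetical illustration class
    public static void main(String[] args) {
        // "partition_values" hashes events by their partition key values;
        // the default, "commitlog_file", keeps the previous per-file hashing.
        Configuration config = Configuration.create()
                .with("event.order.guarantee.mode", "partition_values")
                .build();
        System.out.println(config.getString("event.order.guarantee.mode")); // prints: partition_values
    }
}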
samssh committed Mar 12, 2024
1 parent ca0cd55 commit 41cffe9
Showing 4 changed files with 133 additions and 18 deletions.
Cassandra3CommitLogReadHandlerImpl.java
@@ -81,6 +81,7 @@ public class Cassandra3CommitLogReadHandlerImpl implements CommitLogReadHandler
private final CommitLogProcessorMetrics metrics;
private final RangeTombstoneContext<CFMetaData> rangeTombstoneContext = new RangeTombstoneContext<>();
private final CassandraSchemaFactory schemaFactory;
private final CassandraConnectorConfig.EventOrderGuaranteeMode eventOrderGuaranteeMode;

Cassandra3CommitLogReadHandlerImpl(CassandraConnectorContext context, CommitLogProcessorMetrics metrics) {
this.queues = context.getQueues();
@@ -91,6 +92,7 @@ public class Cassandra3CommitLogReadHandlerImpl implements CommitLogReadHandler
this.schemaHolder = context.getSchemaHolder();
this.metrics = metrics;
this.schemaFactory = CassandraSchemaFactory.get();
this.eventOrderGuaranteeMode = context.getCassandraConnectorConfig().getEventOrderGuaranteeMode();
}

/**
@@ -391,7 +393,7 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo

recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(pu.maxTimestamp()), after, keySchema, valueSchema,
MARK_OFFSET, queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
MARK_OFFSET, queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
}

/**
@@ -428,25 +430,25 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
case INSERT:
recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

case UPDATE:
recordMaker.update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

case DELETE:
recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

case RANGE_TOMBSTONE:
recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

default:
@@ -485,7 +487,7 @@ private void handleRangeTombstoneBoundMarker(RangeTombstoneBoundMarker rangeTomb

recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keyValueSchema.keySchema(), keyValueSchema.valueSchema(), MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
}
finally {
rangeTombstoneContext.remove(pu.metadata());
@@ -665,4 +667,19 @@ private List<Object> getPartitionKeys(PartitionUpdate pu) {
return values;
}

private int getPartitionQueueIndex(PartitionUpdate partitionUpdate, OffsetPosition offsetPosition) {
int hash;
switch (eventOrderGuaranteeMode) {
case COMMITLOG_FILE:
hash = offsetPosition.fileName.hashCode();
break;
case PARTITION_VALUES:
hash = partitionUpdate.partitionKey().hashCode();
break;
default:
throw new IllegalStateException("Unexpected event order guarantee mode: " + eventOrderGuaranteeMode);
}
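// Note: this floor-modulo is equivalent to Math.floorMod(hash, queues.size())
// and maps any hash, including negative values, to a valid queue index.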
return ((hash % queues.size()) + queues.size()) % queues.size();
}

}
Cassandra4CommitLogReadHandlerImpl.java
@@ -79,6 +79,7 @@ public class Cassandra4CommitLogReadHandlerImpl implements CommitLogReadHandler
private final CommitLogProcessorMetrics metrics;
private final RangeTombstoneContext<org.apache.cassandra.schema.TableMetadata> rangeTombstoneContext = new RangeTombstoneContext<>();
private final CassandraSchemaFactory schemaFactory;
private final CassandraConnectorConfig.EventOrderGuaranteeMode eventOrderGuaranteeMode;

Cassandra4CommitLogReadHandlerImpl(CassandraConnectorContext context, CommitLogProcessorMetrics metrics) {
this.queues = context.getQueues();
@@ -89,6 +90,7 @@ public class Cassandra4CommitLogReadHandlerImpl implements CommitLogReadHandler
this.schemaHolder = context.getSchemaHolder();
this.metrics = metrics;
this.schemaFactory = CassandraSchemaFactory.get();
this.eventOrderGuaranteeMode = context.getCassandraConnectorConfig().getEventOrderGuaranteeMode();
}

/**
@@ -388,7 +390,7 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo

recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(pu.maxTimestamp()), after, keySchema, valueSchema,
MARK_OFFSET, queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
MARK_OFFSET, queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
}

/**
@@ -426,25 +428,25 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
case INSERT:
recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

case UPDATE:
recordMaker.update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

case DELETE:
recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

case RANGE_TOMBSTONE:
recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

default:
@@ -483,7 +485,7 @@ private void handleRangeTombstoneBoundMarker(RangeTombstoneBoundMarker rangeTomb

recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keyValueSchema.keySchema(), keyValueSchema.valueSchema(), MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
}
finally {
rangeTombstoneContext.remove(pu.metadata());
@@ -663,4 +665,18 @@ private List<Object> getPartitionKeys(PartitionUpdate pu) {
return values;
}

private int getPartitionQueueIndex(PartitionUpdate partitionUpdate, OffsetPosition offsetPosition) {
int hash;
switch (eventOrderGuaranteeMode) {
case COMMITLOG_FILE:
hash = offsetPosition.fileName.hashCode();
break;
case PARTITION_VALUES:
hash = partitionUpdate.partitionKey().hashCode();
break;
default:
throw new IllegalStateException("Unexpected event order guarantee mode: " + eventOrderGuaranteeMode);
}
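// Note: this floor-modulo is equivalent to Math.floorMod(hash, queues.size())
// and maps any hash, including negative values, to a valid queue index.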
return ((hash % queues.size()) + queues.size()) % queues.size();
}
}
CassandraConnectorConfig.java
@@ -226,6 +226,56 @@ public static VarIntHandlingMode parse(String value, String defaultValue) {
}
}

/**
* The set of predefined EventOrderGuaranteeMode options.
* Each option determines which property is used for hashing.
* Events with the same hash value are routed to the same queue and therefore keep their relative order.
* PARTITION_VALUES is preferred because it applies the same hashing strategy as the messages in Kafka.
*/
public enum EventOrderGuaranteeMode implements EnumeratedValue {

/**
* Use the commit log file name to calculate the hash of the event to determine the queue index.
*/
COMMITLOG_FILE("commitlog_file"),

/**
* Use the partition column values to calculate the hash of the event to determine the queue index.
*/
PARTITION_VALUES("partition_values");

private final String value;

EventOrderGuaranteeMode(String value) {
this.value = value;
}

@Override
public String getValue() {
return value;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static EventOrderGuaranteeMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (EventOrderGuaranteeMode option : values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}

}

/**
* The prefix prepended to all Kafka producer configurations, including schema registry
*/
@@ -468,6 +518,16 @@ public int getCommitLogMarkedCompletePollInterval() {
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(CassandraSourceInfoStructMaker.class.getName());

/**
* Must be one of 'COMMITLOG_FILE' or 'PARTITION_VALUES'. The default order guarantee mode is 'COMMITLOG_FILE'.
* See {@link EventOrderGuaranteeMode} for details.
*/
public static final Field EVENT_ORDER_GUARANTEE_MODE = Field.create("event.order.guarantee.mode")
.withDisplayName("Event order guarantee mode")
.withEnum(EventOrderGuaranteeMode.class, EventOrderGuaranteeMode.COMMITLOG_FILE)
.withImportance(Importance.MEDIUM)
.withDescription("Specifies how to guarantee the order of change events.");

private static List<Field> validationFieldList = new ArrayList<>(
Arrays.asList(OFFSET_BACKING_STORE_DIR, COMMIT_LOG_RELOCATION_DIR, SCHEMA_POLL_INTERVAL_MS, SNAPSHOT_POLL_INTERVAL_MS));

@@ -707,4 +767,8 @@ public void setValidationFieldList(List<Field> validationFieldList) {
public String getNodeId() {
return this.getConfig().getString(CASSANDRA_NODE_ID);
}

public EventOrderGuaranteeMode getEventOrderGuaranteeMode() {
return EventOrderGuaranteeMode.parse(this.getConfig().getString(EVENT_ORDER_GUARANTEE_MODE));
}
}
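Taken together, the configured mode determines which value feeds the floor-modulo queue selection in the commit log read handlers. Below is a self-contained sketch of the two strategies; the file name and partition key are hypothetical stand-ins for OffsetPosition.fileName and PartitionUpdate.partitionKey().

public class QueueIndexSketch {
    public static void main(String[] args) {
        int queueCount = 4;
        String commitLogFileName = "CommitLog-7-1710000000000.log"; // hypothetical
        String partitionKey = "user-42";                            // hypothetical

        // COMMITLOG_FILE: events from the same commit log file share a queue,
        // so their relative order within that file is preserved.
        int byFile = Math.floorMod(commitLogFileName.hashCode(), queueCount);

        // PARTITION_VALUES: events for the same partition share a queue,
        // mirroring how the records are later partitioned in Kafka.
        int byPartition = Math.floorMod(partitionKey.hashCode(), queueCount);

        System.out.printf("file -> queue %d, partition -> queue %d%n", byFile, byPartition);
    }
}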
DseCommitLogReadHandlerImpl.java
@@ -53,6 +53,7 @@

import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CassandraSchemaFactory;
import io.debezium.connector.cassandra.CassandraSchemaFactory.CellData;
@@ -92,6 +93,7 @@ public class DseCommitLogReadHandlerImpl implements CommitLogReadHandler {
private final CommitLogProcessorMetrics metrics;
private final RangeTombstoneContext<org.apache.cassandra.schema.TableMetadata> rangeTombstoneContext = new RangeTombstoneContext<>();
private final CassandraSchemaFactory schemaFactory;
private final CassandraConnectorConfig.EventOrderGuaranteeMode eventOrderGuaranteeMode;

DseCommitLogReadHandlerImpl(CassandraConnectorContext context, CommitLogProcessorMetrics metrics) {
this.queues = context.getQueues();
@@ -102,6 +104,7 @@ public class DseCommitLogReadHandlerImpl implements CommitLogReadHandler {
this.schemaHolder = context.getSchemaHolder();
this.metrics = metrics;
this.schemaFactory = CassandraSchemaFactory.get();
this.eventOrderGuaranteeMode = context.getCassandraConnectorConfig().getEventOrderGuaranteeMode();
}

/**
@@ -401,7 +404,7 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo

recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(pu.maxTimestamp()), after, keySchema, valueSchema,
MARK_OFFSET, queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
MARK_OFFSET, queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
}

/**
@@ -439,25 +442,25 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
case INSERT:
recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

case UPDATE:
recordMaker.update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

case DELETE:
recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

case RANGE_TOMBSTONE:
recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
break;

default:
Expand Down Expand Up @@ -496,7 +499,7 @@ private void handleRangeTombstoneBoundMarker(RangeTombstoneBoundMarker rangeTomb

recordMaker.rangeTombstone(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Conversions.toInstantFromMicros(ts), after, keyValueSchema.keySchema(), keyValueSchema.valueSchema(), MARK_OFFSET,
queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
queues.get(getPartitionQueueIndex(pu, offsetPosition))::enqueue);
}
finally {
rangeTombstoneContext.remove(pu.metadata());
@@ -676,4 +679,19 @@ private List<Object> getPartitionKeys(PartitionUpdate pu) {
return values;
}

private int getPartitionQueueIndex(PartitionUpdate partitionUpdate, OffsetPosition offsetPosition) {
int hash;
switch (eventOrderGuaranteeMode) {
case COMMITLOG_FILE:
hash = offsetPosition.fileName.hashCode();
break;
case PARTITION_VALUES:
hash = partitionUpdate.partitionKey().hashCode();
break;
default:
throw new IllegalStateException("Unexpected event order guarantee mode: " + eventOrderGuaranteeMode);
}
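// Note: this floor-modulo is equivalent to Math.floorMod(hash, queues.size())
// and maps any hash, including negative values, to a valid queue index.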
return ((hash % queues.size()) + queues.size()) % queues.size();
}

}
