Skip to content

Commit

Permalink
DBZ-7618 Prevent multiple calls to pu.maxTimestamp()
Browse files Browse the repository at this point in the history
  • Loading branch information
samssh committed Mar 11, 2024
1 parent 7019a4b commit a8b1ecd
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace
}

private void handleRowIterator(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
long maxTimePu = pu.maxTimestamp();
UnfilteredRowIterator it = pu.unfilteredIterator();
while (it.hasNext()) {
Unfiltered rowOrRangeTombstone = it.next();
Expand All @@ -318,7 +319,7 @@ private void handleRowIterator(PartitionUpdate pu, OffsetPosition offsetPosition
}
if (rowOrRangeTombstone instanceof Row) {
Row row = (Row) rowOrRangeTombstone;
handleRowModifications(row, rowType, pu, offsetPosition, keyspaceTable);
handleRowModifications(row, rowType, pu, offsetPosition, keyspaceTable, maxTimePu);
}
else if (rowOrRangeTombstone instanceof RangeTombstoneBoundMarker) {
handleRangeTombstoneBoundMarker((RangeTombstoneBoundMarker) rowOrRangeTombstone,
Expand All @@ -328,6 +329,7 @@ else if (rowOrRangeTombstone instanceof RangeTombstoneBoundMarker) {
throw new CassandraConnectorSchemaException("Encountered unsupported Unfiltered type " + rowOrRangeTombstone.getClass());
}
}
it.close();
}

/**
Expand Down Expand Up @@ -407,7 +409,7 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
* d. for deletions, populate regular columns with null values
* (4) Assemble a {@link Record} object from the populated data and queue the record
*/
private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, long maxTimePu) {
KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
if (keyValueSchema == null) {
LOGGER.trace("Unable to get KeyValueSchema for table {}. It might have been deleted or CDC disabled.", keyspaceTable.toString());
Expand All @@ -421,8 +423,7 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
populateClusteringColumns(after, row, pu);
populateRegularColumns(after, row, rowType, keyValueSchema);

long ts = rowType == RowType.DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp();

long ts = rowType == RowType.DELETE ? row.deletion().time().markedForDeleteAt() : maxTimePu;
switch (rowType) {
case INSERT:
recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace
}

private void handleRowIterator(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
long maxTimePu = pu.maxTimestamp();
UnfilteredRowIterator it = pu.unfilteredIterator();
while (it.hasNext()) {
Unfiltered rowOrRangeTombstone = it.next();
Expand All @@ -315,7 +316,7 @@ private void handleRowIterator(PartitionUpdate pu, OffsetPosition offsetPosition
}
if (rowOrRangeTombstone instanceof Row) {
Row row = (Row) rowOrRangeTombstone;
handleRowModifications(row, rowType, pu, offsetPosition, keyspaceTable);
handleRowModifications(row, rowType, pu, offsetPosition, keyspaceTable, maxTimePu);
}
else if (rowOrRangeTombstone instanceof RangeTombstoneBoundMarker) {
handleRangeTombstoneBoundMarker((RangeTombstoneBoundMarker) rowOrRangeTombstone,
Expand All @@ -325,6 +326,7 @@ else if (rowOrRangeTombstone instanceof RangeTombstoneBoundMarker) {
throw new CassandraConnectorSchemaException("Encountered unsupported Unfiltered type " + rowOrRangeTombstone.getClass());
}
}
it.close();
}

/**
Expand Down Expand Up @@ -404,7 +406,7 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
* d. for deletions, populate regular columns with null values
* (4) Assemble a {@link Record} object from the populated data and queue the record
*/
private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, long maxTimePu) {
KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
if (keyValueSchema == null) {
LOGGER.trace("Unable to get KeyValueSchema for table {}. It might have been deleted or CDC disabled.", keyspaceTable.toString());
Expand All @@ -418,7 +420,7 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
populateClusteringColumns(after, row, pu);
populateRegularColumns(after, row, rowType, keyValueSchema);

long ts = rowType == RowType.DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp();
long ts = rowType == RowType.DELETE ? row.deletion().time().markedForDeleteAt() : maxTimePu;

switch (rowType) {
case INSERT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace
}

private void handleRowIterator(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
long maxTimePu = pu.maxTimestamp();
UnfilteredRowIterator it = pu.unfilteredIterator();
while (it.hasNext()) {
Unfiltered rowOrRangeTombstone = it.next();
Expand All @@ -328,7 +329,7 @@ private void handleRowIterator(PartitionUpdate pu, OffsetPosition offsetPosition
}
if (rowOrRangeTombstone instanceof Row) {
Row row = (Row) rowOrRangeTombstone;
handleRowModifications(row, rowType, pu, offsetPosition, keyspaceTable);
handleRowModifications(row, rowType, pu, offsetPosition, keyspaceTable, maxTimePu);
}
else if (rowOrRangeTombstone instanceof RangeTombstoneBoundMarker) {
handleRangeTombstoneBoundMarker((RangeTombstoneBoundMarker) rowOrRangeTombstone,
Expand All @@ -338,6 +339,7 @@ else if (rowOrRangeTombstone instanceof RangeTombstoneBoundMarker) {
throw new CassandraConnectorSchemaException("Encountered unsupported Unfiltered type " + rowOrRangeTombstone.getClass());
}
}
it.close();
}

/**
Expand Down Expand Up @@ -417,7 +419,7 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
* d. for deletions, populate regular columns with null values
* (4) Assemble a {@link Record} object from the populated data and queue the record
*/
private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, long maxTimePu) {
KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
if (keyValueSchema == null) {
LOGGER.trace("Unable to get KeyValueSchema for table {}. It might have been deleted or CDC disabled.", keyspaceTable.toString());
Expand All @@ -431,7 +433,7 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
populateClusteringColumns(after, row, pu);
populateRegularColumns(after, row, rowType, keyValueSchema);

long ts = rowType == RowType.DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp();
long ts = rowType == RowType.DELETE ? row.deletion().time().markedForDeleteAt() : maxTimePu;

switch (rowType) {
case INSERT:
Expand Down

0 comments on commit a8b1ecd

Please sign in to comment.