diff --git a/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4SchemaChangeListener.java b/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4SchemaChangeListener.java index 2865858..3afdb76 100644 --- a/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4SchemaChangeListener.java +++ b/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4SchemaChangeListener.java @@ -21,7 +21,6 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; -import org.apache.commons.lang3.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,13 +242,10 @@ else if (oldCdc) { newCdc); // else if it was not cdc before nor now, do nothing with schema holder // but add it to Cassandra for subsequent deserialization path in every case - - UUID uuid = UUID.nameUUIDFromBytes(ArrayUtils.addAll(newTableMetadata.getKeyspace().toString().getBytes(), - newTableMetadata.getName().toString().getBytes())); - + // we need to use the id of the existing table to correctly replace it org.apache.cassandra.schema.TableMetadata metadata = CreateTableStatement.parse(newTableMetadata.describe(true), newTableMetadata.getKeyspace().toString()) - .id(TableId.fromUUID(uuid)) + .id(TableId.fromUUID(oldTableMetaData.getId().get())) .build(); org.apache.cassandra.schema.KeyspaceMetadata current = Schema.instance.getKeyspaceMetadata(metadata.keyspace); diff --git a/cassandra-4/src/test/java/io/debezium/connector/cassandra/SchemaChangeListenerTest.java b/cassandra-4/src/test/java/io/debezium/connector/cassandra/SchemaChangeListenerTest.java new file mode 100644 index 0000000..ae15766 --- /dev/null +++ b/cassandra-4/src/test/java/io/debezium/connector/cassandra/SchemaChangeListenerTest.java @@ -0,0 +1,59 @@ +package io.debezium.connector.cassandra; + +import java.util.List; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; +import static io.debezium.connector.cassandra.Event.EventType.CHANGE_EVENT; +import static io.debezium.connector.cassandra.Record.Operation.INSERT; +import static io.debezium.connector.cassandra.TestUtils.TEST_KEYSPACE_NAME; +import static io.debezium.connector.cassandra.TestUtils.TEST_TABLE_NAME; +import static io.debezium.connector.cassandra.TestUtils.runCql; +import static java.lang.String.format; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SchemaChangeListenerTest extends AbstractCommitLogProcessorTest{ + + @Override + public void initialiseData() throws Exception { + createTable("CREATE TABLE %s.%s (a int, b int, PRIMARY KEY ((a), b)) WITH cdc = true;", + TEST_KEYSPACE_NAME, TEST_TABLE_NAME); + runCql(insertInto(TEST_KEYSPACE_NAME, TEST_TABLE_NAME) + .value("a", literal(1)) + .value("b", literal(2)) + .build()); + } + + @Override + public void verifyEvents() throws Exception { + // We have to read the first event before altering the table. + // That way we make sure that the initial schema is registered and the schema change code path is triggered. + List events = getEvents(1); + Record insert1 = (Record) events.get(0); + assertEquals(CHANGE_EVENT, insert1.getEventType()); + assertEquals(INSERT, insert1.getOp()); + assertTrue(insert1.getRowData().hasCell("a")); + assertTrue(insert1.getRowData().hasCell("b")); + assertFalse(insert1.getRowData().hasCell("c")); + + runCql(format("ALTER TABLE %s.%s ADD c int;", TEST_KEYSPACE_NAME, TEST_TABLE_NAME)); + + runCql(insertInto(TEST_KEYSPACE_NAME, TEST_TABLE_NAME) + .value("a", literal(3)) + .value("b", literal(4)) + .value("c", literal(5)) + .build()); + + events = getEvents(2); + Record insert2 = (Record) events.get(1); + System.out.println("GOT 2ND EVENT"); + + assertEquals(CHANGE_EVENT, insert2.getEventType()); + assertEquals(INSERT, insert2.getOp()); + assertTrue(insert2.getRowData().hasCell("a")); + assertTrue(insert2.getRowData().hasCell("b")); + assertTrue(insert2.getRowData().hasCell("c")); + } +}