Skip to content

Commit

Permalink
[FLINK-35251][cdc][runtime] Fix bug of serializing derivation mapping…
Browse files Browse the repository at this point in the history
… in SchemaDerivation

This closes  #3267.
  • Loading branch information
PatrickRen committed Apr 29, 2024
1 parent de19f7b commit 711bc00
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public static void serializeDerivationMapping(
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
// Serialize derivation mapping in SchemaDerivation
Map<TableId, Set<TableId>> derivationMapping = schemaDerivation.getDerivationMapping();
out.write(derivationMapping.size());
out.writeInt(derivationMapping.size());
for (Map.Entry<TableId, Set<TableId>> entry : derivationMapping.entrySet()) {
// Routed table ID
TableId routedTableId = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@

import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -362,4 +368,26 @@ void testIncompatibleTypes() {
.isInstanceOf(IllegalStateException.class)
.hasMessage("Incompatible types: \"INT\" and \"STRING\"");
}

@Test
void testSerde() throws Exception {
Map<TableId, Set<TableId>> derivationMapping = new HashMap<>();
Set<TableId> originalTableIds = new HashSet<>();
originalTableIds.add(TABLE_1);
originalTableIds.add(TABLE_2);
derivationMapping.put(MERGED_TABLE, originalTableIds);
SchemaDerivation schemaDerivation =
new SchemaDerivation(new SchemaManager(), ROUTES, derivationMapping);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
byte[] serialized = baos.toByteArray();
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
Map<TableId, Set<TableId>> deserialized =
SchemaDerivation.deserializerDerivationMapping(in);
assertThat(deserialized).isEqualTo(derivationMapping);
}
}
}
}

0 comments on commit 711bc00

Please sign in to comment.