diff --git a/emulator/src/main/java/kafka/cli/emulator/EmulatorArchive.java b/emulator/src/main/java/kafka/cli/emulator/EmulatorArchive.java index e99e108..5c568aa 100644 --- a/emulator/src/main/java/kafka/cli/emulator/EmulatorArchive.java +++ b/emulator/src/main/java/kafka/cli/emulator/EmulatorArchive.java @@ -9,6 +9,12 @@ import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; public class EmulatorArchive { @@ -19,10 +25,102 @@ public class EmulatorArchive { List includeTopics = new ArrayList<>(); List excludeTopics = new ArrayList<>(); + FieldFormat keyFormat; + FieldFormat valueFormat; + + final StringDeserializer stringDeserializer = new StringDeserializer(); + final LongDeserializer longDeserializer = new LongDeserializer(); + final IntegerDeserializer intDeserializer = new IntegerDeserializer(); + + final StringSerializer stringSerializer = new StringSerializer(); + final LongSerializer longSerializer = new LongSerializer(); + final IntegerSerializer intSerializer = new IntegerSerializer(); + public static EmulatorArchive create() { return new EmulatorArchive(); } + public void setExcludeTopics(List excludeTopics) { + this.excludeTopics = excludeTopics; + } + + public void setIncludeTopics(List includeTopics) { + this.includeTopics = includeTopics; + } + + public static EmulatorArchive with(FieldFormat keyFormat, FieldFormat valueFormat) { + final var emulatorArchive = new EmulatorArchive(); + emulatorArchive.keyFormat = keyFormat; + emulatorArchive.valueFormat = valueFormat; + return emulatorArchive; + } + + public void append( + String topic, + int partition, + long offset, + long timestamp, + long afterMs, + FieldFormat keyFormat, + FieldFormat valueFormat, + byte[] keyBytes, + byte[] valueBytes, + String keyString, + String valueString, + int keyInt, + int valueInt, + long keyLong, + long valueLong + ) { + final var key = + switch (keyFormat) { + case BYTES -> keyBytes; + case INTEGER -> intSerializer.serialize("", keyInt); + case LONG -> longSerializer.serialize("", keyLong); + case STRING -> stringSerializer.serialize("", keyString); + }; + final var value = + switch (valueFormat) { + case BYTES -> valueBytes; + case INTEGER -> intSerializer.serialize("", valueInt); + case LONG -> longSerializer.serialize("", valueLong); + case STRING -> stringSerializer.serialize("", valueString); + }; + + var emuRecord = new EmulatorRecord( + topic, + partition, + offset, + timestamp, + afterMs, + keyFormat, + key, + valueFormat, + value + ); + + append(new TopicPartition(topic, partition), emuRecord); + } + + public void append( + TopicPartition topicPartition, + ConsumerRecord record, + long afterMs + ) { + var emuRecord = new EmulatorRecord( + record.topic(), + record.partition(), + record.offset(), + record.timestamp(), + afterMs, + keyFormat, + record.key(), + valueFormat, + record.value() + ); + append(topicPartition, emuRecord); + } + public void append(TopicPartition topicPartition, EmulatorRecord record) { records.computeIfPresent( topicPartition, @@ -55,14 +153,6 @@ public void append(TopicPartition topicPartition, EmulatorRecord record) { ); } - public void setExcludeTopics(List excludeTopics) { - this.excludeTopics = excludeTopics; - } - - public void setIncludeTopics(List includeTopics) { - this.includeTopics = includeTopics; - } - public Set topicPartitions() { return records .keySet() @@ -102,6 +192,30 @@ public Long oldestTimestamps(TopicPartition tp) { return oldestTimestamps.get(tp); } + public int keyAsInt(EmulatorRecord r) { + return intDeserializer.deserialize(r.topic(), r.key()); + } + + public long keyAsLong(EmulatorRecord r) { + return longDeserializer.deserialize(r.topic(), r.key()); + } + + public String keyAsString(EmulatorRecord r) { + return stringDeserializer.deserialize(r.topic(), r.key()); + } + + public int valueAsInt(EmulatorRecord r) { + return intDeserializer.deserialize(r.topic(), r.value()); + } + + public long valueAsLong(EmulatorRecord r) { + return longDeserializer.deserialize(r.topic(), r.value()); + } + + public String valueAsString(EmulatorRecord r) { + return stringDeserializer.deserialize(r.topic(), r.value()); + } + record EmulatorRecord( String topic, int partition, diff --git a/emulator/src/main/java/kafka/cli/emulator/KafkaRecorder.java b/emulator/src/main/java/kafka/cli/emulator/KafkaRecorder.java index d28cb18..b42e202 100644 --- a/emulator/src/main/java/kafka/cli/emulator/KafkaRecorder.java +++ b/emulator/src/main/java/kafka/cli/emulator/KafkaRecorder.java @@ -47,7 +47,7 @@ public EmulatorArchive record( // set offsets from // consumer loop var latestTimestamps = new HashMap(); - final var archive = EmulatorArchive.create(); + final var archive = EmulatorArchive.with(keyFormat, valueFormat); var listTopics = consumer.listTopics(); var topicPartitions = new ArrayList(); for (var topic : topics) { @@ -98,14 +98,8 @@ public EmulatorArchive record( } else { afterMs = currentTimestamp - latestTimestamp; } - var zipRecord = EmulatorArchive.EmulatorRecord.from( - record, - keyFormat, - valueFormat, - afterMs - ); // append to topic-partition file - archive.append(partition, zipRecord); + archive.append(partition, record, afterMs); latestTimestamps.put(partition, currentTimestamp); if (isDone(partition, archive, endAt)) { done.put(partition, true); @@ -201,7 +195,7 @@ public static RecordEndAt of(Map offsets) { } } - public static void main(String[] args) throws IOException, InterruptedException { + public static void main(String[] args) throws IOException { var emulator = new KafkaRecorder(); var context = KafkaContexts.load().get("local"); var props = context.properties(); @@ -213,20 +207,5 @@ public static void main(String[] args) throws IOException, InterruptedException RecordStartFrom.of(), RecordEndAt.of(10) ); - // var archiveLoader = new ArchiveStore.SqliteArchiveLoader(Path.of("test.db")); - // emulator.replay(props, archiveLoader.load(), Map.of("t5", "t14"), false, true); - // Check Thread handling by parallel stream - // final var map = Map.of("s1", "a", "s2", "b", "s3", "c"); - // map.keySet().parallelStream() - // .forEach( - // k -> { - // System.out.println(Thread.currentThread().getName()); - // try { - // Thread.sleep(1000); - // } catch (InterruptedException e) { - // throw new RuntimeException(e); - // } - // System.out.println(map.get(k)); - // }); } } diff --git a/emulator/src/main/java/kafka/cli/emulator/KafkaReplayer.java b/emulator/src/main/java/kafka/cli/emulator/KafkaReplayer.java index 524bf42..91a70b2 100644 --- a/emulator/src/main/java/kafka/cli/emulator/KafkaReplayer.java +++ b/emulator/src/main/java/kafka/cli/emulator/KafkaReplayer.java @@ -126,28 +126,7 @@ public static void main(String[] args) throws IOException, InterruptedException var emulator = new KafkaReplayer(); var context = KafkaContexts.load().get("local"); var props = context.properties(); - // emulator.record( - // props, - // List.of("t5"), - // EmulatorArchive.FieldFormat.STRING, - // EmulatorArchive.FieldFormat.STRING, - // RecordStartFrom.of(), - // RecordEndAt.of(10) - // ); emulator.replay(props, archiveLoader.load(), Map.of("t5", "t14"), false, true); - // Check Thread handling by parallel stream - // final var map = Map.of("s1", "a", "s2", "b", "s3", "c"); - // map.keySet().parallelStream() - // .forEach( - // k -> { - // System.out.println(Thread.currentThread().getName()); - // try { - // Thread.sleep(1000); - // } catch (InterruptedException e) { - // throw new RuntimeException(e); - // } - // System.out.println(map.get(k)); - // }); } } diff --git a/emulator/src/main/java/kafka/cli/emulator/SqliteStore.java b/emulator/src/main/java/kafka/cli/emulator/SqliteStore.java index e81c049..70f8a57 100644 --- a/emulator/src/main/java/kafka/cli/emulator/SqliteStore.java +++ b/emulator/src/main/java/kafka/cli/emulator/SqliteStore.java @@ -1,27 +1,12 @@ package kafka.cli.emulator; -import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.sql.DriverManager; import java.sql.SQLException; import java.util.Collection; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; public class SqliteStore { - final StringDeserializer stringDeserializer = new StringDeserializer(); - final LongDeserializer longDeserializer = new LongDeserializer(); - final IntegerDeserializer intDeserializer = new IntegerDeserializer(); - final StringSerializer stringSerializer = new StringSerializer(); - final LongSerializer longSerializer = new LongSerializer(); - final IntegerSerializer intSerializer = new IntegerSerializer(); - final Path archivePath; public SqliteStore(Path archivePath) { @@ -40,39 +25,30 @@ public EmulatorArchive load() { ORDER BY offset ASC""" ); while (rs.next()) { - final var tp = new TopicPartition(rs.getString("topic"), rs.getInt("partition")); final var keyFormat = EmulatorArchive.FieldFormat.valueOf( rs.getString("key_format") ); - final var key = - switch (keyFormat) { - case BYTES -> rs.getBytes("key_bytes"); - case INTEGER -> intSerializer.serialize("", rs.getInt("key_int")); - case LONG -> longSerializer.serialize("", rs.getLong("key_long")); - case STRING -> stringSerializer.serialize("", rs.getString("key_string")); - }; final var valueFormat = EmulatorArchive.FieldFormat.valueOf( rs.getString("value_format") ); - final var value = - switch (valueFormat) { - case BYTES -> rs.getBytes("value_bytes"); - case INTEGER -> intSerializer.serialize("", rs.getInt("value_int")); - case LONG -> longSerializer.serialize("", rs.getLong("value_long")); - case STRING -> stringSerializer.serialize("", rs.getString("value_string")); - }; - final var record = new EmulatorArchive.EmulatorRecord( - tp.topic(), - tp.partition(), + + archive.append( + rs.getString("topic"), + rs.getInt("partition"), rs.getLong("offset"), rs.getLong("timestamp"), rs.getLong("after_ms"), keyFormat, - key, valueFormat, - value + rs.getBytes("key_bytes"), + rs.getBytes("value_bytes"), + rs.getString("key_string"), + rs.getString("value_string"), + rs.getInt("key_int"), + rs.getInt("value_int"), + rs.getLong("key_long"), + rs.getLong("value_long") ); - archive.append(tp, record); } return archive; } catch (SQLException e) { @@ -165,23 +141,17 @@ INSERT INTO records_v1 ( if (r.key() != null) { switch (r.keyFormat()) { case BYTES -> ps.setBytes(8, r.key()); - case STRING -> ps.setString( - 10, - stringDeserializer.deserialize("", r.key()) - ); - case INTEGER -> ps.setInt(12, intDeserializer.deserialize("", r.key())); - case LONG -> ps.setLong(14, longDeserializer.deserialize("", r.key())); + case STRING -> ps.setString(10, archive.keyAsString(r)); + case INTEGER -> ps.setInt(12, archive.keyAsInt(r)); + case LONG -> ps.setLong(14, archive.keyAsLong(r)); } } if (r.value() != null) { switch (r.valueFormat()) { case BYTES -> ps.setBytes(9, r.value()); - case STRING -> ps.setString( - 11, - stringDeserializer.deserialize("", r.value()) - ); - case INTEGER -> ps.setInt(13, intDeserializer.deserialize("", r.value())); - case LONG -> ps.setLong(15, longDeserializer.deserialize("", r.value())); + case STRING -> ps.setString(11, archive.valueAsString(r)); + case INTEGER -> ps.setInt(13, archive.valueAsInt(r)); + case LONG -> ps.setLong(15, archive.valueAsLong(r)); } } ps.addBatch(); @@ -194,28 +164,4 @@ INSERT INTO records_v1 ( throw new RuntimeException(e); } } - - public static void main(String[] args) { - var loader = new SqliteStore(Path.of("test.db")); - var archive = EmulatorArchive.create(); - archive.append( - new TopicPartition("t1", 0), - new EmulatorArchive.EmulatorRecord( - "t1", - 0, - 0L, - System.currentTimeMillis(), - 100L, - EmulatorArchive.FieldFormat.BYTES, - "s".getBytes(StandardCharsets.UTF_8), - EmulatorArchive.FieldFormat.BYTES, - "v".getBytes(StandardCharsets.UTF_8) - ) - ); - loader.save(archive); - - var archive2 = loader.load(); - - System.out.println(archive2.equals(archive)); - } }