Skip to content

Commit

Permalink
refactor(emulator): move serde to archive
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Jul 21, 2022
1 parent 88be65d commit 520504d
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 125 deletions.
130 changes: 122 additions & 8 deletions emulator/src/main/java/kafka/cli/emulator/EmulatorArchive.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class EmulatorArchive {

Expand All @@ -19,10 +25,102 @@ public class EmulatorArchive {
List<String> includeTopics = new ArrayList<>();
List<String> excludeTopics = new ArrayList<>();

FieldFormat keyFormat;
FieldFormat valueFormat;

final StringDeserializer stringDeserializer = new StringDeserializer();
final LongDeserializer longDeserializer = new LongDeserializer();
final IntegerDeserializer intDeserializer = new IntegerDeserializer();

final StringSerializer stringSerializer = new StringSerializer();
final LongSerializer longSerializer = new LongSerializer();
final IntegerSerializer intSerializer = new IntegerSerializer();

public static EmulatorArchive create() {
return new EmulatorArchive();
}

public void setExcludeTopics(List<String> excludeTopics) {
this.excludeTopics = excludeTopics;
}

public void setIncludeTopics(List<String> includeTopics) {
this.includeTopics = includeTopics;
}

public static EmulatorArchive with(FieldFormat keyFormat, FieldFormat valueFormat) {
final var emulatorArchive = new EmulatorArchive();
emulatorArchive.keyFormat = keyFormat;
emulatorArchive.valueFormat = valueFormat;
return emulatorArchive;
}

public void append(
String topic,
int partition,
long offset,
long timestamp,
long afterMs,
FieldFormat keyFormat,
FieldFormat valueFormat,
byte[] keyBytes,
byte[] valueBytes,
String keyString,
String valueString,
int keyInt,
int valueInt,
long keyLong,
long valueLong
) {
final var key =
switch (keyFormat) {
case BYTES -> keyBytes;
case INTEGER -> intSerializer.serialize("", keyInt);
case LONG -> longSerializer.serialize("", keyLong);
case STRING -> stringSerializer.serialize("", keyString);
};
final var value =
switch (valueFormat) {
case BYTES -> valueBytes;
case INTEGER -> intSerializer.serialize("", valueInt);
case LONG -> longSerializer.serialize("", valueLong);
case STRING -> stringSerializer.serialize("", valueString);
};

var emuRecord = new EmulatorRecord(
topic,
partition,
offset,
timestamp,
afterMs,
keyFormat,
key,
valueFormat,
value
);

append(new TopicPartition(topic, partition), emuRecord);
}

public void append(
TopicPartition topicPartition,
ConsumerRecord<byte[], byte[]> record,
long afterMs
) {
var emuRecord = new EmulatorRecord(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
afterMs,
keyFormat,
record.key(),
valueFormat,
record.value()
);
append(topicPartition, emuRecord);
}

public void append(TopicPartition topicPartition, EmulatorRecord record) {
records.computeIfPresent(
topicPartition,
Expand Down Expand Up @@ -55,14 +153,6 @@ public void append(TopicPartition topicPartition, EmulatorRecord record) {
);
}

public void setExcludeTopics(List<String> excludeTopics) {
this.excludeTopics = excludeTopics;
}

public void setIncludeTopics(List<String> includeTopics) {
this.includeTopics = includeTopics;
}

public Set<TopicPartition> topicPartitions() {
return records
.keySet()
Expand Down Expand Up @@ -102,6 +192,30 @@ public Long oldestTimestamps(TopicPartition tp) {
return oldestTimestamps.get(tp);
}

public int keyAsInt(EmulatorRecord r) {
return intDeserializer.deserialize(r.topic(), r.key());
}

public long keyAsLong(EmulatorRecord r) {
return longDeserializer.deserialize(r.topic(), r.key());
}

public String keyAsString(EmulatorRecord r) {
return stringDeserializer.deserialize(r.topic(), r.key());
}

public int valueAsInt(EmulatorRecord r) {
return intDeserializer.deserialize(r.topic(), r.value());
}

public long valueAsLong(EmulatorRecord r) {
return longDeserializer.deserialize(r.topic(), r.value());
}

public String valueAsString(EmulatorRecord r) {
return stringDeserializer.deserialize(r.topic(), r.value());
}

record EmulatorRecord(
String topic,
int partition,
Expand Down
27 changes: 3 additions & 24 deletions emulator/src/main/java/kafka/cli/emulator/KafkaRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public EmulatorArchive record(
// set offsets from
// consumer loop
var latestTimestamps = new HashMap<TopicPartition, Long>();
final var archive = EmulatorArchive.create();
final var archive = EmulatorArchive.with(keyFormat, valueFormat);
var listTopics = consumer.listTopics();
var topicPartitions = new ArrayList<TopicPartition>();
for (var topic : topics) {
Expand Down Expand Up @@ -98,14 +98,8 @@ public EmulatorArchive record(
} else {
afterMs = currentTimestamp - latestTimestamp;
}
var zipRecord = EmulatorArchive.EmulatorRecord.from(
record,
keyFormat,
valueFormat,
afterMs
);
// append to topic-partition file
archive.append(partition, zipRecord);
archive.append(partition, record, afterMs);
latestTimestamps.put(partition, currentTimestamp);
if (isDone(partition, archive, endAt)) {
done.put(partition, true);
Expand Down Expand Up @@ -201,7 +195,7 @@ public static RecordEndAt of(Map<TopicPartition, Long> offsets) {
}
}

public static void main(String[] args) throws IOException, InterruptedException {
public static void main(String[] args) throws IOException {
var emulator = new KafkaRecorder();
var context = KafkaContexts.load().get("local");
var props = context.properties();
Expand All @@ -213,20 +207,5 @@ public static void main(String[] args) throws IOException, InterruptedException
RecordStartFrom.of(),
RecordEndAt.of(10)
);
// var archiveLoader = new ArchiveStore.SqliteArchiveLoader(Path.of("test.db"));
// emulator.replay(props, archiveLoader.load(), Map.of("t5", "t14"), false, true);
// Check Thread handling by parallel stream
// final var map = Map.of("s1", "a", "s2", "b", "s3", "c");
// map.keySet().parallelStream()
// .forEach(
// k -> {
// System.out.println(Thread.currentThread().getName());
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// System.out.println(map.get(k));
// });
}
}
21 changes: 0 additions & 21 deletions emulator/src/main/java/kafka/cli/emulator/KafkaReplayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,28 +126,7 @@ public static void main(String[] args) throws IOException, InterruptedException
var emulator = new KafkaReplayer();
var context = KafkaContexts.load().get("local");
var props = context.properties();
// emulator.record(
// props,
// List.of("t5"),
// EmulatorArchive.FieldFormat.STRING,
// EmulatorArchive.FieldFormat.STRING,
// RecordStartFrom.of(),
// RecordEndAt.of(10)
// );

emulator.replay(props, archiveLoader.load(), Map.of("t5", "t14"), false, true);
// Check Thread handling by parallel stream
// final var map = Map.of("s1", "a", "s2", "b", "s3", "c");
// map.keySet().parallelStream()
// .forEach(
// k -> {
// System.out.println(Thread.currentThread().getName());
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// System.out.println(map.get(k));
// });
}
}
90 changes: 18 additions & 72 deletions emulator/src/main/java/kafka/cli/emulator/SqliteStore.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
package kafka.cli.emulator;

import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SqliteStore {

final StringDeserializer stringDeserializer = new StringDeserializer();
final LongDeserializer longDeserializer = new LongDeserializer();
final IntegerDeserializer intDeserializer = new IntegerDeserializer();
final StringSerializer stringSerializer = new StringSerializer();
final LongSerializer longSerializer = new LongSerializer();
final IntegerSerializer intSerializer = new IntegerSerializer();

final Path archivePath;

public SqliteStore(Path archivePath) {
Expand All @@ -40,39 +25,30 @@ public EmulatorArchive load() {
ORDER BY offset ASC"""
);
while (rs.next()) {
final var tp = new TopicPartition(rs.getString("topic"), rs.getInt("partition"));
final var keyFormat = EmulatorArchive.FieldFormat.valueOf(
rs.getString("key_format")
);
final var key =
switch (keyFormat) {
case BYTES -> rs.getBytes("key_bytes");
case INTEGER -> intSerializer.serialize("", rs.getInt("key_int"));
case LONG -> longSerializer.serialize("", rs.getLong("key_long"));
case STRING -> stringSerializer.serialize("", rs.getString("key_string"));
};
final var valueFormat = EmulatorArchive.FieldFormat.valueOf(
rs.getString("value_format")
);
final var value =
switch (valueFormat) {
case BYTES -> rs.getBytes("value_bytes");
case INTEGER -> intSerializer.serialize("", rs.getInt("value_int"));
case LONG -> longSerializer.serialize("", rs.getLong("value_long"));
case STRING -> stringSerializer.serialize("", rs.getString("value_string"));
};
final var record = new EmulatorArchive.EmulatorRecord(
tp.topic(),
tp.partition(),

archive.append(
rs.getString("topic"),
rs.getInt("partition"),
rs.getLong("offset"),
rs.getLong("timestamp"),
rs.getLong("after_ms"),
keyFormat,
key,
valueFormat,
value
rs.getBytes("key_bytes"),
rs.getBytes("value_bytes"),
rs.getString("key_string"),
rs.getString("value_string"),
rs.getInt("key_int"),
rs.getInt("value_int"),
rs.getLong("key_long"),
rs.getLong("value_long")
);
archive.append(tp, record);
}
return archive;
} catch (SQLException e) {
Expand Down Expand Up @@ -165,23 +141,17 @@ INSERT INTO records_v1 (
if (r.key() != null) {
switch (r.keyFormat()) {
case BYTES -> ps.setBytes(8, r.key());
case STRING -> ps.setString(
10,
stringDeserializer.deserialize("", r.key())
);
case INTEGER -> ps.setInt(12, intDeserializer.deserialize("", r.key()));
case LONG -> ps.setLong(14, longDeserializer.deserialize("", r.key()));
case STRING -> ps.setString(10, archive.keyAsString(r));
case INTEGER -> ps.setInt(12, archive.keyAsInt(r));
case LONG -> ps.setLong(14, archive.keyAsLong(r));
}
}
if (r.value() != null) {
switch (r.valueFormat()) {
case BYTES -> ps.setBytes(9, r.value());
case STRING -> ps.setString(
11,
stringDeserializer.deserialize("", r.value())
);
case INTEGER -> ps.setInt(13, intDeserializer.deserialize("", r.value()));
case LONG -> ps.setLong(15, longDeserializer.deserialize("", r.value()));
case STRING -> ps.setString(11, archive.valueAsString(r));
case INTEGER -> ps.setInt(13, archive.valueAsInt(r));
case LONG -> ps.setLong(15, archive.valueAsLong(r));
}
}
ps.addBatch();
Expand All @@ -194,28 +164,4 @@ INSERT INTO records_v1 (
throw new RuntimeException(e);
}
}

public static void main(String[] args) {
var loader = new SqliteStore(Path.of("test.db"));
var archive = EmulatorArchive.create();
archive.append(
new TopicPartition("t1", 0),
new EmulatorArchive.EmulatorRecord(
"t1",
0,
0L,
System.currentTimeMillis(),
100L,
EmulatorArchive.FieldFormat.BYTES,
"s".getBytes(StandardCharsets.UTF_8),
EmulatorArchive.FieldFormat.BYTES,
"v".getBytes(StandardCharsets.UTF_8)
)
);
loader.save(archive);

var archive2 = loader.load();

System.out.println(archive2.equals(archive));
}
}

0 comments on commit 520504d

Please sign in to comment.