diff --git a/.gitignore b/.gitignore index 9f97022..6c11d4f 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -target/ \ No newline at end of file +target/ +*.db \ No newline at end of file diff --git a/cluster-state/pom.xml b/cluster-state/pom.xml index 8ec590a..41fd829 100644 --- a/cluster-state/pom.xml +++ b/cluster-state/pom.xml @@ -35,7 +35,6 @@ io.github.jeqo.kafka kafka-context - 0.2.0 diff --git a/cluster-state/src/main/java/kafka/cli/cluster/state/Cli.java b/cluster-state/src/main/java/kafka/cli/cluster/state/Cli.java index c074481..a7daac2 100644 --- a/cluster-state/src/main/java/kafka/cli/cluster/state/Cli.java +++ b/cluster-state/src/main/java/kafka/cli/cluster/state/Cli.java @@ -15,7 +15,7 @@ import java.util.stream.Collectors; import kafka.cli.cluster.state.Cli.VersionProviderWithConfigProvider; import kafka.context.KafkaContexts; -import kafka.context.SchemaRegistryContexts; +import kafka.context.sr.SchemaRegistryContexts; import org.apache.kafka.clients.admin.AdminClient; import picocli.CommandLine; import picocli.CommandLine.ArgGroup; @@ -24,15 +24,15 @@ import picocli.CommandLine.Option; @Command( - name = "kfk-cluster-state", - descriptionHeading = "Kafka CLI - Topic list", - description = - """ + name = "kfk-cluster-state", + descriptionHeading = "Kafka CLI - Topic list", + description = """ List Kafka topics with metadata, partitions, replica placement, configuration, and offsets at once. """, - versionProvider = VersionProviderWithConfigProvider.class, - mixinStandardHelpOptions = true) + versionProvider = VersionProviderWithConfigProvider.class, + mixinStandardHelpOptions = true +) public class Cli implements Callable { public static void main(String[] args) { @@ -40,23 +40,20 @@ public static void main(String[] args) { System.exit(exitCode); } - @Option( - names = {"-t", "--topics"}, - description = "list of topic names to include") + @Option(names = { "-t", "--topics" }, description = "list of topic names to include") List topics = new ArrayList<>(); - @Option( - names = {"-p", "--prefix"}, - description = "Topic name prefix") + @Option(names = { "-p", "--prefix" }, description = "Topic name prefix") Optional prefix = Optional.empty(); @ArgGroup(multiplicity = "1") PropertiesOption propertiesOption; @Option( - names = {"--pretty"}, - defaultValue = "false", - description = "Print pretty/formatted JSON") + names = { "--pretty" }, + defaultValue = "false", + description = "Print pretty/formatted JSON" + ) boolean pretty; @Override @@ -68,12 +65,14 @@ public Integer call() throws Exception { try (var adminClient = AdminClient.create(clientConfig)) { if (sr) { - var srClient = - new CachedSchemaRegistryClient( - clientConfig.getProperty("schema.registry.url"), - 10_000, - clientConfig.keySet().stream() - .collect(Collectors.toMap(Object::toString, clientConfig::get))); + var srClient = new CachedSchemaRegistryClient( + clientConfig.getProperty("schema.registry.url"), + 10_000, + clientConfig + .keySet() + .stream() + .collect(Collectors.toMap(Object::toString, clientConfig::get)) + ); final var helper = new Helper(adminClient, srClient); final var output = helper.run(opts); out.println(output.toJson(pretty)); @@ -95,8 +94,10 @@ public boolean match(String name) { static class PropertiesOption { @CommandLine.Option( - names = {"-c", "--config"}, - description = "Client configuration properties file." + "Must include connection to Kafka") + names = { "-c", "--config" }, + description = "Client configuration properties file." 
+ + "Must include connection to Kafka" + ) Optional configPath; @ArgGroup(exclusive = false) @@ -104,25 +105,24 @@ static class PropertiesOption { public Properties load() { return configPath - .map( - path -> { - try { - final var p = new Properties(); - p.load(Files.newInputStream(path)); - return p; - } catch (Exception e) { - throw new IllegalArgumentException( - "ERROR: properties file at %s is failing to load".formatted(path)); - } - }) - .orElseGet( - () -> { - try { - return contextOption.load(); - } catch (IOException e) { - throw new IllegalArgumentException("ERROR: loading contexts"); - } - }); + .map(path -> { + try { + final var p = new Properties(); + p.load(Files.newInputStream(path)); + return p; + } catch (Exception e) { + throw new IllegalArgumentException( + "ERROR: properties file at %s is failing to load".formatted(path) + ); + } + }) + .orElseGet(() -> { + try { + return contextOption.load(); + } catch (IOException e) { + throw new IllegalArgumentException("ERROR: loading contexts"); + } + }); } } @@ -151,15 +151,18 @@ public Properties load() throws IOException { props.putAll(srProps); } else { err.printf( - "WARN: Schema Registry context `%s` not found. Proceeding without it.%n", srName); + "WARN: Schema Registry context `%s` not found. Proceeding without it.%n", + srName + ); } } return props; } else { err.printf( - "ERROR: Kafka context `%s` not found. Check that context already exist.%n", - kafkaContextName); + "ERROR: Kafka context `%s` not found. Check that context already exist.%n", + kafkaContextName + ); return null; } } @@ -170,16 +173,18 @@ static class VersionProviderWithConfigProvider implements IVersionProvider { @Override public String[] getVersion() throws IOException { final var url = - VersionProviderWithConfigProvider.class.getClassLoader().getResource("cli.properties"); + VersionProviderWithConfigProvider.class.getClassLoader() + .getResource("cli.properties"); if (url == null) { - return new String[] { - "No cli.properties file found in the classpath.", - }; + return new String[] { "No cli.properties file found in the classpath." 
}; } final var properties = new Properties(); properties.load(url.openStream()); return new String[] { - properties.getProperty("appName") + " version " + properties.getProperty("appVersion") + "", + properties.getProperty("appName") + + " version " + + properties.getProperty("appVersion") + + "", "Built: " + properties.getProperty("appBuildTime"), }; } diff --git a/cluster-state/src/main/java/kafka/cli/cluster/state/Helper.java b/cluster-state/src/main/java/kafka/cli/cluster/state/Helper.java index 427a653..01787f4 100644 --- a/cluster-state/src/main/java/kafka/cli/cluster/state/Helper.java +++ b/cluster-state/src/main/java/kafka/cli/cluster/state/Helper.java @@ -45,18 +45,22 @@ Output run(Opts opts) throws ExecutionException, InterruptedException { final var builder = Output.newBuilder(topics); final var describeClusterResult = adminClient.describeCluster(); - final var descriptions = adminClient.describeTopics(builder.names()).allTopicNames().get(); + final var descriptions = adminClient + .describeTopics(builder.names()) + .allTopicNames() + .get(); final var startOffsetRequest = new HashMap(); final var endOffsetRequest = new HashMap(); for (final var topic : builder.names) { final var description = descriptions.get(topic); - final var tps = - description.partitions().stream() - .map(tpi -> new TopicPartition(topic, tpi.partition())) - .sorted(Comparator.comparingInt(TopicPartition::partition)) - .toList(); + final var tps = description + .partitions() + .stream() + .map(tpi -> new TopicPartition(topic, tpi.partition())) + .sorted(Comparator.comparingInt(TopicPartition::partition)) + .toList(); for (final var tp : tps) { startOffsetRequest.put(tp, OffsetSpec.earliest()); endOffsetRequest.put(tp, OffsetSpec.latest()); @@ -66,51 +70,51 @@ Output run(Opts opts) throws ExecutionException, InterruptedException { final var startOffsets = adminClient.listOffsets(startOffsetRequest).all().get(); final var endOffsets = adminClient.listOffsets(endOffsetRequest).all().get(); - final var configs = adminClient.describeConfigs(builder.configResources()).all().get(); - - final var srSubjects = - srClient.map( - sr -> { + final var configs = adminClient + .describeConfigs(builder.configResources()) + .all() + .get(); + + final var srSubjects = srClient.map(sr -> { + try { + return opts + .prefix() + .map(p -> { + try { + return sr.getAllSubjectsByPrefix(p); + } catch (IOException | RestClientException e) { + throw new RuntimeException(e); + } + }) + .orElse(sr.getAllSubjects()); + } catch (IOException | RestClientException e) { + throw new RuntimeException(e); + } + }); + + final var srSubjectsMetadata = srClient.map(sr -> + srSubjects + .map(subjects -> + subjects + .stream() + .map(s -> { try { - return opts.prefix() - .map( - p -> { - try { - return sr.getAllSubjectsByPrefix(p); - } catch (IOException | RestClientException e) { - throw new RuntimeException(e); - } - }) - .orElse(sr.getAllSubjects()); + return Map.entry(s, sr.getLatestSchemaMetadata(s)); } catch (IOException | RestClientException e) { throw new RuntimeException(e); } - }); - - final var srSubjectsMetadata = - srClient.map( - sr -> - srSubjects - .map( - subjects -> - subjects.stream() - .map( - s -> { - try { - return Map.entry(s, sr.getLatestSchemaMetadata(s)); - } catch (IOException | RestClientException e) { - throw new RuntimeException(e); - } - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) - .orElse(Map.of())); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + ) + 
.orElse(Map.of()) + ); builder - .withClusterId(describeClusterResult.clusterId().get()) - .withBrokers(describeClusterResult.nodes().get()) - .withTopicDescriptions(descriptions) - .withStartOffsets(startOffsets) - .withEndOffsets(endOffsets) - .withConfigs(configs); + .withClusterId(describeClusterResult.clusterId().get()) + .withBrokers(describeClusterResult.nodes().get()) + .withTopicDescriptions(descriptions) + .withStartOffsets(startOffsets) + .withEndOffsets(endOffsets) + .withConfigs(configs); srSubjectsMetadata.ifPresent(builder::withSchemaRegistrySubjects); diff --git a/cluster-state/src/main/java/kafka/cli/cluster/state/Output.java b/cluster-state/src/main/java/kafka/cli/cluster/state/Output.java index f2dbbe7..4284e41 100644 --- a/cluster-state/src/main/java/kafka/cli/cluster/state/Output.java +++ b/cluster-state/src/main/java/kafka/cli/cluster/state/Output.java @@ -22,9 +22,13 @@ import org.apache.kafka.common.config.ConfigResource; public record Output( - KafkaCluster kafkaCluster, Map topics, SchemaRegistry schemaRegistry) { - static ObjectMapper json = - new ObjectMapper().registerModule(new Jdk8Module()).registerModule(new JavaTimeModule()); + KafkaCluster kafkaCluster, + Map topics, + SchemaRegistry schemaRegistry +) { + static ObjectMapper json = new ObjectMapper() + .registerModule(new Jdk8Module()) + .registerModule(new JavaTimeModule()); public static Builder newBuilder(List topicNames) { return new Builder(topicNames); @@ -73,37 +77,41 @@ List names() { } List configResources() { - return names.stream().map(t -> new ConfigResource(ConfigResource.Type.TOPIC, t)).toList(); + return names + .stream() + .map(t -> new ConfigResource(ConfigResource.Type.TOPIC, t)) + .toList(); } Output build() { final var topics = new HashMap(); for (final var name : names) { final var description = descriptions.get(name); - var partitions = - description.partitions().stream() - .map( - tpi -> { - final var tp = new TopicPartition(name, tpi.partition()); - return Partition.from(tpi, startOffsets.get(tp), endOffsets.get(tp)); - }) - .toList(); + var partitions = description + .partitions() + .stream() + .map(tpi -> { + final var tp = new TopicPartition(name, tpi.partition()); + return Partition.from(tpi, startOffsets.get(tp), endOffsets.get(tp)); + }) + .toList(); final var config = topicConfigs.get(name); - final var topic = - new Topic( - name, - description.topicId().toString(), - partitions.size(), - partitions.get(0).replicas().size(), - description.isInternal(), - partitions, - config); + final var topic = new Topic( + name, + description.topicId().toString(), + partitions.size(), + partitions.get(0).replicas().size(), + description.isInternal(), + partitions, + config + ); topics.put(name, topic); } return new Output( - new KafkaCluster(clusterId, brokers.stream().map(Node::from).toList()), - topics, - srSubjects == null ? null : new SchemaRegistry(srSubjects)); + new KafkaCluster(clusterId, brokers.stream().map(Node::from).toList()), + topics, + srSubjects == null ? 
null : new SchemaRegistry(srSubjects) + ); } public Builder withClusterId(String id) { @@ -116,7 +124,9 @@ public Builder withBrokers(Collection nodes) { return this; } - public Builder withConfigs(Map configs) { + public Builder withConfigs( + Map configs + ) { final var map = new HashMap(configs.size()); for (var configResource : configResources()) { var config = configs.get(configResource); @@ -126,7 +136,9 @@ public Builder withConfigs(Map startOffsets) { + public Builder withStartOffsets( + Map startOffsets + ) { this.startOffsets = startOffsets; return this; } @@ -143,15 +155,14 @@ public Builder withTopicDescriptions(Map descriptions) public Builder withSchemaRegistrySubjects(Map srm) { this.srSubjects = new HashMap<>(srm.size()); - srm.forEach( - (subject, schemaMetadata) -> { - Subject s = - new Subject( - schemaMetadata.getId(), - schemaMetadata.getSchemaType(), - schemaMetadata.getVersion()); - srSubjects.put(subject, s); - }); + srm.forEach((subject, schemaMetadata) -> { + Subject s = new Subject( + schemaMetadata.getId(), + schemaMetadata.getSchemaType(), + schemaMetadata.getVersion() + ); + srSubjects.put(subject, s); + }); return this; } } @@ -187,20 +198,22 @@ public JsonNode jsonNode() { } public record Topic( - String name, - String id, - int partitionCount, - int replicationFactor, - boolean isInternal, - List partitions, - Config config) { + String name, + String id, + int partitionCount, + int replicationFactor, + boolean isInternal, + List partitions, + Config config + ) { public JsonNode jsonNode() { var node = json.createObjectNode(); - node.put("name", name) - .put("id", id) - .put("isInternal", isInternal) - .put("partitionCount", partitionCount) - .put("replicationFactor", replicationFactor); + node + .put("name", name) + .put("id", id) + .put("isInternal", isInternal) + .put("partitionCount", partitionCount) + .put("replicationFactor", replicationFactor); var ps = node.putArray("partitions"); partitions.forEach(p -> ps.add(p.jsonNode())); node.set("config", config.jsonNode()); @@ -209,23 +222,26 @@ public JsonNode jsonNode() { } public record Partition( - int id, - Integer leader, - List replicas, - List isr, - Offset startOffset, - Offset endOffset) { + int id, + Integer leader, + List replicas, + List isr, + Offset startOffset, + Offset endOffset + ) { public static Partition from( - TopicPartitionInfo topicPartitionInfo, - ListOffsetsResultInfo startOffset, - ListOffsetsResultInfo endOffset) { + TopicPartitionInfo topicPartitionInfo, + ListOffsetsResultInfo startOffset, + ListOffsetsResultInfo endOffset + ) { return new Partition( - topicPartitionInfo.partition(), - topicPartitionInfo.leader().id(), - topicPartitionInfo.replicas().stream().map(Node::from).map(Node::id).toList(), - topicPartitionInfo.isr().stream().map(Node::from).map(Node::id).toList(), - Offset.from(startOffset), - Offset.from(endOffset)); + topicPartitionInfo.partition(), + topicPartitionInfo.leader().id(), + topicPartitionInfo.replicas().stream().map(Node::from).map(Node::id).toList(), + topicPartitionInfo.isr().stream().map(Node::from).map(Node::id).toList(), + Offset.from(startOffset), + Offset.from(endOffset) + ); } public JsonNode jsonNode() { @@ -243,7 +259,11 @@ public JsonNode jsonNode() { public record Offset(long offset, long timestamp, Optional leaderEpoch) { static Offset from(ListOffsetsResultInfo resultInfo) { - return new Offset(resultInfo.offset(), resultInfo.timestamp(), resultInfo.leaderEpoch()); + return new Offset( + resultInfo.offset(), + resultInfo.timestamp(), + 
resultInfo.leaderEpoch() + ); } public JsonNode jsonNode() { @@ -258,10 +278,11 @@ public JsonNode jsonNode() { public record Node(int id, String host, int port, Optional rack) { public static Node from(org.apache.kafka.common.Node node) { return new Node( - node.id(), - node.host(), - node.port(), - node.hasRack() ? Optional.of(node.rack()) : Optional.empty()); + node.id(), + node.host(), + node.port(), + node.hasRack() ? Optional.of(node.rack()) : Optional.empty() + ); } public JsonNode jsonNode() { @@ -275,7 +296,11 @@ public JsonNode jsonNode() { public record Config(Map entries) { public static Config from(org.apache.kafka.clients.admin.Config config) { return new Config( - config.entries().stream().collect(Collectors.toMap(ConfigEntry::name, Entry::from))); + config + .entries() + .stream() + .collect(Collectors.toMap(ConfigEntry::name, Entry::from)) + ); } public JsonNode jsonNode() { @@ -285,33 +310,38 @@ public JsonNode jsonNode() { } public record Entry( - String name, - String value, - boolean isReadOnly, - boolean isSensitive, - boolean isDefault, - String documentation, - Map synonyms) { + String name, + String value, + boolean isReadOnly, + boolean isSensitive, + boolean isDefault, + String documentation, + Map synonyms + ) { public static Entry from(ConfigEntry e) { return new Entry( - e.name(), - e.value(), - e.isReadOnly(), - e.isSensitive(), - e.isDefault(), - e.documentation(), - e.synonyms().stream() - .collect(Collectors.toMap(ConfigSynonym::name, ConfigSynonym::value))); + e.name(), + e.value(), + e.isReadOnly(), + e.isSensitive(), + e.isDefault(), + e.documentation(), + e + .synonyms() + .stream() + .collect(Collectors.toMap(ConfigSynonym::name, ConfigSynonym::value)) + ); } public JsonNode jsonNode() { var node = json.createObjectNode(); - node.put("name", name) - .put("value", isSensitive ? "*****" : value) - .put("isReadOnly", isReadOnly) - .put("isSensitive", isSensitive) - .put("isDefault", isDefault) - .put("documentation", documentation); + node + .put("name", name) + .put("value", isSensitive ? 
"*****" : value) + .put("isReadOnly", isReadOnly) + .put("isSensitive", isSensitive) + .put("isDefault", isDefault) + .put("documentation", documentation); var ss = node.putArray("synonyms"); synonyms.forEach((k, v) -> ss.add(json.createObjectNode().put(k, v))); return node; diff --git a/context/pom.xml b/context/pom.xml index 3863d4c..ae5e5b3 100644 --- a/context/pom.xml +++ b/context/pom.xml @@ -31,7 +31,6 @@ io.github.jeqo.kafka kafka-context - 0.2.0 diff --git a/context/src/main/java/kafka/cli/context/Cli.java b/context/src/main/java/kafka/cli/context/Cli.java index cc2c5a3..0dd8ca7 100644 --- a/context/src/main/java/kafka/cli/context/Cli.java +++ b/context/src/main/java/kafka/cli/context/Cli.java @@ -21,13 +21,18 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.Callable; +import kafka.context.KafkaCluster; +import kafka.context.KafkaContext; import kafka.context.KafkaContexts; -import kafka.context.KafkaContexts.KafkaContext; -import kafka.context.SchemaRegistryContexts; -import kafka.context.SchemaRegistryContexts.SchemaRegistryAuth; -import kafka.context.SchemaRegistryContexts.SchemaRegistryCluster; -import kafka.context.SchemaRegistryContexts.SchemaRegistryContext; -import kafka.context.SchemaRegistryContexts.UsernamePasswordAuth; +import kafka.context.auth.KafkaAuth; +import kafka.context.auth.KafkaNoAuth; +import kafka.context.auth.KafkaUsernamePasswordAuth; +import kafka.context.sr.SchemaRegistryCluster; +import kafka.context.sr.SchemaRegistryContext; +import kafka.context.sr.SchemaRegistryContexts; +import kafka.context.sr.auth.HttpNoAuth; +import kafka.context.sr.auth.HttpUsernamePasswordAuth; +import kafka.context.sr.auth.SchemaRegistryAuth; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeClusterOptions; @@ -36,24 +41,25 @@ import picocli.CommandLine.Option; @CommandLine.Command( - name = KFK_CTX_CMD, - versionProvider = Cli.VersionProviderWithConfigProvider.class, - mixinStandardHelpOptions = true, - subcommands = { - CreateCommand.class, - DeleteCommand.class, - TestCommand.class, - PropertiesCommand.class, - KCatCommand.class, - EnvCommand.class, - SchemaRegistryContextsCommand.class, - }, - description = "Manage Kafka connection as contexts.") + name = KFK_CTX_CMD, + versionProvider = Cli.VersionProviderWithConfigProvider.class, + mixinStandardHelpOptions = true, + subcommands = { + CreateCommand.class, + DeleteCommand.class, + TestCommand.class, + PropertiesCommand.class, + KCatCommand.class, + EnvCommand.class, + SchemaRegistryContextsCommand.class, + }, + description = "Manage Kafka connection as contexts." +) public class Cli implements Callable { public static final String KFK_CTX_CMD = "kfk-ctx"; - @Option(names = {"-v", "--verbose"}) + @Option(names = { "-v", "--verbose" }) boolean verbose; public static void main(String[] args) { @@ -73,23 +79,27 @@ public Integer call() throws Exception { } @CommandLine.Command( - name = "create", - description = "Register a Kafka context. Destination: ~/.kafka/kafka.json") + name = "create", + description = "Register a Kafka context. Destination: ~/.kafka/kafka.json" + ) static class CreateCommand implements Callable { @CommandLine.Parameters(index = "0", description = "Kafka context name. e.g. `local`") String name; - @CommandLine.Parameters(index = "1", description = "Bootstrap servers. e.g. `localhost:9092`") + @CommandLine.Parameters( + index = "1", + description = "Bootstrap servers. 
e.g. `localhost:9092`" + ) String bootstrapServers; @CommandLine.Option( - names = "--auth", - description = - "Authentication method (default: ${DEFAULT-VALUE}). Valid values: ${COMPLETION-CANDIDATES}", - required = true, - defaultValue = "PLAINTEXT") - KafkaContexts.KafkaAuth.AuthType authType; + names = "--auth", + description = "Authentication method (default: ${DEFAULT-VALUE}). Valid values: ${COMPLETION-CANDIDATES}", + required = true, + defaultValue = "PLAINTEXT" + ) + KafkaAuth.AuthType authType; @ArgGroup(exclusive = false) UsernamePasswordOptions usernamePasswordOptions; @@ -99,21 +109,25 @@ public Integer call() throws Exception { final var contexts = KafkaContexts.load(); try { - final KafkaContexts.KafkaAuth auth = - switch (authType) { - case SASL_PLAIN -> KafkaContexts.UsernamePasswordAuth.build( - authType, usernamePasswordOptions.username, usernamePasswordOptions.password()); - default -> new KafkaContexts.NoAuth(); - }; - final var ctx = - new KafkaContext(name, new KafkaContexts.KafkaCluster(bootstrapServers, auth)); + final KafkaAuth auth = + switch (authType) { + case SASL_PLAIN -> KafkaUsernamePasswordAuth.build( + authType, + usernamePasswordOptions.username, + usernamePasswordOptions.password() + ); + default -> new KafkaNoAuth(); + }; + final var ctx = new KafkaContext(name, new KafkaCluster(bootstrapServers, auth)); contexts.add(ctx); KafkaContexts.save(contexts); out.printf( - "Kafka context `%s` with bootstrap-servers [%s] is saved.", - ctx.name(), ctx.cluster().bootstrapServers()); + "Kafka context `%s` with bootstrap-servers [%s] is saved.", + ctx.name(), + ctx.cluster().bootstrapServers() + ); return 0; } catch (IllegalArgumentException e) { err.println("ERROR: " + e.getMessage()); @@ -123,8 +137,9 @@ public Integer call() throws Exception { } @CommandLine.Command( - name = "delete", - description = "Removes context. Destination: ~/.kafka/kafka.json") + name = "delete", + description = "Removes context. Destination: ~/.kafka/kafka.json" + ) static class DeleteCommand implements Callable { @CommandLine.Parameters(index = "0", description = "Kafka context name. e.g. 
`local`") @@ -140,8 +155,10 @@ public Integer call() throws Exception { KafkaContexts.save(contexts); out.printf( - "Kafka context `%s` with bootstrap servers: [%s] is deleted.%n", - ctx.name(), ctx.cluster().bootstrapServers()); + "Kafka context `%s` with bootstrap servers: [%s] is deleted.%n", + ctx.name(), + ctx.cluster().bootstrapServers() + ); return 0; } else { out.printf("Kafka context `%s` is not registered.%n", name); @@ -151,16 +168,18 @@ public Integer call() throws Exception { } @CommandLine.Command( - name = "properties", - description = "Get properties configurations for contexts") + name = "properties", + description = "Get properties configurations for contexts" + ) static class PropertiesCommand implements Callable { @CommandLine.Parameters(index = "0", description = "Kafka context name") String name; @Option( - names = {"--schema-registry", "-sr"}, - description = "Schema Registry context name") + names = { "--schema-registry", "-sr" }, + description = "Schema Registry context name" + ) Optional schemeRegistryContext; @Override @@ -177,12 +196,16 @@ public Integer call() throws Exception { final var srCtx = srContexts.get(schemeRegistryContext.get()); final var srProps = srCtx.properties(); - srProps.store(out, "Schema Registry client properties generated by " + KFK_CTX_CMD); + srProps.store( + out, + "Schema Registry client properties generated by " + KFK_CTX_CMD + ); } else { System.err.printf( - "WARN: Schema Registry context %s does not exist. " - + "Schema Registry connection properties will not be included", - schemeRegistryContext.get()); + "WARN: Schema Registry context %s does not exist. " + + "Schema Registry connection properties will not be included", + schemeRegistryContext.get() + ); } } } else { @@ -200,8 +223,9 @@ static class TestCommand implements Callable { String name; @Option( - names = {"--schema-registry", "-sr"}, - description = "Schema Registry context name") + names = { "--schema-registry", "-sr" }, + description = "Schema Registry context name" + ) Optional schemeRegistryContext; @Override @@ -211,20 +235,25 @@ public Integer call() throws Exception { final var ctx = contexts.get(name); final var props = ctx.properties(); - final var bootstrapServers = props.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + final var bootstrapServers = props.get( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ); try (final var admin = AdminClient.create(props)) { - final var clusterId = - admin - .describeCluster(new DescribeClusterOptions().timeoutMs(10_000)) - .clusterId() - .get(); + final var clusterId = admin + .describeCluster(new DescribeClusterOptions().timeoutMs(10_000)) + .clusterId() + .get(); out.printf( - "Connection to Kafka `%s` [%s] (id=%s) succeed%n", name, bootstrapServers, clusterId); + "Connection to Kafka `%s` [%s] (id=%s) succeed%n", + name, + bootstrapServers, + clusterId + ); admin - .describeCluster() - .nodes() - .get() - .forEach(node -> System.err.println("Node: " + node)); + .describeCluster() + .nodes() + .get() + .forEach(node -> System.err.println("Node: " + node)); } catch (Exception e) { out.printf("Connection to Kafka `%s` [%s] failed%n", name, bootstrapServers); e.printStackTrace(); @@ -239,35 +268,42 @@ public Integer call() throws Exception { final var auth = srCtx.cluster().auth(); final var httpClient = - switch (auth.type()) { - case BASIC_AUTH -> HttpClient.newBuilder() - .authenticator( - new Authenticator() { - @Override - protected PasswordAuthentication getPasswordAuthentication() { - final var basicAuth = 
(UsernamePasswordAuth) auth; - return basicAuth.passwordAuth(); - } - }) - .build(); - case NO_AUTH -> HttpClient.newHttpClient(); - }; + switch (auth.type()) { + case BASIC_AUTH -> HttpClient + .newBuilder() + .authenticator( + new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + final var basicAuth = (HttpUsernamePasswordAuth) auth; + return basicAuth.passwordAuth(); + } + } + ) + .build(); + case NO_AUTH -> HttpClient.newHttpClient(); + }; final var urls = srCtx.cluster().urls(); - final var response = - httpClient.send( - HttpRequest.newBuilder().uri(URI.create(urls)).GET().build(), - BodyHandlers.discarding()); + final var response = httpClient.send( + HttpRequest.newBuilder().uri(URI.create(urls)).GET().build(), + BodyHandlers.discarding() + ); if (response.statusCode() == 200) { out.printf("Connection to Schema Registry `%s` [%s] succeed%n", sr, urls); } else { - out.printf("Connection to Schema Registry `%s` URL(s): [%s] failed%n", sr, urls); + out.printf( + "Connection to Schema Registry `%s` URL(s): [%s] failed%n", + sr, + urls + ); return 1; } } else { out.printf( - "WARN: Schema Registry context %s does not exist. " - + "Schema Registry connection properties will not be tested", - sr); + "WARN: Schema Registry context %s does not exist. " + + "Schema Registry connection properties will not be tested", + sr + ); } } } else { @@ -278,20 +314,22 @@ protected PasswordAuthentication getPasswordAuthentication() { } } - @CommandLine.Command(name = "env", description = "env command with properties from context") + @CommandLine.Command( + name = "env", + description = "env command with properties from context" + ) static class EnvCommand implements Callable { @CommandLine.Parameters(index = "0", description = "Context name") String name; @Option( - names = {"--schema-registry", "-sr"}, - description = "Schema Registry context name") + names = { "--schema-registry", "-sr" }, + description = "Schema Registry context name" + ) Optional schemeRegistryContext; - @Option( - names = {"--auth"}, - description = "Include auth env variables") + @Option(names = { "--auth" }, description = "Include auth env variables") boolean includeAuth; @Override @@ -310,8 +348,9 @@ public Integer call() throws Exception { out.println(env + "\n" + srProps); } else { System.err.printf( - "WARN: Schema Registry context %s does not exist. Schema Registry connection properties will not be included", - schemeRegistryContext.get()); + "WARN: Schema Registry context %s does not exist. Schema Registry connection properties will not be included", + schemeRegistryContext.get() + ); } } else { out.println(env); @@ -324,15 +363,19 @@ public Integer call() throws Exception { } } - @CommandLine.Command(name = "kcat", description = "kcat command with properties from context") + @CommandLine.Command( + name = "kcat", + description = "kcat command with properties from context" + ) static class KCatCommand implements Callable { @CommandLine.Parameters(index = "0", description = "Context name") String name; @Option( - names = {"--schema-registry", "-sr"}, - description = "Schema Registry context name") + names = { "--schema-registry", "-sr" }, + description = "Schema Registry context name" + ) Optional schemeRegistryContext; @Override @@ -351,8 +394,9 @@ public Integer call() throws Exception { out.println(kcat + srProps); } else { System.err.printf( - "WARN: Schema Registry context %s does not exist. 
Schema Registry connection properties will not be included", - schemeRegistryContext.get()); + "WARN: Schema Registry context %s does not exist. Schema Registry connection properties will not be included", + schemeRegistryContext.get() + ); } } else { out.println(kcat); @@ -366,15 +410,16 @@ public Integer call() throws Exception { } @CommandLine.Command( - name = "sr", - subcommands = { - SchemaRegistryContextsCommand.CreateCommand.class, - SchemaRegistryContextsCommand.DeleteCommand.class, - }, - description = "Manage Schema Registry connection properties as contexts.") + name = "sr", + subcommands = { + SchemaRegistryContextsCommand.CreateCommand.class, + SchemaRegistryContextsCommand.DeleteCommand.class, + }, + description = "Manage Schema Registry connection properties as contexts." + ) static class SchemaRegistryContextsCommand implements Callable { - @Option(names = {"-v", "--verbose"}) + @Option(names = { "-v", "--verbose" }) boolean verbose; @Override @@ -389,24 +434,26 @@ public Integer call() throws Exception { } @CommandLine.Command( - name = "create", - description = "Register context. Destination: ~/.kafka/schema-registry.json") + name = "create", + description = "Register context. Destination: ~/.kafka/schema-registry.json" + ) static class CreateCommand implements Callable { @CommandLine.Parameters(index = "0", description = "Context name. e.g. `local`") String name; @CommandLine.Parameters( - index = "1", - description = "Schema Registry URLs. e.g. `http://localhost:8081`") + index = "1", + description = "Schema Registry URLs. e.g. `http://localhost:8081`" + ) String urls; @CommandLine.Option( - names = "--auth", - description = - "Authentication type (default: ${DEFAULT-VALUE}). Valid values: ${COMPLETION-CANDIDATES}", - required = true, - defaultValue = "NO_AUTH") + names = "--auth", + description = "Authentication type (default: ${DEFAULT-VALUE}). Valid values: ${COMPLETION-CANDIDATES}", + required = true, + defaultValue = "NO_AUTH" + ) SchemaRegistryAuth.AuthType authType; @ArgGroup(exclusive = false) @@ -417,19 +464,27 @@ public Integer call() throws Exception { var contexts = SchemaRegistryContexts.load(); try { final SchemaRegistryAuth auth = - switch (authType) { - case BASIC_AUTH -> SchemaRegistryContexts.UsernamePasswordAuth.build( - authType, usernamePasswordOptions.username, usernamePasswordOptions.password()); - default -> new SchemaRegistryContexts.NoAuth(); - }; - final var ctx = new SchemaRegistryContext(name, new SchemaRegistryCluster(urls, auth)); + switch (authType) { + case BASIC_AUTH -> HttpUsernamePasswordAuth.build( + authType, + usernamePasswordOptions.username, + usernamePasswordOptions.password() + ); + default -> new HttpNoAuth(); + }; + final var ctx = new SchemaRegistryContext( + name, + new SchemaRegistryCluster(urls, auth) + ); contexts.add(ctx); SchemaRegistryContexts.save(contexts); out.printf( - "Schema Registry context `%s` with URL(s): [%s] is saved.", - ctx.name(), ctx.cluster().urls()); + "Schema Registry context `%s` with URL(s): [%s] is saved.", + ctx.name(), + ctx.cluster().urls() + ); return 0; } catch (IllegalArgumentException e) { err.println("ERROR: " + e.getMessage()); @@ -439,8 +494,9 @@ public Integer call() throws Exception { } @CommandLine.Command( - name = "delete", - description = "Removes context. Destination: ~/.kafka/schema-registry.json") + name = "delete", + description = "Removes context. 
Destination: ~/.kafka/schema-registry.json" + ) static class DeleteCommand implements Callable { @CommandLine.Parameters(index = "0", description = "Context name. e.g. `local`") @@ -456,8 +512,10 @@ public Integer call() throws Exception { SchemaRegistryContexts.save(contexts); out.printf( - "Schema Registry context `%s` with URL(s): [%s] is deleted.%n", - ctx.name(), ctx.cluster().urls()); + "Schema Registry context `%s` with URL(s): [%s] is deleted.%n", + ctx.name(), + ctx.cluster().urls() + ); return 0; } else { out.printf("Schema Registry Context %s is not registered.%n", name); @@ -470,15 +528,17 @@ public Integer call() throws Exception { static class UsernamePasswordOptions { @CommandLine.Option( - names = {"--username", "-u"}, - description = "Username authentication") + names = { "--username", "-u" }, + description = "Username authentication" + ) String username; @CommandLine.Option( - names = {"--password", "-p"}, - description = "Password authentication", - arity = "0..1", - interactive = true) + names = { "--password", "-p" }, + description = "Password authentication", + arity = "0..1", + interactive = true + ) String password; public String password() { @@ -494,16 +554,18 @@ static class VersionProviderWithConfigProvider implements CommandLine.IVersionPr @Override public String[] getVersion() throws IOException { final var url = - VersionProviderWithConfigProvider.class.getClassLoader().getResource("cli.properties"); + VersionProviderWithConfigProvider.class.getClassLoader() + .getResource("cli.properties"); if (url == null) { - return new String[] { - "No cli.properties file found in the classpath.", - }; + return new String[] { "No cli.properties file found in the classpath." }; } final var properties = new Properties(); properties.load(url.openStream()); return new String[] { - properties.getProperty("appName") + " version " + properties.getProperty("appVersion") + "", + properties.getProperty("appName") + + " version " + + properties.getProperty("appVersion") + + "", "Built: " + properties.getProperty("appBuildTime"), }; } diff --git a/emulator/pom.xml b/emulator/pom.xml index 8480175..5dce646 100644 --- a/emulator/pom.xml +++ b/emulator/pom.xml @@ -29,5 +29,10 @@ io.github.jeqo.kafka kafka-context + + org.xerial + sqlite-jdbc + 3.36.0.3 + \ No newline at end of file diff --git a/emulator/src/main/java/kafka/emulator/ArchiveStore.java b/emulator/src/main/java/kafka/emulator/ArchiveStore.java new file mode 100644 index 0000000..af4fc7b --- /dev/null +++ b/emulator/src/main/java/kafka/emulator/ArchiveStore.java @@ -0,0 +1,238 @@ +package kafka.emulator; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Collection; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +public interface ArchiveStore { + EmulatorArchive load(); + + void save(EmulatorArchive archive); + + class SqliteArchiveLoader implements ArchiveStore { + + final StringDeserializer stringDeserializer = new StringDeserializer(); + final LongDeserializer longDeserializer = new LongDeserializer(); + final IntegerDeserializer 
intDeserializer = new IntegerDeserializer(); + final StringSerializer stringSerializer = new StringSerializer(); + final LongSerializer longSerializer = new LongSerializer(); + final IntegerSerializer intSerializer = new IntegerSerializer(); + + final Path archivePath; + + public SqliteArchiveLoader(Path archivePath) { + this.archivePath = archivePath; + } + + @Override + public EmulatorArchive load() { + final var db = "jdbc:sqlite:" + archivePath.toAbsolutePath(); + try (final var conn = DriverManager.getConnection(db)) { + final var archive = EmulatorArchive.create(); + final var st = conn.createStatement(); + final var rs = st.executeQuery( + """ + SELECT * + FROM records_v1 + ORDER BY offset ASC""" + ); + while (rs.next()) { + final var tp = new TopicPartition( + rs.getString("topic"), + rs.getInt("partition") + ); + final var keyFormat = EmulatorArchive.FieldFormat.valueOf( + rs.getString("key_format") + ); + final var key = + switch (keyFormat) { + case BYTES -> rs.getBytes("key_bytes"); + case INTEGER -> intSerializer.serialize("", rs.getInt("key_int")); + case LONG -> longSerializer.serialize("", rs.getLong("key_long")); + case STRING -> stringSerializer.serialize("", rs.getString("key_string")); + }; + final var valueFormat = EmulatorArchive.FieldFormat.valueOf( + rs.getString("value_format") + ); + final var value = + switch (valueFormat) { + case BYTES -> rs.getBytes("value_bytes"); + case INTEGER -> intSerializer.serialize("", rs.getInt("value_int")); + case LONG -> longSerializer.serialize("", rs.getLong("value_long")); + case STRING -> stringSerializer.serialize("", rs.getString("value_string")); + }; + final var record = new EmulatorArchive.EmulatorRecord( + tp.topic(), + tp.partition(), + rs.getLong("offset"), + rs.getLong("timestamp"), + rs.getLong("after_ms"), + keyFormat, + key, + valueFormat, + value + ); + archive.append(tp, record); + } + return archive; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public void save(EmulatorArchive archive) { + final var db = "jdbc:sqlite:" + archivePath.toAbsolutePath(); + try (final var conn = DriverManager.getConnection(db)) { + final var st = conn.createStatement(); + // Prepare schema + st.executeUpdate( + """ + CREATE TABLE IF NOT EXISTS records_v1 + ( + topic text not null, + partition int not null, + offset long, + timestamp long, + after_ms long not null, + key_format text not null, + value_format text not null, + key_bytes bytes, + value_bytes bytes, + key_string text, + value_string text, + key_int int, + value_int int, + key_long long, + value_long long + )""" + ); + st.executeUpdate( + """ + CREATE INDEX IF NOT EXISTS records_v1_topic + ON records_v1 (topic)""" + ); + st.executeUpdate( + """ + CREATE INDEX IF NOT EXISTS records_v1_partition + ON records_v1 (partition)""" + ); + st.executeUpdate( + """ + CREATE INDEX IF NOT EXISTS records_v1_offset + ON records_v1 (offset)""" + ); + // prepare batch + final var ps = conn.prepareStatement( + """ + INSERT INTO records_v1 ( + topic, + partition, + offset, + timestamp, + after_ms, + key_format, + value_format, + key_bytes, + value_bytes, + key_string, + value_string, + key_int, + value_int, + key_long, + value_long + ) + VALUES ( + ?, ?, ?, ?, ?, ?, ?, + ?, ?, + ?, ?, + ?, ?, + ?, ? 
+ )""" + ); + archive + .all() + .parallelStream() + .flatMap(Collection::stream) + .forEach(r -> { + try { + ps.setString(1, r.topic()); + ps.setInt(2, r.partition()); + ps.setLong(3, r.offset()); + ps.setLong(4, r.timestamp()); + ps.setLong(5, r.afterMs()); + ps.setString(6, r.keyFormat().name()); + ps.setString(7, r.valueFormat().name()); + if (r.key() != null) { + switch (r.keyFormat()) { + case BYTES -> ps.setBytes(8, r.key()); + case STRING -> ps.setString( + 10, + stringDeserializer.deserialize("", r.key()) + ); + case INTEGER -> ps.setInt(12, intDeserializer.deserialize("", r.key())); + case LONG -> ps.setLong(14, longDeserializer.deserialize("", r.key())); + } + } + if (r.value() != null) { + switch (r.valueFormat()) { + case BYTES -> ps.setBytes(9, r.value()); + case STRING -> ps.setString( + 11, + stringDeserializer.deserialize("", r.value()) + ); + case INTEGER -> ps.setInt( + 13, + intDeserializer.deserialize("", r.value()) + ); + case LONG -> ps.setLong( + 15, + longDeserializer.deserialize("", r.value()) + ); + } + } + ps.addBatch(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }); + ps.executeBatch(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static void main(String[] args) { + var loader = new SqliteArchiveLoader(Path.of("test.db")); + var archive = EmulatorArchive.create(); + archive.append( + new TopicPartition("t1", 0), + new EmulatorArchive.EmulatorRecord( + "t1", + 0, + 0L, + System.currentTimeMillis(), + 100L, + EmulatorArchive.FieldFormat.BYTES, + "s".getBytes(StandardCharsets.UTF_8), + EmulatorArchive.FieldFormat.BYTES, + "v".getBytes(StandardCharsets.UTF_8) + ) + ); + loader.save(archive); + + var archive2 = loader.load(); + + System.out.println(archive2.equals(archive)); + } + } +} diff --git a/emulator/src/main/java/kafka/emulator/Cli.java b/emulator/src/main/java/kafka/emulator/Cli.java new file mode 100644 index 0000000..5c06ec9 --- /dev/null +++ b/emulator/src/main/java/kafka/emulator/Cli.java @@ -0,0 +1,138 @@ +package kafka.emulator; + +import static java.lang.System.err; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import kafka.context.KafkaContexts; +import kafka.context.sr.SchemaRegistryContexts; +import picocli.CommandLine; + +public class Cli { + + static class PackCommand { + + PropertiesOption propertiesOption; + List topics; + } + + static class UnpackCommand { + + PropertiesOption propertiesOption; + boolean dryRun; // false + RepeatOptions repeatOptions; + + static class RepeatOptions { + + boolean repeat; // false + long afterMs; + } + } + + static class PropertiesOption { + + @CommandLine.Option( + names = { "-c", "--config" }, + description = "Client configuration properties file." 
+ + "Must include connection to Kafka" + ) + Optional configPath; + + @CommandLine.ArgGroup(exclusive = false) + ContextOption contextOption; + + public Properties load() { + return configPath + .map(path -> { + try { + final var p = new Properties(); + p.load(Files.newInputStream(path)); + return p; + } catch (Exception e) { + throw new IllegalArgumentException( + "ERROR: properties file at %s is failing to load".formatted(path) + ); + } + }) + .orElseGet(() -> { + try { + return contextOption.load(); + } catch (IOException e) { + throw new IllegalArgumentException("ERROR: loading contexts"); + } + }); + } + } + + static class ContextOption { + + @CommandLine.Option( + names = "--kafka", + description = "Kafka context name", + required = true + ) + String kafkaContextName; + + @CommandLine.Option(names = "--sr", description = "Schema Registry context name") + Optional srContextName; + + public Properties load() throws IOException { + final var kafkas = KafkaContexts.load(); + final var props = new Properties(); + if (kafkas.has(kafkaContextName)) { + final var kafka = kafkas.get(kafkaContextName); + final var kafkaProps = kafka.properties(); + props.putAll(kafkaProps); + + if (srContextName.isPresent()) { + final var srs = SchemaRegistryContexts.load(); + final var srName = srContextName.get(); + if (srs.has(srName)) { + final var sr = srs.get(srName); + final var srProps = sr.properties(); + props.putAll(srProps); + } else { + err.printf( + "WARN: Schema Registry context `%s` not found. Proceeding without it.%n", + srName + ); + } + } + + return props; + } else { + err.printf( + "ERROR: Kafka context `%s` not found. Check that context already exist.%n", + kafkaContextName + ); + return null; + } + } + } + + static class VersionProviderWithConfigProvider implements CommandLine.IVersionProvider { + + @Override + public String[] getVersion() throws IOException { + final var url = + VersionProviderWithConfigProvider.class.getClassLoader() + .getResource("cli.properties"); + if (url == null) { + return new String[] { "No cli.properties file found in the classpath." }; + } + final var properties = new Properties(); + properties.load(url.openStream()); + return new String[] { + properties.getProperty("appName") + + " version " + + properties.getProperty("appVersion") + + "", + "Built: " + properties.getProperty("appBuildTime"), + }; + } + } +} diff --git a/emulator/src/main/java/kafka/emulator/EmulatorArchive.java b/emulator/src/main/java/kafka/emulator/EmulatorArchive.java new file mode 100644 index 0000000..82aeaed --- /dev/null +++ b/emulator/src/main/java/kafka/emulator/EmulatorArchive.java @@ -0,0 +1,121 @@ +package kafka.emulator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +public class EmulatorArchive { + + private final Map> records = new HashMap<>(); + private final Map oldestOffsets = new HashMap<>(); + private final Map oldestTimestamps = new HashMap<>(); + + public static EmulatorArchive create() { + return new EmulatorArchive(); + } + + public void append(TopicPartition topicPartition, EmulatorRecord zipRecord) { + records.computeIfPresent( + topicPartition, + (tp, zipRecords) -> { + zipRecords.add(zipRecord); + oldestOffsets.put( + tp, + oldestOffsets.get(tp) < zipRecord.offset() + ? 
zipRecord.offset() + : oldestOffsets.get(tp) + ); + oldestTimestamps.put( + tp, + oldestTimestamps.get(tp) < zipRecord.timestamp() + ? zipRecord.timestamp() + : oldestTimestamps.get(tp) + ); + return zipRecords; + } + ); + records.computeIfAbsent( + topicPartition, + tp -> { + final var zipRecords = new ArrayList(); + zipRecords.add(zipRecord); + oldestOffsets.put(tp, zipRecord.offset()); + oldestTimestamps.put(tp, zipRecord.timestamp()); + return zipRecords; + } + ); + } + + public Set topicPartitions() { + return records.keySet(); + } + + public Map topicPartitionNumber() { + final var map = new HashMap(); + for (var tp : records.keySet()) { + final var partitions = tp.partition() + 1; + map.computeIfPresent(tp.topic(), (t, p) -> partitions > p ? partitions : p); + map.putIfAbsent(tp.topic(), partitions); + } + return map; + } + + public Collection> all() { + return records.values(); + } + + public List records(TopicPartition tp) { + return records.get(tp); + } + + public Long oldestOffsets(TopicPartition tp) { + return oldestOffsets.get(tp); + } + + public Long oldestTimestamps(TopicPartition tp) { + return oldestTimestamps.get(tp); + } + + record EmulatorRecord( + String topic, + int partition, + long offset, + long timestamp, + long afterMs, + FieldFormat keyFormat, + byte[] key, + FieldFormat valueFormat, + byte[] value + ) { + public static EmulatorRecord from( + ConsumerRecord record, + FieldFormat keyFormat, + FieldFormat valueFormat, + long afterMs + ) { + return new EmulatorRecord( + record.topic(), + record.partition(), + record.offset(), + record.timestamp(), + afterMs, + keyFormat, + record.key(), + valueFormat, + record.value() + ); + } + } + + enum FieldFormat { + STRING, + LONG, + INTEGER, + BYTES, + } +} diff --git a/emulator/src/main/java/kafka/emulator/KafkaEmulator.java b/emulator/src/main/java/kafka/emulator/KafkaEmulator.java new file mode 100644 index 0000000..1adca65 --- /dev/null +++ b/emulator/src/main/java/kafka/emulator/KafkaEmulator.java @@ -0,0 +1,325 @@ +package kafka.emulator; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import kafka.context.KafkaContext; +import kafka.context.KafkaContexts; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaEmulator { + + static final Logger LOG = LoggerFactory.getLogger(KafkaEmulator.class); + final ArchiveStore archiveLoader; + + public KafkaEmulator(ArchiveStore archiveLoader) { + this.archiveLoader = archiveLoader; + } + + /** + * Read and package topics records into an archive. + * + *
<p>
Based on the start and end conditions, start a consumer and poll records per partition. + * Transform into archive records and append to the archive. Once the end condition is given, the + * results are flushed into the zip archive files. + * + */ + public void record( + KafkaContext kafkaContext, + List topics, + EmulatorArchive.FieldFormat keyFormat, + EmulatorArchive.FieldFormat valueFormat, + StartFromOption startFrom, + EndAtOption endAt + ) throws IOException { + final var endTime = System.currentTimeMillis(); + // create consumer + final var properties = kafkaContext.properties(); + properties.put("group.id", "emulator-" + endTime); + final var keyDeserializer = new ByteArrayDeserializer(); + final var valueDeserializer = new ByteArrayDeserializer(); + var consumer = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer); + // set offsets from + // consumer loop + var latestTimestamps = new HashMap(); + final var archive = EmulatorArchive.create(); + var listTopics = consumer.listTopics(); + var topicPartitions = new ArrayList(); + for (var topic : topics) { + var partitionsInfo = listTopics.get(topic); + topicPartitions.addAll( + partitionsInfo + .stream() + .map(info -> new TopicPartition(info.topic(), info.partition())) + .toList() + ); + } + consumer.assign(topicPartitions); + if (!startFrom.offsets().isEmpty()) { + startFrom.offsets().forEach(consumer::seek); + } + if (startFrom.timestamp().isPresent()) { + final var ts = startFrom.timestamp().getAsLong(); + final var offsets = consumer.offsetsForTimes( + topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> ts)) + ); + offsets.forEach((tp, offsetAndTimestamp) -> + consumer.seek(tp, offsetAndTimestamp.offset()) + ); + } else { + consumer.seekToBeginning(topicPartitions); + } + + var allDone = false; + var done = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> false)); + while (!allDone) { + // set offsets to + // break by topic-partition + var records = consumer.poll(Duration.ofSeconds(5)); + for (var partition : records.partitions()) { + if (done.get(partition)) break; + // start: per partition + var perPartition = records.records(partition); + for (var record : perPartition) { + // transform to ZipRecord + var latestTimestamp = latestTimestamps.getOrDefault(partition, -1L); + final var currentTimestamp = record.timestamp(); + if (currentTimestamp >= endTime) { + break; + } + final long afterMs; + if (latestTimestamp < 0) { + afterMs = 0; + } else { + afterMs = currentTimestamp - latestTimestamp; + } + var zipRecord = EmulatorArchive.EmulatorRecord.from( + record, + keyFormat, + valueFormat, + afterMs + ); + // append to topic-partition file + archive.append(partition, zipRecord); + latestTimestamps.put(partition, currentTimestamp); + if (isDone(partition, archive, endAt)) { + done.put(partition, true); + break; + } + } + // end: per partition + } + if (records.isEmpty()) { + allDone = true; + } else { + allDone = done.values().stream().reduce((r, l) -> r && l).orElse(false); + } + } + archiveLoader.save(archive); + } + + private boolean isDone(TopicPartition tp, EmulatorArchive archive, EndAtOption endAt) { + if (!endAt.now()) { + if (endAt.recordsPerPartition().isPresent()) { + var enough = true; + final var total = endAt.recordsPerPartition().getAsInt(); + final var size = archive.records(tp).size(); + if (total > size) { + enough = false; + } + return enough; + } + + if (!endAt.offsets.isEmpty()) { + return archive.oldestOffsets(tp) >= endAt.offsets().get(tp); + } + + if 
(endAt.timestamp().isPresent()) { + return archive.oldestTimestamps(tp) >= endAt.timestamp().getAsLong(); + } + } + return false; + } + + /** + * Read zip archive files and produce records into Kafka topics with the frequency defined in the + * archive. + */ + public void replay( + KafkaContext kafkaContext, + Map topicMap, + boolean noWait + ) throws InterruptedException { + // load archive + var archive = archiveLoader.load(); + // create producer + var keySerializer = new ByteArraySerializer(); + var valueSerializer = new ByteArraySerializer(); + final var props = kafkaContext.properties(); + props.put("acks", "1"); + var producer = new KafkaProducer<>(props, keySerializer, valueSerializer); + // per partition + final var topicPartitionNumber = archive.topicPartitionNumber(); + // prepare topics + try (var admin = AdminClient.create(kafkaContext.properties())) { + var topics = admin.listTopics().names().get(); + var newTopics = new ArrayList(); + for (var t : topicPartitionNumber.keySet()) { + final var topicName = topicMap.getOrDefault(t, t); + if (!topics.contains(topicName)) { + final var newTopic = new NewTopic( + topicName, + Optional.of(topicPartitionNumber.get(t)), + Optional.empty() + ); + newTopics.add(newTopic); + } + } + admin.createTopics(newTopics).all().get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + + final var size = archive.topicPartitions().size(); + var executor = Executors.newFixedThreadPool(size); + var countDownLatch = new CountDownLatch(size); + for (var topicPartition : archive.topicPartitions()) { + var rs = archive.records(topicPartition); + executor.submit(() -> { + long prevTime = 0L; + // per record + for (var r : rs) { + // prepare record + final var topicName = topicMap.getOrDefault(r.topic(), r.topic()); + var record = new ProducerRecord<>(topicName, r.partition(), r.key(), r.value()); + try { + // wait + var wait = (prevTime + r.afterMs()) - System.currentTimeMillis(); + if (!noWait && wait > 0) { + LOG.info("{}:{}: waiting {} ms.", topicPartition, r.offset(), r.afterMs()); + Thread.sleep(r.afterMs()); + } else { + LOG.info( + "{}:{}: no waiting (after: {} ms.)", + topicPartition, + r.offset(), + r.afterMs() + ); + } + var meta = producer.send(record).get(); + prevTime = meta.timestamp(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + countDownLatch.countDown(); + LOG.info("Replay for {} finished", topicPartition); + }); + } + countDownLatch.await(); + LOG.info("Replay finished"); + executor.shutdown(); + } + + record StartFromOption( + boolean beginning, + Map offsets, + OptionalLong timestamp + ) { + public static StartFromOption of() { + return new StartFromOption(true, Map.of(), OptionalLong.empty()); + } + public static StartFromOption of(Map offsets) { + return new StartFromOption(true, offsets, OptionalLong.empty()); + } + public static StartFromOption of(Map offsets, long timestamp) { + return new StartFromOption(false, offsets, OptionalLong.of(timestamp)); + } + public static StartFromOption of(Instant timestamp) { + return new StartFromOption( + false, + Map.of(), + OptionalLong.of(timestamp.toEpochMilli()) + ); + } + } + + record EndAtOption( + boolean now, + OptionalInt recordsPerPartition, + Map offsets, + OptionalLong timestamp + ) { + public static EndAtOption of() { + return new EndAtOption(true, OptionalInt.empty(), Map.of(), OptionalLong.empty()); + } + public static EndAtOption of(int recordsPerPartition) { + return new 
EndAtOption( + false, + OptionalInt.of(recordsPerPartition), + Map.of(), + OptionalLong.empty() + ); + } + + public static EndAtOption of(Instant timestamp) { + return new EndAtOption( + false, + OptionalInt.empty(), + Map.of(), + OptionalLong.of(timestamp.toEpochMilli()) + ); + } + } + + public static void main(String[] args) throws IOException, InterruptedException { + var archiveLoader = new ArchiveStore.SqliteArchiveLoader(Path.of("test.db")); + var emulator = new KafkaEmulator(archiveLoader); + var context = KafkaContexts.load().get("local"); + + emulator.record( + context, + List.of("t5"), + EmulatorArchive.FieldFormat.STRING, + EmulatorArchive.FieldFormat.STRING, + StartFromOption.of(), + EndAtOption.of(10) + ); + + emulator.replay(context, Map.of("t5", "t14"), true); + // Check Thread handling by parallel stream + // final var map = Map.of("s1", "a", "s2", "b", "s3", "c"); + // map.keySet().parallelStream() + // .forEach( + // k -> { + // System.out.println(Thread.currentThread().getName()); + // try { + // Thread.sleep(1000); + // } catch (InterruptedException e) { + // throw new RuntimeException(e); + // } + // System.out.println(map.get(k)); + // }); + } +} diff --git a/emulator/src/main/java/kafka/zip/ArchiveLoader.java b/emulator/src/main/java/kafka/zip/ArchiveLoader.java deleted file mode 100644 index f2e25a6..0000000 --- a/emulator/src/main/java/kafka/zip/ArchiveLoader.java +++ /dev/null @@ -1,6 +0,0 @@ -package kafka.zip; - -public interface ArchiveLoader { - EmulatorArchive open(); - void save(); -} diff --git a/emulator/src/main/java/kafka/zip/Cli.java b/emulator/src/main/java/kafka/zip/Cli.java deleted file mode 100644 index 238c78a..0000000 --- a/emulator/src/main/java/kafka/zip/Cli.java +++ /dev/null @@ -1,124 +0,0 @@ -package kafka.zip; - -import static java.lang.System.err; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; -import java.util.Optional; -import java.util.Properties; -import kafka.context.KafkaContexts; -import kafka.context.SchemaRegistryContexts; -import picocli.CommandLine; - -public class Cli { - static class PackCommand { - PropertiesOption propertiesOption; - List topics; - } - - static class UnpackCommand { - PropertiesOption propertiesOption; - boolean dryRun; // false - RepeatOptions repeatOptions; - - static class RepeatOptions { - boolean repeat; // false - long afterMs; - } - } - - static class PropertiesOption { - - @CommandLine.Option( - names = {"-c", "--config"}, - description = "Client configuration properties file." 
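// A minimal usage sketch (not part of the change set) of the record/replay API introduced
// above, following the shape of the main() method in the new KafkaEmulator: record topic t5
// starting one hour back and stopping after 10 records per partition, then replay the archive
// into t14 without honouring the recorded delays. It assumes the same imports as
// KafkaEmulator plus java.time.Instant, and a caller that declares IOException and
// InterruptedException; the archive path, context name, and topic names mirror the
// illustrative values used in the main() method above.
var archiveLoader = new ArchiveStore.SqliteArchiveLoader(Path.of("test.db"));
var emulator = new KafkaEmulator(archiveLoader);
var context = KafkaContexts.load().get("local");
emulator.record(
  context,
  List.of("t5"),
  EmulatorArchive.FieldFormat.STRING,
  EmulatorArchive.FieldFormat.STRING,
  StartFromOption.of(Instant.now().minusSeconds(3600)), // start from a timestamp instead of the beginning
  EndAtOption.of(10)                                     // end after 10 records per partition
);
emulator.replay(context, Map.of("t5", "t14"), true);     // noWait = true: do not sleep between records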
+ "Must include connection to Kafka") - Optional configPath; - - @CommandLine.ArgGroup(exclusive = false) - ContextOption contextOption; - - public Properties load() { - return configPath - .map( - path -> { - try { - final var p = new Properties(); - p.load(Files.newInputStream(path)); - return p; - } catch (Exception e) { - throw new IllegalArgumentException( - "ERROR: properties file at %s is failing to load".formatted(path)); - } - }) - .orElseGet( - () -> { - try { - return contextOption.load(); - } catch (IOException e) { - throw new IllegalArgumentException("ERROR: loading contexts"); - } - }); - } - } - - static class ContextOption { - - @CommandLine.Option(names = "--kafka", description = "Kafka context name", required = true) - String kafkaContextName; - - @CommandLine.Option(names = "--sr", description = "Schema Registry context name") - Optional srContextName; - - public Properties load() throws IOException { - final var kafkas = KafkaContexts.load(); - final var props = new Properties(); - if (kafkas.has(kafkaContextName)) { - final var kafka = kafkas.get(kafkaContextName); - final var kafkaProps = kafka.properties(); - props.putAll(kafkaProps); - - if (srContextName.isPresent()) { - final var srs = SchemaRegistryContexts.load(); - final var srName = srContextName.get(); - if (srs.has(srName)) { - final var sr = srs.get(srName); - final var srProps = sr.properties(); - props.putAll(srProps); - } else { - err.printf( - "WARN: Schema Registry context `%s` not found. Proceeding without it.%n", srName); - } - } - - return props; - } else { - err.printf( - "ERROR: Kafka context `%s` not found. Check that context already exist.%n", - kafkaContextName); - return null; - } - } - } - - static class VersionProviderWithConfigProvider implements CommandLine.IVersionProvider { - - @Override - public String[] getVersion() throws IOException { - final var url = - VersionProviderWithConfigProvider.class.getClassLoader().getResource("cli.properties"); - if (url == null) { - return new String[] { - "No cli.properties file found in the classpath.", - }; - } - final var properties = new Properties(); - properties.load(url.openStream()); - return new String[] { - properties.getProperty("appName") + " version " + properties.getProperty("appVersion") + "", - "Built: " + properties.getProperty("appBuildTime"), - }; - } - } -} diff --git a/emulator/src/main/java/kafka/zip/EmulatorArchive.java b/emulator/src/main/java/kafka/zip/EmulatorArchive.java deleted file mode 100644 index fc7accc..0000000 --- a/emulator/src/main/java/kafka/zip/EmulatorArchive.java +++ /dev/null @@ -1,93 +0,0 @@ -package kafka.zip; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Base64; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; - -public class EmulatorArchive { - Map> records = new HashMap<>(); - - public static EmulatorArchive load(Path directory) throws IOException { - try (var list = Files.list(directory)) { - var tpToPath = list - .filter(p -> p.endsWith(".txt")) - .filter(p -> p.getFileName().toString().contains("-")) - .collect(Collectors.toMap(p -> { - var filename = p.getFileName().toString(); - var topic = filename.substring(0, filename.lastIndexOf("-") - 1); - var partition = Integer.parseInt( - 
filename.substring(filename.lastIndexOf("-"), filename.lastIndexOf("."))); - return new TopicPartition(topic, partition); - }, p -> p)); - - } - return null; - } - - public static EmulatorArchive create() { - return new EmulatorArchive(); - } - - public void append(TopicPartition topicPartition, EmulatorRecord zipRecord) { - records.computeIfPresent(topicPartition, (topicPartition1, zipRecords) -> { - zipRecords.add(zipRecord); - return zipRecords; - }); - records.computeIfAbsent(topicPartition, tp -> { - final var zipRecords = new ArrayList(); - zipRecords.add(zipRecord); - return zipRecords; - }); - } - - public boolean isDone() { - return false; - } - - public void save() throws IOException { - for (var tp : records.keySet()) { - var tpPath = Path.of(tp.topic() + "-" + tp.partition() + ".csv"); - Files.writeString(tpPath, "after_ms,key,value\n", StandardOpenOption.CREATE); - for (var record : records.get(tp)) { - Files.writeString(tpPath, record.toLine(), StandardOpenOption.APPEND); - } - } - } - - record EmulatorRecord( - String topic, - int partition, - long afterMs, - FieldFormat keyFormat, - byte[] key, - FieldFormat valueFormat, - byte[] value) { - public static EmulatorRecord from(ConsumerRecord record, long afterMs) { - return new EmulatorRecord(record.topic(), record.partition(), afterMs, FieldFormat.BYTES, record.key(), FieldFormat.BYTES, record.value()); - } - - public String toLine () { - var k = key == null ? "" : Base64.getEncoder().encodeToString(key); - var v = value == null ? "" : Base64.getEncoder().encodeToString(value); - return afterMs - + "," + k - + "," + v - + "\n"; - } - } - - enum FieldFormat { - STRING, - LONG, - BYTES - } -} diff --git a/emulator/src/main/java/kafka/zip/KafkaEmulator.java b/emulator/src/main/java/kafka/zip/KafkaEmulator.java deleted file mode 100644 index e7148e3..0000000 --- a/emulator/src/main/java/kafka/zip/KafkaEmulator.java +++ /dev/null @@ -1,134 +0,0 @@ -package kafka.zip; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import kafka.context.KafkaContexts; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; - -public class KafkaEmulator { - - /** - * Read and package topics records into an archive. - * - *

Based on the start and end conditions, start a consumer and poll records per partition. - * Transform into archive records and append to the archive. Once the end condition is given, the - * results are flushed into the zip archive files. - * - * @param kafkaContext - * @param topics - * @param fromBeginning // TODO replace with startCondition // TODO add endCondition - * @return - */ - public RecordingResult record( - KafkaContexts.KafkaContext kafkaContext, List topics, boolean fromBeginning) - throws IOException { - final var endTime = System.currentTimeMillis(); - // create consumer - final var properties = kafkaContext.properties(); - properties.put("group.id", "emulator-" + endTime); - final var keyDeserializer = new ByteArrayDeserializer(); - final var valueDeserializer = new ByteArrayDeserializer(); - var consumer = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer); - // set offsets from - // consumer loop - var done = false; - var latestTimestamps = new HashMap(); - final var archive = EmulatorArchive.create(); - var listTopics = consumer.listTopics(); - var topicPartitions = new ArrayList(); - for (var topic : topics) { - var partitionsInfo = listTopics.get(topic); - topicPartitions.addAll( - partitionsInfo.stream() - .map(info -> new TopicPartition(info.topic(), info.partition())) - .toList()); - } - consumer.assign(topicPartitions); - consumer.seekToBeginning(topicPartitions); - while (!done) { - // set offsets to - // break by topic-partition - var records = consumer.poll(Duration.ofSeconds(5)); - for (var partition : records.partitions()) { - // start: per partition - var perPartition = records.records(partition); - for (var record : perPartition) { - // transform to ZipRecord - var latestTimestamp = latestTimestamps.getOrDefault(partition, -1L); - final var currentTimestamp = record.timestamp(); - if (currentTimestamp >= endTime) { - break; - } - final long afterMs; - if (latestTimestamp < 0) { - afterMs = 0; - } else { - afterMs = currentTimestamp - latestTimestamp; - } - var zipRecord = EmulatorArchive.EmulatorRecord.from(record, afterMs); - // append to topic-partition file - archive.append(partition, zipRecord); - latestTimestamps.put(partition, currentTimestamp); - } - if (isDone(archive)) { - done = true; - break; - } - // end: per partition - } - if (records.isEmpty()) { - done = true; - } - } - return new RecordingResult(archive); - } - - private boolean isDone(EmulatorArchive archive) { - return false; - } - - /** - * Read zip archive files and produce records into Kafka topics with the frequency defined in the - * archive. 
- * - * @param kafkaContext - * @return - */ - public ReplayResult replay(KafkaContexts.KafkaContext kafkaContext, Path directory) throws IOException { - //create producer - var keySerializer = new ByteArraySerializer(); - var valueSerializer = new ByteArraySerializer(); - var producer = new KafkaProducer<>(kafkaContext.properties(), keySerializer, valueSerializer); - //per partition - //per record - //prepare record - //wait - return null; - } - - private class RecordingResult { - final EmulatorArchive archive; - - private RecordingResult(EmulatorArchive archive) { - this.archive = archive; - } - } - - private class ReplayResult {} - - public static void main(String[] args) throws IOException { - var zip = new KafkaEmulator(); - var context = KafkaContexts.load().get("local"); - var result = zip.record(context, List.of("t1"), true); - result.archive.save(); - } -} diff --git a/emulator/src/main/java/kafka/zip/SqliteArchiveLoader.java b/emulator/src/main/java/kafka/zip/SqliteArchiveLoader.java deleted file mode 100644 index a0f8aa6..0000000 --- a/emulator/src/main/java/kafka/zip/SqliteArchiveLoader.java +++ /dev/null @@ -1,23 +0,0 @@ -package kafka.zip; - -import java.nio.file.Path; - -public class SqliteArchiveLoader implements ArchiveLoader { - - final Path archivePath; - - public SqliteArchiveLoader(Path archivePath) { - this.archivePath = archivePath; - } - - @Override - public EmulatorArchive open() { - - return null; - } - - @Override - public void save() { - - } -} diff --git a/pom.xml b/pom.xml index 9f86188..bd3206a 100644 --- a/pom.xml +++ b/pom.xml @@ -29,9 +29,9 @@ ${java.version} 4.6.3 - ${version.clean.plugin} + 3.1.0 7.0.1 - 0.2.0 + 0.3.1-SNAPSHOT 2.13.3 1.7.36 5.8.2 @@ -243,11 +243,6 @@ maven-javadoc-plugin ${version.javadoc.plugin} - - org.apache.maven.plugins - maven-site-plugin - ${version.site.plugin} - org.apache.maven.plugins maven-source-plugin @@ -283,11 +278,20 @@ - com.spotify.fmt - fmt-maven-plugin - 2.16 + com.hubspot.maven.plugins + prettier-maven-plugin + 0.16 + + 1.5.0 + 90 + 2 + false + true + true + + validate check diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/Cli.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/Cli.java index b34b01d..c25c66f 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/Cli.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/Cli.java @@ -23,18 +23,19 @@ import picocli.CommandLine.Option; @CommandLine.Command( - name = "kfk-producer-datagen", - versionProvider = VersionProviderWithConfigProvider.class, - mixinStandardHelpOptions = true, - descriptionHeading = "Kafka CLI - Producer Datagen", - description = "Kafka Producer with Data generation", - subcommands = { - PerfCommand.class, - IntervalCommand.class, - ProduceOnceCommand.class, - SampleCommand.class, - ListTopicsCommand.class, - }) + name = "kfk-producer-datagen", + versionProvider = VersionProviderWithConfigProvider.class, + mixinStandardHelpOptions = true, + descriptionHeading = "Kafka CLI - Producer Datagen", + description = "Kafka Producer with Data generation", + subcommands = { + PerfCommand.class, + IntervalCommand.class, + ProduceOnceCommand.class, + SampleCommand.class, + ListTopicsCommand.class, + } +) public class Cli implements Callable { public static void main(String[] args) { @@ -51,10 +52,10 @@ public Integer call() { public static class PropertiesOption { @CommandLine.Option( - names = {"-c", "--config"}, - description = - "Client configuration properties file." 
- + "Must include connection to Kafka and Schema Registry") + names = { "-c", "--config" }, + description = "Client configuration properties file." + + "Must include connection to Kafka and Schema Registry" + ) Optional configPath; @ArgGroup(exclusive = false) @@ -62,25 +63,24 @@ public static class PropertiesOption { public Properties load() { return configPath - .map( - path -> { - try { - final var p = new Properties(); - p.load(Files.newInputStream(path)); - return p; - } catch (Exception e) { - throw new IllegalArgumentException( - "ERROR: properties file at %s is failing to load".formatted(path)); - } - }) - .orElseGet( - () -> { - try { - return contextOption.load(); - } catch (IOException e) { - throw new IllegalArgumentException("ERROR: loading contexts"); - } - }); + .map(path -> { + try { + final var p = new Properties(); + p.load(Files.newInputStream(path)); + return p; + } catch (Exception e) { + throw new IllegalArgumentException( + "ERROR: properties file at %s is failing to load".formatted(path) + ); + } + }) + .orElseGet(() -> { + try { + return contextOption.load(); + } catch (IOException e) { + throw new IllegalArgumentException("ERROR: loading contexts"); + } + }); } } @@ -109,15 +109,18 @@ public Properties load() throws IOException { props.putAll(srProps); } else { err.printf( - "WARN: Schema Registry context `%s` not found. Proceeding without it.%n", srName); + "WARN: Schema Registry context `%s` not found. Proceeding without it.%n", + srName + ); } } return props; } else { err.printf( - "ERROR: Kafka context `%s` not found. Check that context already exist.%n", - kafkaContextName); + "ERROR: Kafka context `%s` not found. Check that context already exist.%n", + kafkaContextName + ); return null; } } @@ -126,13 +129,15 @@ public Properties load() throws IOException { public static class SchemaSourceOption { @Option( - names = {"-q", "--quickstart"}, - description = "Quickstart name. Valid values: ${COMPLETION-CANDIDATES}") + names = { "-q", "--quickstart" }, + description = "Quickstart name. Valid values: ${COMPLETION-CANDIDATES}" + ) public Optional quickstart; @Option( - names = {"-s", "--schema"}, - description = "Path to Avro schema to use for generating records.") + names = { "-s", "--schema" }, + description = "Path to Avro schema to use for generating records." + ) public Optional schemaPath; } @@ -141,16 +146,18 @@ static class VersionProviderWithConfigProvider implements IVersionProvider { @Override public String[] getVersion() throws IOException { final var url = - VersionProviderWithConfigProvider.class.getClassLoader().getResource("cli.properties"); + VersionProviderWithConfigProvider.class.getClassLoader() + .getResource("cli.properties"); if (url == null) { - return new String[] { - "No cli.properties file found in the classpath.", - }; + return new String[] { "No cli.properties file found in the classpath." 
}; } final var properties = new Properties(); properties.load(url.openStream()); return new String[] { - properties.getProperty("appName") + " version " + properties.getProperty("appVersion") + "", + properties.getProperty("appName") + + " version " + + properties.getProperty("appVersion") + + "", "Built: " + properties.getProperty("appBuildTime"), }; } diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/IntervalRunner.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/IntervalRunner.java index fcf3abd..59d7a9f 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/IntervalRunner.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/IntervalRunner.java @@ -13,10 +13,11 @@ public class IntervalRunner { final Stats stats; public IntervalRunner( - Config config, - KafkaProducer producer, - PayloadGenerator payloadGenerator, - Stats stats) { + Config config, + KafkaProducer producer, + PayloadGenerator payloadGenerator, + Stats stats + ) { this.config = config; this.producer = producer; this.payloadGenerator = payloadGenerator; diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/PayloadGenerator.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/PayloadGenerator.java index 9d8a671..8543549 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/PayloadGenerator.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/PayloadGenerator.java @@ -37,19 +37,18 @@ public PayloadGenerator(Config config) { this.format = config.format(); this.random = new Random(); config - .randomSeed() - .ifPresent( - r -> { - random.setSeed(r); - random.setSeed(random.nextLong()); - }); + .randomSeed() + .ifPresent(r -> { + random.setSeed(r); + random.setSeed(random.nextLong()); + }); this.generator = - new Generator.Builder() - .random(random) - .generation(config.count()) - .schema(config.schema()) - .build(); + new Generator.Builder() + .random(random) + .generation(config.count()) + .schema(config.schema()) + .build(); this.keyFieldName = config.keyFieldName(); } @@ -57,9 +56,11 @@ public GenericRecord get() { final Object generatedObject = generator.generate(); if (!(generatedObject instanceof GenericRecord)) { throw new RuntimeException( - String.format( - "Expected Avro Random Generator to return instance of GenericRecord, found %s instead", - generatedObject.getClass().getName())); + String.format( + "Expected Avro Random Generator to return instance of GenericRecord, found %s instead", + generatedObject.getClass().getName() + ) + ); } return (GenericRecord) generatedObject; } @@ -81,7 +82,9 @@ String toJson(GenericRecord record) { final var outputStream = new ByteArrayOutputStream(); final var schema = record.getSchema(); final var datumWriter = new GenericDatumWriter(schema); - final var encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), outputStream); + final var encoder = EncoderFactory + .get() + .jsonEncoder(record.getSchema(), outputStream); datumWriter.write(record, encoder); encoder.flush(); return outputStream.toString(); @@ -121,30 +124,30 @@ public String schema() { } public record Config( - Optional randomSeed, - Optional quickstart, - Optional schemaPath, - long count, - Format format) { - + Optional randomSeed, + Optional quickstart, + Optional schemaPath, + long count, + Format format + ) { Schema schema() { return quickstart - .map(Quickstart::getSchemaFilename) - .map(Config::getSchemaFromSchemaFileName) - .orElse( - schemaPath - .map( - s -> { - Schema 
schemaFromSchemaFileName = null; - try { - schemaFromSchemaFileName = - getSchemaFromSchemaFileName(Files.newInputStream(schemaPath.get())); - } catch (IOException e) { - e.printStackTrace(); - } - return schemaFromSchemaFileName; - }) - .orElse(null)); + .map(Quickstart::getSchemaFilename) + .map(Config::getSchemaFromSchemaFileName) + .orElse( + schemaPath + .map(s -> { + Schema schemaFromSchemaFileName = null; + try { + schemaFromSchemaFileName = + getSchemaFromSchemaFileName(Files.newInputStream(schemaPath.get())); + } catch (IOException e) { + e.printStackTrace(); + } + return schemaFromSchemaFileName; + }) + .orElse(null) + ); } public static Schema getSchemaFromSchemaFileName(InputStream stream) { @@ -174,14 +177,20 @@ public enum Format { AVRO, } - public static Serializer valueSerializer(Format format, Properties producerConfig) { + public static Serializer valueSerializer( + Format format, + Properties producerConfig + ) { Serializer valueSerializer; if (format.equals(Format.AVRO)) { valueSerializer = new KafkaAvroSerializer(); valueSerializer.configure( - producerConfig.keySet().stream() - .collect(Collectors.toMap(String::valueOf, producerConfig::get)), - false); + producerConfig + .keySet() + .stream() + .collect(Collectors.toMap(String::valueOf, producerConfig::get)), + false + ); } else { valueSerializer = (Serializer) new StringSerializer(); } diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/PerformanceRunner.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/PerformanceRunner.java index 4cc9b24..b8752c5 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/PerformanceRunner.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/PerformanceRunner.java @@ -20,11 +20,12 @@ public class PerformanceRunner { final Stats stats; public PerformanceRunner( - final Config config, - final KafkaProducer producer, - final PayloadGenerator payloadGenerator, - final ThroughputThrottler throughputThrottler, - final Stats stats) { + final Config config, + final KafkaProducer producer, + final PayloadGenerator payloadGenerator, + final ThroughputThrottler throughputThrottler, + final Stats stats + ) { this.config = config; this.producer = producer; this.payloadGenerator = payloadGenerator; @@ -65,8 +66,10 @@ record = new ProducerRecord<>(config.topicName(), key, value); producer.send(record, cb); currentTransactionSize++; - if (config.transactionsEnabled() - && config.transactionDurationMs() <= (sendStartMs - transactionStartTime)) { + if ( + config.transactionsEnabled() && + config.transactionDurationMs() <= (sendStartMs - transactionStartTime) + ) { producer.commitTransaction(); currentTransactionSize = 0; } @@ -76,10 +79,11 @@ record = new ProducerRecord<>(config.topicName(), key, value); } } - if (config.transactionsEnabled() && currentTransactionSize != 0) producer.commitTransaction(); + if ( + config.transactionsEnabled() && currentTransactionSize != 0 + ) producer.commitTransaction(); if (!config.shouldPrintMetrics()) { - /* print final results */ stats.printTotal(); } else { @@ -114,23 +118,28 @@ public static void printMetrics(Map metrics) { } String doubleOutputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f"; String defaultOutputFormat = "%-" + maxLengthOfDisplayName + "s : %s"; - System.out.printf("\n%-" + maxLengthOfDisplayName + "s %s%n", "Metric Name", "Value"); + System.out.printf( + "\n%-" + maxLengthOfDisplayName + "s %s%n", + "Metric Name", + "Value" + ); for (Map.Entry entry : sortedMetrics.entrySet()) { 
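// A small sketch (not part of the change set) of the PayloadGenerator configuration
// reformatted above: build a generator from an Avro schema file and print the schema plus
// one generated record. The seed and the orders.avsc path are placeholders; the snippet
// assumes the imports already used in this package (java.nio.file.Path, java.util.Optional).
var generator = new PayloadGenerator(
  new PayloadGenerator.Config(
    Optional.of(42L),                    // fixed seed for reproducible data (placeholder)
    Optional.empty(),                    // no quickstart template
    Optional.of(Path.of("orders.avsc")), // hypothetical Avro schema file on disk
    1,                                   // generate a single record
    PayloadGenerator.Format.JSON
  )
);
System.out.println(generator.schema()); // Avro schema backing the generated records
System.out.println(generator.get());    // one GenericRecord from the Avro random generator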
String outputFormat; - if (entry.getValue() instanceof Double) outputFormat = doubleOutputFormat; - else outputFormat = defaultOutputFormat; + if (entry.getValue() instanceof Double) outputFormat = + doubleOutputFormat; else outputFormat = defaultOutputFormat; System.out.printf((outputFormat) + "%n", entry.getKey(), entry.getValue()); } } } public record Config( - long records, - String topicName, - boolean transactionsEnabled, - long transactionDurationMs, - boolean shouldPrintMetrics) { + long records, + String topicName, + boolean transactionsEnabled, + long transactionDurationMs, + boolean shouldPrintMetrics + ) { static Config create(long records, String topicName) { return new Config(records, topicName, false, -1L, false); } @@ -140,8 +149,18 @@ static Config create(long records, String topicName, boolean shouldPrintMetrics) } static Config create( - long records, String topicName, long transactionDuration, boolean shouldPrintMetrics) { - return new Config(records, topicName, true, transactionDuration, shouldPrintMetrics); + long records, + String topicName, + long transactionDuration, + boolean shouldPrintMetrics + ) { + return new Config( + records, + topicName, + true, + transactionDuration, + shouldPrintMetrics + ); } static Config create(long records, String topicName, long transactionDuration) { diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/Stats.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/Stats.java index 0e2ef83..e93f094 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/Stats.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/Stats.java @@ -72,12 +72,13 @@ public void printWindow() { double recsPerSec = 1000.0 * windowCount / (double) elapsed; double mbPerSec = 1000.0 * this.windowBytes / (double) elapsed / (1024.0 * 1024.0); System.out.printf( - "%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n", - windowCount, - recsPerSec, - mbPerSec, - windowTotalLatency / (double) windowCount, - (double) windowMaxLatency); + "%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n", + windowCount, + recsPerSec, + mbPerSec, + windowTotalLatency / (double) windowCount, + (double) windowMaxLatency + ); } public void newWindow() { @@ -94,16 +95,17 @@ public void printTotal() { double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0); int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); System.out.printf( - "%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", - count, - recsPerSec, - mbPerSec, - totalLatency / (double) count, - (double) maxLatency, - percs[0], - percs[1], - percs[2], - percs[3]); + "%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", + count, + recsPerSec, + mbPerSec, + totalLatency / (double) count, + (double) maxLatency, + percs[0], + percs[1], + percs[2], + percs[3] + ); } private static int[] percentiles(int[] latencies, int count, double... 
percentiles) { diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/ThroughputThrottler.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/ThroughputThrottler.java index bb30460..25372bf 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/ThroughputThrottler.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/ThroughputThrottler.java @@ -14,7 +14,8 @@ public class ThroughputThrottler { public ThroughputThrottler(long startMs, long targetThroughput) { this.startMs = startMs; this.targetThroughput = targetThroughput; - this.sleepTimeNs = targetThroughput > 0 ? NS_PER_SEC / targetThroughput : Long.MAX_VALUE; + this.sleepTimeNs = + targetThroughput > 0 ? NS_PER_SEC / targetThroughput : Long.MAX_VALUE; } public boolean shouldThrottle(long amountSoFar, long sendStartMs) { diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/IntervalCommand.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/IntervalCommand.java index b332de9..9ff0e37 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/IntervalCommand.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/IntervalCommand.java @@ -19,38 +19,43 @@ public class IntervalCommand implements Callable { @CommandLine.Option( - names = {"-t", "--topic"}, - description = "target Kafka topic name", - required = true) + names = { "-t", "--topic" }, + description = "target Kafka topic name", + required = true + ) String topicName; @CommandLine.Option( - names = {"-n", "--num-records"}, - description = "Number of records to produce", - required = true) + names = { "-n", "--num-records" }, + description = "Number of records to produce", + required = true + ) long numRecords; @CommandLine.Option( - names = {"-i", "--interval"}, - description = "Maximum interval between producer send", - defaultValue = "5000") + names = { "-i", "--interval" }, + description = "Maximum interval between producer send", + defaultValue = "5000" + ) long intervalMs; @CommandLine.ArgGroup(multiplicity = "1") Cli.PropertiesOption propertiesOption; @CommandLine.Option( - names = {"-f", "--format"}, - description = "Record value format", - defaultValue = "JSON") + names = { "-f", "--format" }, + description = "Record value format", + defaultValue = "JSON" + ) PayloadGenerator.Format format; @CommandLine.ArgGroup(multiplicity = "1") Cli.SchemaSourceOption schemaSource; @CommandLine.Option( - names = {"-p", "--prop"}, - description = "Additional client properties") + names = { "-p", "--prop" }, + description = "Additional client properties" + ) Map additionalProperties = new HashMap<>(); int reportingIntervalMs = 5_000; @@ -62,17 +67,23 @@ public Integer call() throws Exception { producerConfig.putAll(additionalProperties); var keySerializer = new StringSerializer(); - Serializer valueSerializer = PayloadGenerator.valueSerializer(format, producerConfig); - - try (var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer)) { - final var payloadGenerator = - new PayloadGenerator( - new PayloadGenerator.Config( - Optional.empty(), - schemaSource.quickstart, - schemaSource.schemaPath, - numRecords, - format)); + Serializer valueSerializer = PayloadGenerator.valueSerializer( + format, + producerConfig + ); + + try ( + var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer) + ) { + final var payloadGenerator = new PayloadGenerator( + new PayloadGenerator.Config( + Optional.empty(), + 
schemaSource.quickstart, + schemaSource.schemaPath, + numRecords, + format + ) + ); final var stats = new Stats(numRecords, reportingIntervalMs); final var config = new IntervalRunner.Config(topicName, numRecords, intervalMs); diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ListTopicsCommand.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ListTopicsCommand.java index cc3aced..a6b6837 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ListTopicsCommand.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ListTopicsCommand.java @@ -21,27 +21,28 @@ import picocli.CommandLine; @CommandLine.Command( - name = "topics", - description = "List topics and subjectSchemas available in a cluster") + name = "topics", + description = "List topics and subjectSchemas available in a cluster" +) public class ListTopicsCommand implements Callable { @CommandLine.ArgGroup(multiplicity = "1") Cli.PropertiesOption propertiesOption; @CommandLine.Option( - names = {"--pretty"}, - defaultValue = "false", - description = "Print pretty/formatted JSON") + names = { "--pretty" }, + defaultValue = "false", + description = "Print pretty/formatted JSON" + ) boolean pretty; @CommandLine.Option( - names = {"-p", "--prop"}, - description = "Additional client properties") + names = { "-p", "--prop" }, + description = "Additional client properties" + ) Map additionalProperties = new HashMap<>(); - @CommandLine.Option( - names = {"--prefix"}, - description = "Topic name prefix") + @CommandLine.Option(names = { "--prefix" }, description = "Topic name prefix") Optional prefix; final ObjectMapper json = new ObjectMapper(); @@ -52,54 +53,61 @@ public Integer call() throws Exception { if (props == null) return 1; props.putAll(additionalProperties); final var kafkaAdminClient = AdminClient.create(props); - final var topics = - kafkaAdminClient.listTopics().names().get().stream() - .filter(t -> prefix.map(t::startsWith).orElse(true)) - .toList(); + final var topics = kafkaAdminClient + .listTopics() + .names() + .get() + .stream() + .filter(t -> prefix.map(t::startsWith).orElse(true)) + .toList(); final var schemaRegistryUrl = props.getProperty("schema.registry.url"); final Optional schemaRegistryClient; if (schemaRegistryUrl != null && !schemaRegistryUrl.isBlank()) { schemaRegistryClient = - Optional.of( - new CachedSchemaRegistryClient( - schemaRegistryUrl, - 10, - props.keySet().stream() - .collect( - Collectors.toMap( - Object::toString, k -> props.getProperty(k.toString()))))); + Optional.of( + new CachedSchemaRegistryClient( + schemaRegistryUrl, + 10, + props + .keySet() + .stream() + .collect( + Collectors.toMap(Object::toString, k -> props.getProperty(k.toString())) + ) + ) + ); } else { schemaRegistryClient = Optional.empty(); } final var result = new ArrayList(topics.size()); for (final var topic : topics) { - var subject = - schemaRegistryClient - .map( - c -> { - try { - final var allSubjectsByPrefix = c.getAllSubjectsByPrefix(topic); - final var subjects = new HashMap>(); - for (final var s : allSubjectsByPrefix) { - try { - final var schemas = c.getSchemas(s, false, true); - subjects.put(s, schemas); - } catch (IOException | RestClientException e) { - throw new RuntimeException(e); - } - } - return subjects; - } catch (IOException | RestClientException e) { - throw new RuntimeException(e); - } - }) - .map( - parsedSchemas -> - parsedSchemas.entrySet().stream() - .map(TopicAndSchema.SubjectSchemas::from) - .toList()) 
- .orElse(List.of()); + var subject = schemaRegistryClient + .map(c -> { + try { + final var allSubjectsByPrefix = c.getAllSubjectsByPrefix(topic); + final var subjects = new HashMap>(); + for (final var s : allSubjectsByPrefix) { + try { + final var schemas = c.getSchemas(s, false, true); + subjects.put(s, schemas); + } catch (IOException | RestClientException e) { + throw new RuntimeException(e); + } + } + return subjects; + } catch (IOException | RestClientException e) { + throw new RuntimeException(e); + } + }) + .map(parsedSchemas -> + parsedSchemas + .entrySet() + .stream() + .map(TopicAndSchema.SubjectSchemas::from) + .toList() + ) + .orElse(List.of()); result.add(new TopicAndSchema(topic, subject)); } final var array = json.createArrayNode(); @@ -126,7 +134,9 @@ JsonNode toJson() { record SubjectSchemas(String name, List schemas) { static SubjectSchemas from(Map.Entry> entry) { return new SubjectSchemas( - entry.getKey(), entry.getValue().stream().map(TopicAndSchema.Schema::from).toList()); + entry.getKey(), + entry.getValue().stream().map(TopicAndSchema.Schema::from).toList() + ); } JsonNode toJson() { diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/PerfCommand.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/PerfCommand.java index b14a2fa..8d69033 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/PerfCommand.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/PerfCommand.java @@ -19,21 +19,24 @@ public class PerfCommand implements Callable { @CommandLine.Option( - names = {"-t", "--topic"}, - description = "target Kafka topic name", - required = true) + names = { "-t", "--topic" }, + description = "target Kafka topic name", + required = true + ) String topicName; @CommandLine.Option( - names = {"-n", "--num-records"}, - description = "Number of records to produce", - required = true) + names = { "-n", "--num-records" }, + description = "Number of records to produce", + required = true + ) long numRecords; @CommandLine.Option( - names = {"-k", "--throughput"}, - description = "Number of target records per second to produce", - defaultValue = "-1") + names = { "-k", "--throughput" }, + description = "Number of target records per second to produce", + defaultValue = "-1" + ) long throughput = -1L; @CommandLine.ArgGroup(multiplicity = "1") @@ -43,14 +46,16 @@ public class PerfCommand implements Callable { Cli.SchemaSourceOption schemaSource; @CommandLine.Option( - names = {"-f", "--format"}, - description = "Record value format", - defaultValue = "JSON") + names = { "-f", "--format" }, + description = "Record value format", + defaultValue = "JSON" + ) PayloadGenerator.Format format; @CommandLine.Option( - names = {"-p", "--prop"}, - description = "Additional client properties") + names = { "-p", "--prop" }, + description = "Additional client properties" + ) Map additionalProperties = new HashMap<>(); int reportingIntervalMs = 5_000; @@ -68,27 +73,41 @@ public Integer call() { var keySerializer = new StringSerializer(); var valueSerializer = PayloadGenerator.valueSerializer(format, producerConfig); - try (var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer)) { - final var config = - new PerformanceRunner.Config( - numRecords, topicName, transactionEnabled, transactionDurationMs, shouldPrintMetrics); - final var payloadGenerator = - new PayloadGenerator( - new PayloadGenerator.Config( - Optional.empty(), - schemaSource.quickstart, - 
schemaSource.schemaPath, - numRecords, - format)); - final var throughputThrottler = - new ThroughputThrottler(System.currentTimeMillis(), throughput); + try ( + var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer) + ) { + final var config = new PerformanceRunner.Config( + numRecords, + topicName, + transactionEnabled, + transactionDurationMs, + shouldPrintMetrics + ); + final var payloadGenerator = new PayloadGenerator( + new PayloadGenerator.Config( + Optional.empty(), + schemaSource.quickstart, + schemaSource.schemaPath, + numRecords, + format + ) + ); + final var throughputThrottler = new ThroughputThrottler( + System.currentTimeMillis(), + throughput + ); final var stats = new Stats(numRecords, reportingIntervalMs); out.println("Avro Schema used to generate records:"); out.println(payloadGenerator.schema()); - var pp = - new PerformanceRunner(config, producer, payloadGenerator, throughputThrottler, stats); + var pp = new PerformanceRunner( + config, + producer, + payloadGenerator, + throughputThrottler, + stats + ); pp.start(); } return 0; diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ProduceOnceCommand.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ProduceOnceCommand.java index 5f9ad82..f991f93 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ProduceOnceCommand.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ProduceOnceCommand.java @@ -17,9 +17,10 @@ public class ProduceOnceCommand implements Callable { @CommandLine.Option( - names = {"-t", "--topic"}, - description = "target Kafka topic name", - required = true) + names = { "-t", "--topic" }, + description = "target Kafka topic name", + required = true + ) String topicName; @CommandLine.ArgGroup(multiplicity = "1") @@ -29,14 +30,16 @@ public class ProduceOnceCommand implements Callable { Cli.SchemaSourceOption schemaSource; @CommandLine.Option( - names = {"-f", "--format"}, - description = "Record value format", - defaultValue = "JSON") + names = { "-f", "--format" }, + description = "Record value format", + defaultValue = "JSON" + ) PayloadGenerator.Format format; @CommandLine.Option( - names = {"-p", "--prop"}, - description = "Additional client properties") + names = { "-p", "--prop" }, + description = "Additional client properties" + ) Map additionalProperties = new HashMap<>(); @Override @@ -45,13 +48,23 @@ public Integer call() throws Exception { if (producerConfig == null) return 1; producerConfig.putAll(additionalProperties); var keySerializer = new StringSerializer(); - Serializer valueSerializer = PayloadGenerator.valueSerializer(format, producerConfig); + Serializer valueSerializer = PayloadGenerator.valueSerializer( + format, + producerConfig + ); - try (var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer)) { - var pg = - new PayloadGenerator( - new PayloadGenerator.Config( - Optional.empty(), schemaSource.quickstart, schemaSource.schemaPath, 10, format)); + try ( + var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer) + ) { + var pg = new PayloadGenerator( + new PayloadGenerator.Config( + Optional.empty(), + schemaSource.quickstart, + schemaSource.schemaPath, + 10, + format + ) + ); out.println("Avro Schema used to generate records:"); out.println(pg.schema()); diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/SampleCommand.java 
b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/SampleCommand.java index 9957a21..fac08e8 100644 --- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/SampleCommand.java +++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/SampleCommand.java @@ -16,29 +16,32 @@ public class SampleCommand implements Callable { Cli.SchemaSourceOption schemaSource; @CommandLine.Option( - names = {"--pretty"}, - defaultValue = "false", - description = "Print pretty/formatted JSON") + names = { "--pretty" }, + defaultValue = "false", + description = "Print pretty/formatted JSON" + ) boolean pretty; @CommandLine.Option( - names = {"--print-schema"}, - defaultValue = "false", - description = "Print Avro Schema JSON") + names = { "--print-schema" }, + defaultValue = "false", + description = "Print Avro Schema JSON" + ) boolean schema; final ObjectMapper json = new ObjectMapper(); @Override public Integer call() throws Exception { - final var payloadGenerator = - new PayloadGenerator( - new PayloadGenerator.Config( - Optional.empty(), - schemaSource.quickstart, - schemaSource.schemaPath, - 1, - PayloadGenerator.Format.JSON)); + final var payloadGenerator = new PayloadGenerator( + new PayloadGenerator.Config( + Optional.empty(), + schemaSource.quickstart, + schemaSource.schemaPath, + 1, + PayloadGenerator.Format.JSON + ) + ); if (schema) { final var schema = json.readTree(payloadGenerator.schema()); if (pretty) { diff --git a/quotas/src/main/java/kafka/cli/quotas/Cli.java b/quotas/src/main/java/kafka/cli/quotas/Cli.java index a34f451..0440ae8 100644 --- a/quotas/src/main/java/kafka/cli/quotas/Cli.java +++ b/quotas/src/main/java/kafka/cli/quotas/Cli.java @@ -32,11 +32,12 @@ import picocli.CommandLine.Option; @Command( - name = "kfk-quotas", - versionProvider = Cli.VersionProviderWithConfigProvider.class, - mixinStandardHelpOptions = true, - description = "CLI to manage Manage Kafka Quotas", - subcommands = {QueryCommand.class, CreateCommand.class, DeleteCommand.class}) + name = "kfk-quotas", + versionProvider = Cli.VersionProviderWithConfigProvider.class, + mixinStandardHelpOptions = true, + description = "CLI to manage Manage Kafka Quotas", + subcommands = { QueryCommand.class, CreateCommand.class, DeleteCommand.class } +) public class Cli implements Callable { public static void main(String[] args) { @@ -51,60 +52,57 @@ public Integer call() { } @Command( - name = "query", - description = - """ + name = "query", + description = """ Search for existing quotas or quotas applying to a certain client application - """) + """ + ) static class QueryCommand implements Callable { @ArgGroup(multiplicity = "1") PropertiesOption propertiesOption; - @Option( - names = {"--all-users"}, - description = "Get all quotas related to users") + @Option(names = { "--all-users" }, description = "Get all quotas related to users") boolean allUsers; @Option( - names = {"--all-clients"}, - description = "Get all quotas related to clients") + names = { "--all-clients" }, + description = "Get all quotas related to clients" + ) boolean allClients; - @Option( - names = {"--all-ips"}, - description = "Get all quotas related to IPs") + @Option(names = { "--all-ips" }, description = "Get all quotas related to IPs") boolean allIps; - @Option(names = {"--user-clients"}) + @Option(names = { "--user-clients" }) Map userClients = new HashMap<>(); - @Option(names = {"--user"}) + @Option(names = { "--user" }) List users = new ArrayList<>(); - @Option(names = {"--user-default"}) + @Option(names = { 
"--user-default" }) boolean userDefault; - @Option(names = {"--client"}) + @Option(names = { "--client" }) List clientIds = new ArrayList<>(); - @Option(names = {"--client-default"}) + @Option(names = { "--client-default" }) boolean clientIdDefault; - @Option(names = {"--ip"}) + @Option(names = { "--ip" }) List ips = new ArrayList<>(); - @Option(names = {"--ip-default"}) + @Option(names = { "--ip-default" }) boolean ipDefault; @Option( - names = {"--only", "-o"}, - description = - """ + names = { "--only", "-o" }, + description = """ Look only for quotas matching User, Client IDs, or IPs. If set to false (default), returns quotas even if not explicitly matching filters, e.g. defaults. If set to true, will return only the quotas matching the filters. - """) + """ + ) boolean onlyMatch; @Override @@ -122,18 +120,28 @@ public Integer call() throws Exception { } else if (allIps) { final var quotas = quotaManager.allByIps(); System.out.println(quotas.toJson()); - } else // start querying by params // end of all by * - if (!userClients.isEmpty()) { - final var userClientMap = - userClients.keySet().stream() - .collect(Collectors.toMap(k -> k, k -> List.of(userClients.get(k).split(",")))); - final var quotas = quotaManager.byUsers(userClientMap, clientIdDefault, onlyMatch); + } else if (!userClients.isEmpty()) { // start querying by params // end of all by * + final var userClientMap = userClients + .keySet() + .stream() + .collect( + Collectors.toMap(k -> k, k -> List.of(userClients.get(k).split(","))) + ); + final var quotas = quotaManager.byUsers( + userClientMap, + clientIdDefault, + onlyMatch + ); System.out.println(quotas.toJson()); } else if (userDefault || !users.isEmpty()) { final var quotas = quotaManager.byUsers(users, userDefault, onlyMatch); System.out.println(quotas.toJson()); } else if (clientIdDefault || !clientIds.isEmpty()) { - final var quotas = quotaManager.byClients(clientIds, clientIdDefault, onlyMatch); + final var quotas = quotaManager.byClients( + clientIds, + clientIdDefault, + onlyMatch + ); System.out.println(quotas.toJson()); } else if (ipDefault || !ips.isEmpty()) { final var quotas = quotaManager.byIps(ips, ipDefault, onlyMatch); @@ -153,54 +161,34 @@ static class CreateCommand implements Callable { @ArgGroup(multiplicity = "1") PropertiesOption propertiesOption; - @Option( - names = {"--user-default"}, - description = "Default to all users") + @Option(names = { "--user-default" }, description = "Default to all users") boolean userDefault; - @Option( - names = {"--user"}, - description = "Application's User Principal") + @Option(names = { "--user" }, description = "Application's User Principal") Optional user; - @Option( - names = {"--client-default"}, - description = "Default to all client IDs") + @Option(names = { "--client-default" }, description = "Default to all client IDs") boolean clientIdDefault; - @Option( - names = {"--client"}, - description = "Application's Client ID") + @Option(names = { "--client" }, description = "Application's Client ID") Optional clientId; - @Option( - names = {"--ip-default"}, - description = "Default to all IPs") + @Option(names = { "--ip-default" }, description = "Default to all IPs") boolean ipDefault; - @Option( - names = {"--ip"}, - description = "Application's IP") + @Option(names = { "--ip" }, description = "Application's IP") Optional ip; - @Option( - names = {"--produce-rate"}, - description = "Write bandwidth") + @Option(names = { "--produce-rate" }, description = "Write bandwidth") Optional writeBandwidth; - @Option( - 
names = {"--fetch-rate"}, - description = "Read bandwidth") + @Option(names = { "--fetch-rate" }, description = "Read bandwidth") Optional readBandwidth; - @Option( - names = {"--request-rate"}, - description = "Request rate") + @Option(names = { "--request-rate" }, description = "Request rate") Optional requestRate; - @Option( - names = {"--connection-rate"}, - description = "Connection creation rate") + @Option(names = { "--connection-rate" }, description = "Connection creation rate") Optional connectionRate; @Override @@ -208,17 +196,19 @@ public Integer call() throws Exception { final var props = propertiesOption.load(); try (final var kafkaAdmin = AdminClient.create(props)) { final var quotaManager = new QuotaManager(kafkaAdmin); - final var quota = - new Quota( - new ClientEntity( - new KafkaClientEntity(userDefault, user), - new KafkaClientEntity(clientIdDefault, clientId), - new KafkaClientEntity(ipDefault, ip)), - new Constraint( - writeBandwidth.map(NetworkBandwidth::new), - readBandwidth.map(NetworkBandwidth::new), - requestRate.map(RequestRate::new), - connectionRate.map(ConnectionCreationRate::new))); + final var quota = new Quota( + new ClientEntity( + new KafkaClientEntity(userDefault, user), + new KafkaClientEntity(clientIdDefault, clientId), + new KafkaClientEntity(ipDefault, ip) + ), + new Constraint( + writeBandwidth.map(NetworkBandwidth::new), + readBandwidth.map(NetworkBandwidth::new), + requestRate.map(RequestRate::new), + connectionRate.map(ConnectionCreationRate::new) + ) + ); quotaManager.create(quota); return 0; } @@ -231,59 +221,40 @@ static class DeleteCommand implements Callable { @ArgGroup(multiplicity = "1") PropertiesOption propertiesOption; - @Option( - names = {"--user-default"}, - description = "Default to all users") + @Option(names = { "--user-default" }, description = "Default to all users") boolean userDefault; - @Option( - names = {"--user"}, - description = "Application's User Principal") + @Option(names = { "--user" }, description = "Application's User Principal") Optional user; - @Option( - names = {"--client-default"}, - description = "Default to all client IDs") + @Option(names = { "--client-default" }, description = "Default to all client IDs") boolean clientIdDefault; - @Option( - names = {"--client"}, - description = "Application's Client ID") + @Option(names = { "--client" }, description = "Application's Client ID") Optional clientId; - @Option( - names = {"--ip-default"}, - description = "Default to all IPs") + @Option(names = { "--ip-default" }, description = "Default to all IPs") boolean ipDefault; - @Option( - names = {"--ip"}, - description = "Application's IP") + @Option(names = { "--ip" }, description = "Application's IP") Optional ip; @Option( - names = {"--all"}, - description = "Use to remove all existing quotas for an application") + names = { "--all" }, + description = "Use to remove all existing quotas for an application" + ) boolean all; - @Option( - names = {"--produce-rate"}, - description = "Write bandwidth") + @Option(names = { "--produce-rate" }, description = "Write bandwidth") boolean writeBandwidth; - @Option( - names = {"--fetch-rate"}, - description = "Read bandwidth") + @Option(names = { "--fetch-rate" }, description = "Read bandwidth") boolean readBandwidth; - @Option( - names = {"--request-rate"}, - description = "Request rate") + @Option(names = { "--request-rate" }, description = "Request rate") boolean requestRate; - @Option( - names = {"--connection-rate"}, - description = "Connection creation rate") + 
   @Option(names = { "--connection-rate" }, description = "Connection creation rate")
   boolean connectionRate;

   @Override
@@ -293,36 +264,46 @@ public Integer call() throws Exception {
     final var quotaManager = new QuotaManager(kafkaAdmin);
     if (all) {
       if (userDefault || user.isPresent()) {
-        final var quotas =
-            quotaManager.byUsers(user.map(List::of).orElse(List.of()), userDefault, true);
+        final var quotas = quotaManager.byUsers(
+          user.map(List::of).orElse(List.of()),
+          userDefault,
+          true
+        );
         System.out.println(quotas.toJson());
         quotaManager.delete(quotas);
       } else if (clientIdDefault || clientId.isPresent()) {
-        final var quotas =
-            quotaManager.byClients(
-                clientId.map(List::of).orElse(List.of()), clientIdDefault, true);
+        final var quotas = quotaManager.byClients(
+          clientId.map(List::of).orElse(List.of()),
+          clientIdDefault,
+          true
+        );
         System.out.println(quotas.toJson());
         quotaManager.delete(quotas);
       } else if (ipDefault || ip.isPresent()) {
-        final var quotas =
-            quotaManager.byIps(ip.map(List::of).orElse(List.of()), ipDefault, true);
+        final var quotas = quotaManager.byIps(
+          ip.map(List::of).orElse(List.of()),
+          ipDefault,
+          true
+        );
         System.out.println(quotas.toJson());
         quotaManager.delete(quotas);
       }
     } else {
-      final var quota =
-          new Quota(
-              new ClientEntity(
-                  new KafkaClientEntity(userDefault, user),
-                  new KafkaClientEntity(clientIdDefault, clientId),
-                  new KafkaClientEntity(ipDefault, ip)),
-              new Constraint(
-                  writeBandwidth ? Optional.of(NetworkBandwidth.empty()) : Optional.empty(),
-                  readBandwidth ? Optional.of(NetworkBandwidth.empty()) : Optional.empty(),
-                  requestRate ? Optional.of(RequestRate.empty()) : Optional.empty(),
-                  connectionRate
-                      ? Optional.of(ConnectionCreationRate.empty())
-                      : Optional.empty()));
+      final var quota = new Quota(
+        new ClientEntity(
+          new KafkaClientEntity(userDefault, user),
+          new KafkaClientEntity(clientIdDefault, clientId),
+          new KafkaClientEntity(ipDefault, ip)
+        ),
+        new Constraint(
+          writeBandwidth ? Optional.of(NetworkBandwidth.empty()) : Optional.empty(),
+          readBandwidth ? Optional.of(NetworkBandwidth.empty()) : Optional.empty(),
+          requestRate ? Optional.of(RequestRate.empty()) : Optional.empty(),
+          connectionRate
+            ? Optional.of(ConnectionCreationRate.empty())
+            : Optional.empty()
+        )
+      );
       quotaManager.delete(quota);
     }
     return 0;
@@ -333,10 +314,10 @@ public Integer call() throws Exception {
   static class PropertiesOption {

     @CommandLine.Option(
-        names = {"-c", "--config"},
-        description =
-            "Client configuration properties file."
-                + "Must include connection to Kafka and Schema Registry")
+      names = { "-c", "--config" },
+      description = "Client configuration properties file." +
+      "Must include connection to Kafka and Schema Registry"
+    )
     Optional<Path> configPath;

     @ArgGroup(exclusive = false)
@@ -344,25 +325,24 @@ static class PropertiesOption {

     public Properties load() {
       return configPath
-          .map(
-              path -> {
-                try {
-                  final var p = new Properties();
-                  p.load(Files.newInputStream(path));
-                  return p;
-                } catch (Exception e) {
-                  throw new IllegalArgumentException(
-                      "ERROR: properties file at %s is failing to load".formatted(path));
-                }
-              })
-          .orElseGet(
-              () -> {
-                try {
-                  return contextOption.load();
-                } catch (IOException e) {
-                  throw new IllegalArgumentException("ERROR: loading contexts");
-                }
-              });
+        .map(path -> {
+          try {
+            final var p = new Properties();
+            p.load(Files.newInputStream(path));
+            return p;
+          } catch (Exception e) {
+            throw new IllegalArgumentException(
+              "ERROR: properties file at %s is failing to load".formatted(path)
+            );
+          }
+        })
+        .orElseGet(() -> {
+          try {
+            return contextOption.load();
+          } catch (IOException e) {
+            throw new IllegalArgumentException("ERROR: loading contexts");
+          }
+        });
     }
   }

@@ -382,8 +362,9 @@ public Properties load() throws IOException {
         return props;
       } else {
         err.printf(
-            "ERROR: Kafka context `%s` not found. Check that context already exist.%n",
-            kafkaContextName);
+          "ERROR: Kafka context `%s` not found. Check that context already exist.%n",
+          kafkaContextName
+        );
         return null;
       }
     }
@@ -394,16 +375,18 @@ static class VersionProviderWithConfigProvider implements CommandLine.IVersionPr
     @Override
     public String[] getVersion() throws IOException {
       final var url =
-          VersionProviderWithConfigProvider.class.getClassLoader().getResource("cli.properties");
+        VersionProviderWithConfigProvider.class.getClassLoader()
+          .getResource("cli.properties");
       if (url == null) {
-        return new String[] {
-          "No cli.properties file found in the classpath.",
-        };
+        return new String[] { "No cli.properties file found in the classpath." };
       }
       final var properties = new Properties();
       properties.load(url.openStream());
       return new String[] {
-        properties.getProperty("appName") + " version " + properties.getProperty("appVersion") + "",
+        properties.getProperty("appName") +
+        " version " +
+        properties.getProperty("appVersion") +
+        "",
         "Built: " + properties.getProperty("appBuildTime"),
       };
     }
diff --git a/quotas/src/main/java/kafka/cli/quotas/QuotaManager.java b/quotas/src/main/java/kafka/cli/quotas/QuotaManager.java
index 5898f4a..778741b 100644
--- a/quotas/src/main/java/kafka/cli/quotas/QuotaManager.java
+++ b/quotas/src/main/java/kafka/cli/quotas/QuotaManager.java
@@ -40,26 +40,34 @@ private Quotas query(ClientQuotaFilter filter) {
   }

   public Quotas allByUsers() {
-    final var conditions = List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER));
+    final var conditions = List.of(
+      ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)
+    );
     final var filter = ClientQuotaFilter.contains(conditions);
     return query(filter);
   }

   public Quotas allByClients() {
-    final var conditions =
-        List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID));
+    final var conditions = List.of(
+      ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)
+    );
     final var filter = ClientQuotaFilter.contains(conditions);
     return query(filter);
   }

   public Quotas allByIps() {
-    final var conditions = List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP));
+    final var conditions = List.of(
+      ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
+    );
     final var filter = ClientQuotaFilter.contains(conditions);
     return query(filter);
   }

   public Quotas byUsers(
-      Map<String, List<String>> users, boolean includeDefault, boolean onlyMatch) {
+    Map<String, List<String>> users,
+    boolean includeDefault,
+    boolean onlyMatch
+  ) {
     final var perEntity = Quotas.empty();
     final var defaults = Quotas.empty();
     for (final var user : users.keySet()) {
@@ -85,7 +93,11 @@ public Quotas byUsers(List<String> users, boolean userDefault, boolean onlyMatch
     return by(ClientQuotaEntity.USER, users, userDefault, onlyMatch);
   }

-  public Quotas byClients(List<String> clientIds, boolean clientIdDefault, boolean onlyMatch) {
+  public Quotas byClients(
+    List<String> clientIds,
+    boolean clientIdDefault,
+    boolean onlyMatch
+  ) {
     return by(ClientQuotaEntity.CLIENT_ID, clientIds, clientIdDefault, onlyMatch);
   }

@@ -94,9 +106,15 @@ public Quotas byIps(List<String> ips, boolean ipDefault, boolean onlyMatch) {
   }

   public Quotas by(
-      String entityType, List<String> entities, boolean includeDefault, boolean onlyMatch) {
-    final var perEntity =
-        entities.stream().map(e -> onlyBy(entityType, e)).reduce(Quotas.empty(), Quotas::append);
+    String entityType,
+    List<String> entities,
+    boolean includeDefault,
+    boolean onlyMatch
+  ) {
+    final var perEntity = entities
+      .stream()
+      .map(e -> onlyBy(entityType, e))
+      .reduce(Quotas.empty(), Quotas::append);
     if (onlyMatch) {
       if (includeDefault) {
         return perEntity.append(fromDefault(entityType));
@@ -115,7 +133,9 @@ Quotas fromDefault(String entityType) {

   Quotas fromUserClientDefault(String user) {
     final var byUser = ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, user);
-    final var byClient = ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.CLIENT_ID);
+    final var byClient = ClientQuotaFilterComponent.ofDefaultEntity(
+      ClientQuotaEntity.CLIENT_ID
+    );
     final var filter = ClientQuotaFilter.containsOnly(List.of(byClient));
     return query(filter);
   }
@@ -128,7 +148,10 @@ Quotas onlyBy(String entityType, String entity) {

   Quotas onlyByUserClient(String user, String client) {
     final var byUser = ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, user);
-    final var byClient = ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, client);
+    final var byClient = ClientQuotaFilterComponent.ofEntity(
+      ClientQuotaEntity.CLIENT_ID,
+      client
+    );
     final var filter = ClientQuotaFilter.containsOnly(List.of(byUser, byClient));
     return query(filter);
   }
diff --git a/quotas/src/main/java/kafka/cli/quotas/Quotas.java b/quotas/src/main/java/kafka/cli/quotas/Quotas.java
index 08b2b5c..dc0b187 100644
--- a/quotas/src/main/java/kafka/cli/quotas/Quotas.java
+++ b/quotas/src/main/java/kafka/cli/quotas/Quotas.java
@@ -15,7 +15,8 @@ import org.apache.kafka.common.quota.ClientQuotaEntity;

 public record Quotas(List<Quota> quotas) {

-  static final ObjectMapper jsonMapper = new ObjectMapper().registerModule(new Jdk8Module());
+  static final ObjectMapper jsonMapper = new ObjectMapper()
+    .registerModule(new Jdk8Module());

   public static Quotas empty() {
     return new Quotas(new ArrayList<>());
@@ -65,24 +66,28 @@ public JsonNode toJson() {

   record KafkaClientEntity(boolean isDefault, Optional<String> id) {}

-  record ClientEntity(KafkaClientEntity user, KafkaClientEntity clientId, KafkaClientEntity ip) {
+  record ClientEntity(
+    KafkaClientEntity user,
+    KafkaClientEntity clientId,
+    KafkaClientEntity ip
+  ) {
     public static ClientEntity from(ClientQuotaEntity entity) {
       final var entries = entity.entries();
-      final var userEntity =
-          new KafkaClientEntity(
-              entries.containsKey(ClientQuotaEntity.USER)
-                  && entries.get(ClientQuotaEntity.USER) == null,
-              Optional.ofNullable(entries.get(ClientQuotaEntity.USER)));
-      final var clientEntity =
-          new KafkaClientEntity(
-              entries.containsKey(ClientQuotaEntity.CLIENT_ID)
-                  && entries.get(ClientQuotaEntity.CLIENT_ID) == null,
-              Optional.ofNullable(entries.get(ClientQuotaEntity.CLIENT_ID)));
-      final var ipEntity =
-          new KafkaClientEntity(
-              entries.containsKey(ClientQuotaEntity.IP)
-                  && entries.get(ClientQuotaEntity.IP) == null,
-              Optional.ofNullable(entries.get(ClientQuotaEntity.IP)));
+      final var userEntity = new KafkaClientEntity(
+        entries.containsKey(ClientQuotaEntity.USER) &&
+        entries.get(ClientQuotaEntity.USER) == null,
+        Optional.ofNullable(entries.get(ClientQuotaEntity.USER))
+      );
+      final var clientEntity = new KafkaClientEntity(
+        entries.containsKey(ClientQuotaEntity.CLIENT_ID) &&
+        entries.get(ClientQuotaEntity.CLIENT_ID) == null,
+        Optional.ofNullable(entries.get(ClientQuotaEntity.CLIENT_ID))
+      );
+      final var ipEntity = new KafkaClientEntity(
+        entries.containsKey(ClientQuotaEntity.IP) &&
+        entries.get(ClientQuotaEntity.IP) == null,
+        Optional.ofNullable(entries.get(ClientQuotaEntity.IP))
+      );
       return new ClientEntity(userEntity, clientEntity, ipEntity);
     }

@@ -122,57 +127,70 @@ public JsonNode toJson() {
   }

   record Constraint(
-      Optional<NetworkBandwidth> produceRate,
-      Optional<NetworkBandwidth> fetchRate,
-      Optional<RequestRate> requestRate,
-      Optional<ConnectionCreationRate> connectionCreationRate) {
+    Optional<NetworkBandwidth> produceRate,
+    Optional<NetworkBandwidth> fetchRate,
+    Optional<RequestRate> requestRate,
+    Optional<ConnectionCreationRate> connectionCreationRate
+  ) {
     static Constraint from(Map<String, Double> quotas) {
       final var produceRate = quotas.get(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG);
       final var fetchRate = quotas.get(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG);
       final var requestRate = quotas.get(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG);
-      final var connectionRate = quotas.get(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG);
+      final var connectionRate = quotas.get(
+        QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG
+      );
       return new Constraint(
-          Optional.ofNullable(produceRate).map(NetworkBandwidth::new),
-          Optional.ofNullable(fetchRate).map(NetworkBandwidth::new),
-          Optional.ofNullable(requestRate).map(RequestRate::new),
-          Optional.ofNullable(connectionRate).map(ConnectionCreationRate::new));
+        Optional.ofNullable(produceRate).map(NetworkBandwidth::new),
+        Optional.ofNullable(fetchRate).map(NetworkBandwidth::new),
+        Optional.ofNullable(requestRate).map(RequestRate::new),
+        Optional.ofNullable(connectionRate).map(ConnectionCreationRate::new)
+      );
     }

     public List<Op> toEntries() {
       final var entries = new ArrayList<Op>(5);
-      produceRate.ifPresent(
-          r ->
-              entries.add(
-                  new Op(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, r.bytesPerSec())));
-      fetchRate.ifPresent(
-          r ->
-              entries.add(
-                  new Op(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, r.bytesPerSec())));
-      requestRate.ifPresent(
-          r -> entries.add(new Op(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, r.percent())));
-      connectionCreationRate.ifPresent(
-          r -> entries.add(new Op(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, r.rate())));
+      produceRate.ifPresent(r ->
+        entries.add(
+          new Op(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, r.bytesPerSec())
+        )
+      );
+      fetchRate.ifPresent(r ->
+        entries.add(
+          new Op(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, r.bytesPerSec())
+        )
+      );
+      requestRate.ifPresent(r ->
+        entries.add(new Op(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, r.percent()))
+      );
+      connectionCreationRate.ifPresent(r ->
+        entries.add(new Op(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, r.rate()))
+      );
       return entries;
     }

     public Constraint toDelete() {
       return new Constraint(
-          produceRate.map(n -> NetworkBandwidth.empty()),
-          fetchRate.map(n -> NetworkBandwidth.empty()),
-          requestRate.map(r -> RequestRate.empty()),
-          connectionCreationRate.map(r -> ConnectionCreationRate.empty()));
+        produceRate.map(n -> NetworkBandwidth.empty()),
+        fetchRate.map(n -> NetworkBandwidth.empty()),
+        requestRate.map(r -> RequestRate.empty()),
+        connectionCreationRate.map(r -> ConnectionCreationRate.empty())
+      );
     }

     public JsonNode toJson() {
       final var json = jsonMapper.createObjectNode();
-      produceRate.ifPresent(
-          r -> json.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, r.bytesPerSec()));
-      fetchRate.ifPresent(
-          r -> json.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, r.bytesPerSec()));
-      requestRate.ifPresent(
-          r -> json.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, r.percent()));
-      connectionCreationRate.ifPresent(
-          r -> json.put(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, r.rate()));
+      produceRate.ifPresent(r ->
+        json.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, r.bytesPerSec())
+      );
+      fetchRate.ifPresent(r ->
+        json.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, r.bytesPerSec())
+      );
+      requestRate.ifPresent(r ->
+        json.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, r.percent())
+      );
+      connectionCreationRate.ifPresent(r ->
+        json.put(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, r.rate())
+      );
       return json;
     }
   }