Skip to content

Commit

Permalink
refactor: apply new formatter and new context
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Jul 19, 2022
1 parent ae07fd9 commit 8915c6d
Show file tree
Hide file tree
Showing 32 changed files with 1,865 additions and 1,195 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
target/
target/
*.db
1 change: 0 additions & 1 deletion cluster-state/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
<dependency>
<groupId>io.github.jeqo.kafka</groupId>
<artifactId>kafka-context</artifactId>
<version>0.2.0</version>
</dependency>

<dependency>
Expand Down
107 changes: 56 additions & 51 deletions cluster-state/src/main/java/kafka/cli/cluster/state/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.stream.Collectors;
import kafka.cli.cluster.state.Cli.VersionProviderWithConfigProvider;
import kafka.context.KafkaContexts;
import kafka.context.SchemaRegistryContexts;
import kafka.context.sr.SchemaRegistryContexts;
import org.apache.kafka.clients.admin.AdminClient;
import picocli.CommandLine;
import picocli.CommandLine.ArgGroup;
Expand All @@ -24,39 +24,36 @@
import picocli.CommandLine.Option;

@Command(
name = "kfk-cluster-state",
descriptionHeading = "Kafka CLI - Topic list",
description =
"""
name = "kfk-cluster-state",
descriptionHeading = "Kafka CLI - Topic list",
description = """
List Kafka topics with metadata, partitions, replica placement, configuration,
and offsets at once.
""",
versionProvider = VersionProviderWithConfigProvider.class,
mixinStandardHelpOptions = true)
versionProvider = VersionProviderWithConfigProvider.class,
mixinStandardHelpOptions = true
)
public class Cli implements Callable<Integer> {

public static void main(String[] args) {
int exitCode = new CommandLine(new Cli()).execute(args);
System.exit(exitCode);
}

@Option(
names = {"-t", "--topics"},
description = "list of topic names to include")
@Option(names = { "-t", "--topics" }, description = "list of topic names to include")
List<String> topics = new ArrayList<>();

@Option(
names = {"-p", "--prefix"},
description = "Topic name prefix")
@Option(names = { "-p", "--prefix" }, description = "Topic name prefix")
Optional<String> prefix = Optional.empty();

@ArgGroup(multiplicity = "1")
PropertiesOption propertiesOption;

@Option(
names = {"--pretty"},
defaultValue = "false",
description = "Print pretty/formatted JSON")
names = { "--pretty" },
defaultValue = "false",
description = "Print pretty/formatted JSON"
)
boolean pretty;

@Override
Expand All @@ -68,12 +65,14 @@ public Integer call() throws Exception {

try (var adminClient = AdminClient.create(clientConfig)) {
if (sr) {
var srClient =
new CachedSchemaRegistryClient(
clientConfig.getProperty("schema.registry.url"),
10_000,
clientConfig.keySet().stream()
.collect(Collectors.toMap(Object::toString, clientConfig::get)));
var srClient = new CachedSchemaRegistryClient(
clientConfig.getProperty("schema.registry.url"),
10_000,
clientConfig
.keySet()
.stream()
.collect(Collectors.toMap(Object::toString, clientConfig::get))
);
final var helper = new Helper(adminClient, srClient);
final var output = helper.run(opts);
out.println(output.toJson(pretty));
Expand All @@ -95,34 +94,35 @@ public boolean match(String name) {
static class PropertiesOption {

@CommandLine.Option(
names = {"-c", "--config"},
description = "Client configuration properties file." + "Must include connection to Kafka")
names = { "-c", "--config" },
description = "Client configuration properties file." +
"Must include connection to Kafka"
)
Optional<Path> configPath;

@ArgGroup(exclusive = false)
ContextOption contextOption;

public Properties load() {
return configPath
.map(
path -> {
try {
final var p = new Properties();
p.load(Files.newInputStream(path));
return p;
} catch (Exception e) {
throw new IllegalArgumentException(
"ERROR: properties file at %s is failing to load".formatted(path));
}
})
.orElseGet(
() -> {
try {
return contextOption.load();
} catch (IOException e) {
throw new IllegalArgumentException("ERROR: loading contexts");
}
});
.map(path -> {
try {
final var p = new Properties();
p.load(Files.newInputStream(path));
return p;
} catch (Exception e) {
throw new IllegalArgumentException(
"ERROR: properties file at %s is failing to load".formatted(path)
);
}
})
.orElseGet(() -> {
try {
return contextOption.load();
} catch (IOException e) {
throw new IllegalArgumentException("ERROR: loading contexts");
}
});
}
}

Expand Down Expand Up @@ -151,15 +151,18 @@ public Properties load() throws IOException {
props.putAll(srProps);
} else {
err.printf(
"WARN: Schema Registry context `%s` not found. Proceeding without it.%n", srName);
"WARN: Schema Registry context `%s` not found. Proceeding without it.%n",
srName
);
}
}

return props;
} else {
err.printf(
"ERROR: Kafka context `%s` not found. Check that context already exist.%n",
kafkaContextName);
"ERROR: Kafka context `%s` not found. Check that context already exist.%n",
kafkaContextName
);
return null;
}
}
Expand All @@ -170,16 +173,18 @@ static class VersionProviderWithConfigProvider implements IVersionProvider {
@Override
public String[] getVersion() throws IOException {
final var url =
VersionProviderWithConfigProvider.class.getClassLoader().getResource("cli.properties");
VersionProviderWithConfigProvider.class.getClassLoader()
.getResource("cli.properties");
if (url == null) {
return new String[] {
"No cli.properties file found in the classpath.",
};
return new String[] { "No cli.properties file found in the classpath." };
}
final var properties = new Properties();
properties.load(url.openStream());
return new String[] {
properties.getProperty("appName") + " version " + properties.getProperty("appVersion") + "",
properties.getProperty("appName") +
" version " +
properties.getProperty("appVersion") +
"",
"Built: " + properties.getProperty("appBuildTime"),
};
}
Expand Down
96 changes: 50 additions & 46 deletions cluster-state/src/main/java/kafka/cli/cluster/state/Helper.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,22 @@ Output run(Opts opts) throws ExecutionException, InterruptedException {
final var builder = Output.newBuilder(topics);
final var describeClusterResult = adminClient.describeCluster();

final var descriptions = adminClient.describeTopics(builder.names()).allTopicNames().get();
final var descriptions = adminClient
.describeTopics(builder.names())
.allTopicNames()
.get();

final var startOffsetRequest = new HashMap<TopicPartition, OffsetSpec>();
final var endOffsetRequest = new HashMap<TopicPartition, OffsetSpec>();

for (final var topic : builder.names) {
final var description = descriptions.get(topic);
final var tps =
description.partitions().stream()
.map(tpi -> new TopicPartition(topic, tpi.partition()))
.sorted(Comparator.comparingInt(TopicPartition::partition))
.toList();
final var tps = description
.partitions()
.stream()
.map(tpi -> new TopicPartition(topic, tpi.partition()))
.sorted(Comparator.comparingInt(TopicPartition::partition))
.toList();
for (final var tp : tps) {
startOffsetRequest.put(tp, OffsetSpec.earliest());
endOffsetRequest.put(tp, OffsetSpec.latest());
Expand All @@ -66,51 +70,51 @@ Output run(Opts opts) throws ExecutionException, InterruptedException {
final var startOffsets = adminClient.listOffsets(startOffsetRequest).all().get();
final var endOffsets = adminClient.listOffsets(endOffsetRequest).all().get();

final var configs = adminClient.describeConfigs(builder.configResources()).all().get();

final var srSubjects =
srClient.map(
sr -> {
final var configs = adminClient
.describeConfigs(builder.configResources())
.all()
.get();

final var srSubjects = srClient.map(sr -> {
try {
return opts
.prefix()
.map(p -> {
try {
return sr.getAllSubjectsByPrefix(p);
} catch (IOException | RestClientException e) {
throw new RuntimeException(e);
}
})
.orElse(sr.getAllSubjects());
} catch (IOException | RestClientException e) {
throw new RuntimeException(e);
}
});

final var srSubjectsMetadata = srClient.map(sr ->
srSubjects
.map(subjects ->
subjects
.stream()
.map(s -> {
try {
return opts.prefix()
.map(
p -> {
try {
return sr.getAllSubjectsByPrefix(p);
} catch (IOException | RestClientException e) {
throw new RuntimeException(e);
}
})
.orElse(sr.getAllSubjects());
return Map.entry(s, sr.getLatestSchemaMetadata(s));
} catch (IOException | RestClientException e) {
throw new RuntimeException(e);
}
});

final var srSubjectsMetadata =
srClient.map(
sr ->
srSubjects
.map(
subjects ->
subjects.stream()
.map(
s -> {
try {
return Map.entry(s, sr.getLatestSchemaMetadata(s));
} catch (IOException | RestClientException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(Map.of()));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
)
.orElse(Map.of())
);
builder
.withClusterId(describeClusterResult.clusterId().get())
.withBrokers(describeClusterResult.nodes().get())
.withTopicDescriptions(descriptions)
.withStartOffsets(startOffsets)
.withEndOffsets(endOffsets)
.withConfigs(configs);
.withClusterId(describeClusterResult.clusterId().get())
.withBrokers(describeClusterResult.nodes().get())
.withTopicDescriptions(descriptions)
.withStartOffsets(startOffsets)
.withEndOffsets(endOffsets)
.withConfigs(configs);

srSubjectsMetadata.ifPresent(builder::withSchemaRegistrySubjects);

Expand Down
Loading

0 comments on commit 8915c6d

Please sign in to comment.