From b2c81f98ddff6b840a7af242e914b4ed7b296b41 Mon Sep 17 00:00:00 2001 From: Kevin Retzke Date: Fri, 24 Jul 2020 13:00:45 -0500 Subject: [PATCH 1/2] add option to get metadata when listing topics --- commands/list/topics.go | 6 +++++- kafka/manager.go | 14 +++++++++++--- kafka/topic.go | 2 ++ 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/commands/list/topics.go b/commands/list/topics.go index cc2fc4e..8d001d4 100644 --- a/commands/list/topics.go +++ b/commands/list/topics.go @@ -21,6 +21,7 @@ type topics struct { kafkaParams *commands.KafkaParameters globalParams *commands.GlobalParameters topicFilter *regexp.Regexp + loadConfigs bool format string style string } @@ -34,6 +35,9 @@ func addTopicsSubCommand(parent *kingpin.CmdClause, global *commands.GlobalParam c.Flag("topic-filter", "An optional regular expression to filter the topics by."). Short('t'). RegexpVar(&cmd.topicFilter) + c.Flag("load-config", "Loads the topic's configurations from the server."). + NoEnvar(). + Short('c').BoolVar(&cmd.loadConfigs) commands.AddFormatFlag(c, &cmd.format, &cmd.style) } @@ -49,7 +53,7 @@ func (c *topics) run(_ *kingpin.ParseContext) error { cancel() }() - topics, err := manager.GetTopics(ctx, c.topicFilter) + topics, err := manager.GetTopics(ctx, c.topicFilter, c.loadConfigs) if err != nil { return err } diff --git a/kafka/manager.go b/kafka/manager.go index 546a122..e9d98bf 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -191,7 +191,7 @@ func (m *Manager) DescribeCluster(ctx context.Context, includeConfig bool) (*Clu } // GetTopics returns a list of all the topics on the server. -func (m *Manager) GetTopics(ctx context.Context, filter *regexp.Regexp) ([]Topic, error) { +func (m *Manager) GetTopics(ctx context.Context, filter *regexp.Regexp, includeConfig bool) ([]Topic, error) { m.Log(internal.Verbose, "Retrieving topic list from the server") topics, err := m.admin.ListTopics() if err != nil { @@ -209,11 +209,19 @@ func (m *Manager) GetTopics(ctx context.Context, filter *regexp.Regexp) ([]Topic m.Logf(internal.SuperVerbose, "Filtering out %s topic", topic) continue } - result = append(result, Topic{ + t := Topic{ Name: topic, NumberOfPartitions: details.NumPartitions, ReplicationFactor: details.ReplicationFactor, - }) + } + if includeConfig { + meta, err := m.DescribeTopic(ctx, topic, includeConfig, includeConfig) + if err == nil { + t.Metadata = meta + + } + } + result = append(result, t) } } return result, nil diff --git a/kafka/topic.go b/kafka/topic.go index 209049d..0f2068e 100644 --- a/kafka/topic.go +++ b/kafka/topic.go @@ -10,6 +10,8 @@ type Topic struct { NumberOfPartitions int32 `json:"number_of_partitions"` // ReplicationFactor replication factor. ReplicationFactor int16 `json:"replication_factor"` + // Topic metadata. + Metadata *TopicMetadata `json:"metadata"` } // TopicsByName sorts the topic list by name. From 46d5cc59e0c959bd12dacc66184aa2bd9b7ef007 Mon Sep 17 00:00:00 2001 From: Kevin Retzke Date: Fri, 24 Jul 2020 13:21:34 -0500 Subject: [PATCH 2/2] add option to include offsets --- commands/list/topics.go | 18 +++++++++++------- kafka/manager.go | 4 ++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/commands/list/topics.go b/commands/list/topics.go index 8d001d4..316fb03 100644 --- a/commands/list/topics.go +++ b/commands/list/topics.go @@ -18,12 +18,13 @@ import ( ) type topics struct { - kafkaParams *commands.KafkaParameters - globalParams *commands.GlobalParameters - topicFilter *regexp.Regexp - loadConfigs bool - format string - style string + kafkaParams *commands.KafkaParameters + globalParams *commands.GlobalParameters + topicFilter *regexp.Regexp + loadConfigs bool + includeOffsets bool + format string + style string } func addTopicsSubCommand(parent *kingpin.CmdClause, global *commands.GlobalParameters, kafkaParams *commands.KafkaParameters) { @@ -38,6 +39,9 @@ func addTopicsSubCommand(parent *kingpin.CmdClause, global *commands.GlobalParam c.Flag("load-config", "Loads the topic's configurations from the server."). NoEnvar(). Short('c').BoolVar(&cmd.loadConfigs) + c.Flag("include-offsets", "Queries the server to read the latest available offset of each partition."). + NoEnvar(). + Short('o').BoolVar(&cmd.includeOffsets) commands.AddFormatFlag(c, &cmd.format, &cmd.style) } @@ -53,7 +57,7 @@ func (c *topics) run(_ *kingpin.ParseContext) error { cancel() }() - topics, err := manager.GetTopics(ctx, c.topicFilter, c.loadConfigs) + topics, err := manager.GetTopics(ctx, c.topicFilter, c.loadConfigs, c.includeOffsets) if err != nil { return err } diff --git a/kafka/manager.go b/kafka/manager.go index e9d98bf..69421e2 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -191,7 +191,7 @@ func (m *Manager) DescribeCluster(ctx context.Context, includeConfig bool) (*Clu } // GetTopics returns a list of all the topics on the server. -func (m *Manager) GetTopics(ctx context.Context, filter *regexp.Regexp, includeConfig bool) ([]Topic, error) { +func (m *Manager) GetTopics(ctx context.Context, filter *regexp.Regexp, includeConfig, includeOffsets bool) ([]Topic, error) { m.Log(internal.Verbose, "Retrieving topic list from the server") topics, err := m.admin.ListTopics() if err != nil { @@ -215,7 +215,7 @@ func (m *Manager) GetTopics(ctx context.Context, filter *regexp.Regexp, includeC ReplicationFactor: details.ReplicationFactor, } if includeConfig { - meta, err := m.DescribeTopic(ctx, topic, includeConfig, includeConfig) + meta, err := m.DescribeTopic(ctx, topic, includeConfig, includeOffsets) if err == nil { t.Metadata = meta