diff --git a/cmd/kaf/group.go b/cmd/kaf/group.go index 1a76aa4..dc538d5 100644 --- a/cmd/kaf/group.go +++ b/cmd/kaf/group.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "errors" "fmt" "sort" @@ -10,7 +11,6 @@ import ( "text/tabwriter" "encoding/base64" - "encoding/hex" "sync" @@ -141,6 +141,7 @@ func createGroupCommitOffsetCmd() *cobra.Command { var offset string var partitionFlag int32 var allPartitions bool + var offsetMap string var noconfirm bool res := &cobra.Command{ Use: "commit", @@ -151,27 +152,91 @@ func createGroupCommitOffsetCmd() *cobra.Command { client := getClient() group := args[0] + partitionOffsets := make(map[int32]int64) - var partitions []int32 - if allPartitions { - // Determine partitions - admin := getClusterAdmin() - topicDetails, err := admin.DescribeTopics([]string{topic}) - if err != nil { - errorExit("Unable to determine partitions of topic: %v\n", err) + if offsetMap != "" { + if err := json.Unmarshal([]byte(offsetMap), &partitionOffsets); err != nil { + errorExit("Wrong --offset-map format. Use JSON with keys as partition numbers and values as offsets.\nExample: --offset-map '{\"0\":123, \"1\":135, \"2\":120}'\n") } + } else { + var partitions []int32 + if allPartitions { + // Determine partitions + admin := getClusterAdmin() + topicDetails, err := admin.DescribeTopics([]string{topic}) + if err != nil { + errorExit("Unable to determine partitions of topic: %v\n", err) + } - detail := topicDetails[0] + detail := topicDetails[0] - for _, p := range detail.Partitions { - partitions = append(partitions, p.ID) + for _, p := range detail.Partitions { + partitions = append(partitions, p.ID) + } + } else if partitionFlag != -1 { + partitions = []int32{partitionFlag} + } else { + errorExit("Either --partition, --all-partitions or --offset-map flag must be provided") + } + + sort.Slice(partitions, func(i int, j int) bool { return partitions[i] < partitions[j] }) + + type Assignment struct { + partition int32 + offset int64 + } + assignments := make(chan Assignment, len(partitions)) + + // TODO offset must be calced per partition + var wg sync.WaitGroup + for _, partition := range partitions { + wg.Add(1) + go func(partition int32) { + defer wg.Done() + i, err := strconv.ParseInt(offset, 10, 64) + if err != nil { + // Try oldest/newest/.. + if offset == "oldest" { + i = sarama.OffsetOldest + } else if offset == "newest" || offset == "latest" { + i = sarama.OffsetNewest + } else { + // Try timestamp + t, err := time.Parse(time.RFC3339, offset) + if err != nil { + errorExit("offset is neither offset nor timestamp", nil) + } + i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) + } + + o, err := client.GetOffset(topic, partition, i) + if err != nil { + errorExit("Failed to determine offset for timestamp: %v", err) + } + + if o == -1 { + fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition) + return + //errorExit("Determined offset -1 from timestamp. Skipping.", o) + } + + assignments <- Assignment{partition: partition, offset: o} + + fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o) + } else { + assignments <- Assignment{partition: partition, offset: i} + } + }(partition) + } + wg.Wait() + close(assignments) + + for assign := range assignments { + partitionOffsets[assign.partition] = assign.offset } - } else if partitionFlag != -1 { - partitions = []int32{partitionFlag} - } else { - errorExit("Either --partition or --all-partitions flag must be provided") } + // Verify the Consumer Group is Empty admin := getClusterAdmin() groupDescs, err := admin.DescribeConsumerGroups([]string{args[0]}) if err != nil { @@ -184,63 +249,6 @@ func createGroupCommitOffsetCmd() *cobra.Command { } } - sort.Slice(partitions, func(i int, j int) bool { return partitions[i] < partitions[j] }) - - type Assignment struct { - partition int32 - offset int64 - } - assignments := make(chan Assignment, len(partitions)) - - // TODO offset must be calced per partition - var wg sync.WaitGroup - for _, partition := range partitions { - wg.Add(1) - go func(partition int32) { - defer wg.Done() - i, err := strconv.ParseInt(offset, 10, 64) - if err != nil { - // Try oldest/newest/.. - if offset == "oldest" { - i = sarama.OffsetOldest - } else if offset == "newest" || offset == "latest" { - i = sarama.OffsetNewest - } else { - // Try timestamp - t, err := time.Parse(time.RFC3339, offset) - if err != nil { - errorExit("offset is neither offset nor timestamp", nil) - } - i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) - } - - o, err := client.GetOffset(topic, partition, i) - if err != nil { - errorExit("Failed to determine offset for timestamp: %v", err) - } - - if o == -1 { - fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition) - return - //errorExit("Determined offset -1 from timestamp. Skipping.", o) - } - - assignments <- Assignment{partition: partition, offset: o} - - fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o) - } else { - assignments <- Assignment{partition: partition, offset: i} - } - }(partition) - } - wg.Wait() - close(assignments) - - partitionOffsets := make(map[int32]int64, len(partitions)) - for assign := range assignments { - partitionOffsets[assign.partition] = assign.offset - } - fmt.Printf("Resetting offsets to: %v\n", partitionOffsets) if !noconfirm { @@ -272,12 +280,18 @@ func createGroupCommitOffsetCmd() *cobra.Command { } fmt.Printf("Successfully committed offsets to %v.\n", partitionOffsets) + + closeErr := g.Close() + if closeErr != nil { + fmt.Printf("Warning: Failed to close consumer group: %v\n", closeErr) + } }, } res.Flags().StringVarP(&topic, "topic", "t", "", "topic") res.Flags().StringVarP(&offset, "offset", "o", "", "offset to commit") res.Flags().Int32VarP(&partitionFlag, "partition", "p", 0, "partition") res.Flags().BoolVar(&allPartitions, "all-partitions", false, "apply to all partitions") + res.Flags().StringVar(&offsetMap, "offset-map", "", "set different offsets per different partitions in JSON format, e.g. {\"0\": 123, \"1\": 42}") res.Flags().BoolVar(&noconfirm, "noconfirm", false, "Do not prompt for confirmation") return res }