-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathcreate_partitions_response.go
99 lines (85 loc) · 2.62 KB
/
create_partitions_response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package healer
import (
"encoding/binary"
"fmt"
)
// CreatePartitionsResponse holds the decoded fields of a create-partitions response.
// Values of this type are produced by NewCreatePartitionsResponse.
type CreatePartitionsResponse struct {
// CorrelationID is the 4-byte big-endian value that follows the length prefix.
CorrelationID uint32 `json:"correlation_id"`
// ThrottleTimeMS is the throttle time carried in the response body.
ThrottleTimeMS int32 `json:"throttle_time_ms"`
// Results holds one entry per result block decoded from the response.
Results []createPartitionsResponseResultBlock `json:"results"`
// TAG_BUFFER: the tagged-field section on the wire is not stored in this struct.
}
// createPartitionsResponseResultBlock is a single per-topic entry in
// CreatePartitionsResponse.Results.
type createPartitionsResponseResultBlock struct {
// TopicName is the topic name decoded for this result.
TopicName string `json:"topic_name"`
// ErrorCode is the result's error code; CreatePartitionsResponse.Error treats
// any non-zero value as a failure.
ErrorCode int16 `json:"error_code"`
// ErrorMessage is decoded as a nullable string, so it may be nil.
ErrorMessage *string `json:"error_message"`
// TAG_BUFFER: the tagged-field section on the wire is not stored in this struct.
}
// Error reports the first failed result in the response as an error.
//
// It walks r.Results in order and, for the first entry whose ErrorCode is
// non-zero, returns an error that wraps KafkaError(ErrorCode) and includes the
// entry's ErrorMessage. It returns nil when every result has ErrorCode 0.
//
// Note that this method returns an error value; it does not make
// CreatePartitionsResponse itself satisfy the error interface.
func (r CreatePartitionsResponse) Error() error {
	for _, result := range r.Results {
		if result.ErrorCode == 0 {
			continue
		}
		// ErrorMessage is a nullable string on the wire, so it can be nil even
		// when ErrorCode is set; dereferencing it blindly would panic.
		var msg string
		if result.ErrorMessage != nil {
			msg = *result.ErrorMessage
		}
		return fmt.Errorf("create partitions error(%d): %s: %w", result.ErrorCode, msg, KafkaError(result.ErrorCode))
	}
	return nil
}
// NewCreatePartitionsResponse decodes a create-partitions response from payload.
//
// payload must begin with the 4-byte big-endian length prefix, followed by the
// correlation ID and the response body. version selects the wire format:
// versions >= 2 are decoded with the compact helpers and have their
// tagged-field sections (TAG_BUFFER) skipped; older versions use fixed-width
// length prefixes and carry no tagged fields.
//
// It returns an error when the length prefix disagrees with len(payload), or
// when the payload is too short for a field this function reads directly. On
// error the returned response is the zero value.
func NewCreatePartitionsResponse(payload []byte, version uint16) (r CreatePartitionsResponse, err error) {
	const lengthPrefixSize = 4

	if len(payload) < lengthPrefixSize {
		return CreatePartitionsResponse{}, fmt.Errorf("create_partitions response too short: %d bytes", len(payload))
	}
	responseLength := int(binary.BigEndian.Uint32(payload))
	if responseLength+lengthPrefixSize != len(payload) {
		return CreatePartitionsResponse{}, fmt.Errorf("create_partitions response length did not match: %d!=%d", responseLength+4, len(payload))
	}

	// Every version >= 2 uses the compact encodings plus tagged fields, not
	// only version 2 itself.
	flexible := version >= 2
	offset := lengthPrefixSize

	// need reports an error unless at least n more bytes are available at offset.
	need := func(n int) error {
		if remaining := len(payload) - offset; n < 0 || remaining < n {
			return fmt.Errorf("create_partitions response truncated: need %d bytes at offset %d, payload is %d bytes", n, offset, len(payload))
		}
		return nil
	}

	if err = need(4); err != nil {
		return CreatePartitionsResponse{}, err
	}
	r.CorrelationID = binary.BigEndian.Uint32(payload[offset:])
	offset += 4
	if flexible {
		// Skip the response header's TAG_BUFFER; this assumes the section is
		// empty, i.e. encoded as a single zero byte.
		offset++
	}

	if err = need(4); err != nil {
		return CreatePartitionsResponse{}, err
	}
	r.ThrottleTimeMS = int32(binary.BigEndian.Uint32(payload[offset:]))
	offset += 4

	var blockCount int
	if flexible {
		if err = need(1); err != nil {
			return CreatePartitionsResponse{}, err
		}
		count, n := compactArrayLength(payload[offset:])
		offset += n
		blockCount = int(count)
	} else {
		if err = need(4); err != nil {
			return CreatePartitionsResponse{}, err
		}
		// The classic array length is a signed int32 where -1 denotes a null
		// array; reading it as unsigned would request a gigantic slice.
		blockCount = int(int32(binary.BigEndian.Uint32(payload[offset:])))
		offset += 4
	}
	if blockCount < 0 {
		blockCount = 0
	}
	// Every result occupies at least one byte, so a count larger than the
	// remaining payload can only come from corrupt input.
	if blockCount > len(payload)-offset {
		return CreatePartitionsResponse{}, fmt.Errorf("create_partitions response declares %d results but only %d bytes remain", blockCount, len(payload)-offset)
	}

	r.Results = make([]createPartitionsResponseResultBlock, blockCount)
	for i := range r.Results {
		result := &r.Results[i]

		if flexible {
			if err = need(1); err != nil {
				return CreatePartitionsResponse{}, err
			}
			name, n := compactString(payload[offset:])
			offset += n
			result.TopicName = name
		} else {
			if err = need(2); err != nil {
				return CreatePartitionsResponse{}, err
			}
			nameLen := int(binary.BigEndian.Uint16(payload[offset:]))
			offset += 2
			if err = need(nameLen); err != nil {
				return CreatePartitionsResponse{}, err
			}
			result.TopicName = string(payload[offset : offset+nameLen])
			offset += nameLen
		}

		if err = need(2); err != nil {
			return CreatePartitionsResponse{}, err
		}
		result.ErrorCode = int16(binary.BigEndian.Uint16(payload[offset:]))
		offset += 2

		if flexible {
			if err = need(1); err != nil {
				return CreatePartitionsResponse{}, err
			}
			msg, n := compactNullableString(payload[offset:])
			offset += n
			result.ErrorMessage = msg
		} else {
			// A classic nullable string starts with a 2-byte length prefix.
			if err = need(2); err != nil {
				return CreatePartitionsResponse{}, err
			}
			msg, n := nullableString(payload[offset:])
			offset += n
			result.ErrorMessage = msg
		}

		if flexible {
			// Skip the per-result TAG_BUFFER (assumed empty: one zero byte).
			// Versions below 2 have no tagged fields, so skipping a byte there
			// would misalign every following result.
			offset++
		}
	}

	// The response-level TAG_BUFFER that trails the results in flexible
	// versions carries nothing this decoder stores, so it is not consumed.
	return r, nil
}