-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathdescribe_configs_response.go
133 lines (108 loc) · 3.1 KB
/
describe_configs_response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package healer
import (
"encoding/binary"
"fmt"
)
// DescribeConfigsResponse holds the decoded fields of a describe-configs
// response, as produced by NewDescribeConfigsResponse.
type DescribeConfigsResponse struct {
// CorrelationID is the 4-byte big-endian value that directly follows the
// length prefix of the response.
CorrelationID uint32
// ThrottleTimeMS is the 4-byte big-endian value that follows CorrelationID.
// NOTE(review): presumably the broker throttle time in milliseconds — confirm.
ThrottleTimeMS uint32
// Resources holds one decoded entry per resource listed in the response.
Resources []describeConfigsResponseResource
}
// Error returns an error for the first resource in the response whose
// ErrorCode is non-zero, formatted through KafkaError. It returns nil when
// every resource has a zero ErrorCode (including when there are no resources).
func (r DescribeConfigsResponse) Error() error {
	for idx := range r.Resources {
		code := r.Resources[idx].ErrorCode
		if code == 0 {
			continue
		}
		return fmt.Errorf("describe configs failed: %s", KafkaError(code))
	}
	return nil
}
// describeConfigsResponseResource is one per-resource entry of a
// describe-configs response, as decoded by
// decodeToDescribeConfigsResponseResource.
type describeConfigsResponseResource struct {
// ErrorCode is the signed 16-bit status of this resource;
// DescribeConfigsResponse.Error treats any non-zero value as a failure.
ErrorCode int16
// ErrorMessage is the optional message text that accompanies ErrorCode.
ErrorMessage string
// ResourceType is the one-byte resource type code read from the payload.
ResourceType uint8
// ResourceName is the name of the resource the entries belong to.
ResourceName string
// ConfigEntries holds the config entries decoded for this resource.
ConfigEntries []describeConfigsResponseConfigEntry
}
// decodeToDescribeConfigsResponseResource decodes one resource entry from the
// start of payload and returns it together with the number of bytes consumed.
//
// Fields are read in order: int16 error code, length-prefixed error message,
// one-byte resource type, length-prefixed resource name, int32 entry count,
// then that many config entries. All integers are big-endian, and string
// lengths are signed 16-bit values where a non-positive length contributes no
// bytes beyond the prefix itself.
//
// The function has no error return; a payload that ends before the encoded
// resource does causes an out-of-range panic.
func decodeToDescribeConfigsResponseResource(payload []byte) (r describeConfigsResponseResource, offset int) {
	r.ErrorCode = int16(binary.BigEndian.Uint16(payload[offset:]))
	offset += 2

	// Always honour the error message's length prefix. Skipping a fixed two
	// bytes when the error code is zero would misalign every following field
	// if a message is present anyway.
	l := int(int16(binary.BigEndian.Uint16(payload[offset:])))
	offset += 2
	if l > 0 {
		r.ErrorMessage = string(payload[offset : offset+l])
		offset += l
	}

	r.ResourceType = payload[offset]
	offset++

	l = int(int16(binary.BigEndian.Uint16(payload[offset:])))
	offset += 2
	if l > 0 {
		r.ResourceName = string(payload[offset : offset+l])
		offset += l
	}

	// The count is signed: a negative value (the Kafka protocol's encoding of
	// a null array) must be treated as "no entries" rather than reinterpreted
	// as a huge unsigned length.
	count := int(int32(binary.BigEndian.Uint32(payload[offset:])))
	offset += 4
	if count < 0 {
		count = 0
	}

	r.ConfigEntries = make([]describeConfigsResponseConfigEntry, count)
	for i := range r.ConfigEntries {
		entry, n := decodeToDescribeConfigsResponseConfigEntry(payload[offset:])
		r.ConfigEntries[i] = entry
		offset += n
	}
	return r, offset
}
// describeConfigsResponseConfigEntry is a single config entry of a resource,
// as decoded by decodeToDescribeConfigsResponseConfigEntry.
type describeConfigsResponseConfigEntry struct {
// ConfigName is the name of the config key.
ConfigName string
// ConfigValue is the config value; it stays empty when the encoded value
// length is not positive.
ConfigValue string
// ReadOnly, IsDefault and IsSensitive are each decoded from a single byte,
// in that order; any non-zero byte means true.
ReadOnly bool
IsDefault bool
IsSensitive bool
}
// decodeToDescribeConfigsResponseConfigEntry decodes a single config entry
// from the start of payload and returns it together with the number of bytes
// consumed.
//
// Layout, in order: int16 name length + name bytes, int16 value length +
// value bytes, then three one-byte flags (read-only, is-default,
// is-sensitive). Integers are big-endian. A non-positive string length leaves
// the field empty and contributes no bytes beyond its length prefix.
//
// The function has no error return; a payload shorter than the encoded entry
// causes an out-of-range panic.
func decodeToDescribeConfigsResponseConfigEntry(payload []byte) (r describeConfigsResponseConfigEntry, offset int) {
	// Lengths are signed: going through int16 keeps 0xFFFF as -1 instead of
	// turning it into 65535.
	l := int(int16(binary.BigEndian.Uint16(payload)))
	offset += 2
	// Guard the name the same way as the value: a negative length would
	// otherwise form an invalid slice expression and move offset backwards.
	if l > 0 {
		r.ConfigName = string(payload[offset : offset+l])
		offset += l
	}

	l = int(int16(binary.BigEndian.Uint16(payload[offset:])))
	offset += 2
	if l > 0 {
		r.ConfigValue = string(payload[offset : offset+l])
		offset += l
	}

	r.ReadOnly = payload[offset] != 0
	r.IsDefault = payload[offset+1] != 0
	r.IsSensitive = payload[offset+2] != 0
	offset += 3
	return r, offset
}
// NewDescribeConfigsResponse decodes a describe-configs response from payload.
//
// payload must be the complete frame: a 4-byte big-endian length prefix
// followed by exactly that many bytes. Those bytes start with the correlation
// ID, the throttle time and the resource count (4 bytes each) and continue
// with the encoded resources.
//
// A non-nil error (and a zero-value response) is returned when payload is too
// short to hold the length prefix or the fixed header, when the length prefix
// disagrees with len(payload), or when the resource count cannot fit in the
// remaining bytes. A negative resource count is treated as no resources.
//
// Resource bodies are decoded by decodeToDescribeConfigsResponseResource,
// which has no error return and panics on truncated data.
func NewDescribeConfigsResponse(payload []byte) (r DescribeConfigsResponse, err error) {
	const (
		lengthPrefixSize = 4
		// Length prefix + correlation ID + throttle time + resource count.
		headerSize = lengthPrefixSize + 4 + 4 + 4
	)

	// Reading the length prefix from fewer than four bytes would panic.
	if len(payload) < lengthPrefixSize {
		return r, fmt.Errorf("describe_configs response too short: %d bytes", len(payload))
	}
	responseLength := int(binary.BigEndian.Uint32(payload))
	if responseLength+4 != len(payload) {
		return r, fmt.Errorf("describe_configs response length did not match: %d!=%d", responseLength+4, len(payload))
	}
	if len(payload) < headerSize {
		return r, fmt.Errorf("describe_configs response header incomplete: %d bytes", len(payload))
	}

	offset := lengthPrefixSize
	correlationID := binary.BigEndian.Uint32(payload[offset:])
	offset += 4
	throttleTimeMS := binary.BigEndian.Uint32(payload[offset:])
	offset += 4

	// The count is signed on the wire; reading it as unsigned would turn a
	// negative count into a multi-gigabyte allocation request.
	count := int(int32(binary.BigEndian.Uint32(payload[offset:])))
	offset += 4
	if count < 0 {
		count = 0
	}
	// Every resource occupies at least one byte, so a count larger than the
	// remaining payload can only come from a corrupt frame.
	if remaining := len(payload) - offset; count > remaining {
		return r, fmt.Errorf("describe_configs response resource count %d exceeds remaining %d bytes", count, remaining)
	}

	r.CorrelationID = correlationID
	r.ThrottleTimeMS = throttleTimeMS
	r.Resources = make([]describeConfigsResponseResource, count)
	for i := range r.Resources {
		resource, n := decodeToDescribeConfigsResponseResource(payload[offset:])
		r.Resources[i] = resource
		offset += n
	}
	return r, nil
}