diff --git a/find_coordinator_response.go b/find_coordinator_response.go index 521dff097..b6407b582 100644 --- a/find_coordinator_response.go +++ b/find_coordinator_response.go @@ -4,6 +4,8 @@ import ( "time" ) +var NoNode = &Broker{id: -1, addr: ":-1"} + type FindCoordinatorResponse struct { Version int16 ThrottleTime time.Duration @@ -36,7 +38,7 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e } coordinator := new(Broker) - if err := coordinator.decode(pd, 0); err != nil { + if err := coordinator.decode(pd, version); err != nil { return err } if coordinator.addr == ":0" { @@ -60,10 +62,13 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error { } } - if err := f.Coordinator.encode(pe, 0); err != nil { + coordinator := f.Coordinator + if coordinator == nil { + coordinator = NoNode + } + if err := coordinator.encode(pe, f.Version); err != nil { return err } - return nil } diff --git a/find_coordinator_response_test.go b/find_coordinator_response_test.go index 39cec6469..0335fc8cb 100644 --- a/find_coordinator_response_test.go +++ b/find_coordinator_response_test.go @@ -5,42 +5,83 @@ import ( "time" ) -var ( - findCoordinatorResponse = []byte{ - 0, 0, 0, 100, - 0, 0, - 255, 255, // empty ErrMsg - 0, 0, 0, 1, - 0, 4, 'h', 'o', 's', 't', - 0, 0, 35, 132, - } - - findCoordinatorResponseError = []byte{ - 0, 0, 0, 100, - 0, 15, - 0, 3, 'm', 's', 'g', - 0, 0, 0, 1, - 0, 4, 'h', 'o', 's', 't', - 0, 0, 35, 132, - } -) - func TestFindCoordinatorResponse(t *testing.T) { - broker := NewBroker("host:9092") - broker.id = 1 - resp := &FindCoordinatorResponse{ - Version: 1, - ThrottleTime: 100 * time.Millisecond, - Err: ErrNoError, - ErrMsg: nil, - Coordinator: broker, - } - - testResponse(t, "version 1 - no error", resp, findCoordinatorResponse) + errMsg := "kaboom" + brokerRack := "foo" - msg := "msg" - resp.Err = ErrConsumerCoordinatorNotAvailable - resp.ErrMsg = &msg - - testResponse(t, "version 1 - error", resp, findCoordinatorResponseError) + for _, tc := range []struct { + desc string + response *FindCoordinatorResponse + encoded []byte + }{{ + desc: "version 0 - no error", + response: &FindCoordinatorResponse{ + Version: 0, + Err: ErrNoError, + Coordinator: &Broker{ + id: 7, + addr: "host:9092", + }, + }, + encoded: []byte{ + 0, 0, // Err + 0, 0, 0, 7, // Coordinator.ID + 0, 4, 'h', 'o', 's', 't', // Coordinator.Host + 0, 0, 35, 132, // Coordinator.Port + }, + }, { + desc: "version 1 - no error", + response: &FindCoordinatorResponse{ + Version: 1, + ThrottleTime: 100 * time.Millisecond, + Err: ErrNoError, + Coordinator: &Broker{ + id: 7, + addr: "host:9092", + rack: &brokerRack, + }, + }, + encoded: []byte{ + 0, 0, 0, 100, // ThrottleTime + 0, 0, // Err + 255, 255, // ErrMsg: empty + 0, 0, 0, 7, // Coordinator.ID + 0, 4, 'h', 'o', 's', 't', // Coordinator.Host + 0, 0, 35, 132, // Coordinator.Port + 0, 3, 'f', 'o', 'o', // Coordinator.Rack + }, + }, { + desc: "version 0 - error", + response: &FindCoordinatorResponse{ + Version: 0, + Err: ErrConsumerCoordinatorNotAvailable, + Coordinator: NoNode, + }, + encoded: []byte{ + 0, 15, // Err + 255, 255, 255, 255, // Coordinator.ID: -1 + 0, 0, // Coordinator.Host: "" + 255, 255, 255, 255, // Coordinator.Port: -1 + }, + }, { + desc: "version 1 - error", + response: &FindCoordinatorResponse{ + Version: 1, + ThrottleTime: 100 * time.Millisecond, + Err: ErrConsumerCoordinatorNotAvailable, + ErrMsg: &errMsg, + Coordinator: NoNode, + }, + encoded: []byte{ + 0, 0, 0, 100, // ThrottleTime + 0, 15, // Err + 0, 6, 'k', 'a', 'b', 'o', 'o', 'm', // ErrMsg + 255, 255, 255, 255, // Coordinator.ID: -1 + 0, 0, // Coordinator.Host: "" + 255, 255, 255, 255, // Coordinator.Port: -1 + 255, 255, // Coordinator.Rack: empty + }, + }} { + testResponse(t, tc.desc, tc.response, tc.encoded) + } } diff --git a/mockresponses.go b/mockresponses.go index f79a9d5e9..e12849f54 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -326,6 +326,60 @@ func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder { return res } +// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder. +type MockFindCoordinatorResponse struct { + groupCoordinators map[string]interface{} + transCoordinators map[string]interface{} + t TestReporter +} + +func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse { + return &MockFindCoordinatorResponse{ + groupCoordinators: make(map[string]interface{}), + transCoordinators: make(map[string]interface{}), + t: t, + } +} + +func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse { + switch coordinatorType { + case CoordinatorGroup: + mr.groupCoordinators[group] = broker + case CoordinatorTransaction: + mr.transCoordinators[group] = broker + } + return mr +} + +func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse { + switch coordinatorType { + case CoordinatorGroup: + mr.groupCoordinators[group] = kerror + case CoordinatorTransaction: + mr.transCoordinators[group] = kerror + } + return mr +} + +func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*FindCoordinatorRequest) + res := &FindCoordinatorResponse{} + var v interface{} + switch req.CoordinatorType { + case CoordinatorGroup: + v = mr.groupCoordinators[req.CoordinatorKey] + case CoordinatorTransaction: + v = mr.transCoordinators[req.CoordinatorKey] + } + switch v := v.(type) { + case *MockBroker: + res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()} + case KError: + res.Err = v + } + return res +} + // MockOffsetCommitResponse is a `OffsetCommitResponse` builder. type MockOffsetCommitResponse struct { errors map[string]map[string]map[int32]KError