Skip to content
This repository has been archived by the owner on Sep 15, 2023. It is now read-only.

Commit

Permalink
Merge pull request #146 from NeoyeElf/fix/consumer-close
Browse files Browse the repository at this point in the history
[fix(ekafka)] 修正 originErr 没有正确赋值
  • Loading branch information
AaronJan authored Aug 23, 2021
2 parents 978e07f + dbafbac commit 931f6ba
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
24 changes: 12 additions & 12 deletions ekafka/consumerserver/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,20 @@ func (cmp *Component) launchOnConsumerGroupStart() error {

var originErr error
select {
case originErr := <-handlerExit:
case originErr = <-handlerExit:
if originErr != nil {
cmp.logger.Error("terminating ConsumerServer because an error", elog.FieldErr(originErr))
cmp.logger.Error("terminating ConsumerGroup because an error", elog.FieldErr(originErr))
} else {
cmp.logger.Info("message handler exited without any error, terminating ConsumerServer")
cmp.logger.Info("message handler exited without any error, terminating ConsumerGroup")
}
cmp.stopServer()
case <-cmp.ServerCtx.Done():
originErr := cmp.ServerCtx.Err()
cmp.logger.Error("terminating ConsumerServer because a context error", elog.FieldErr(originErr))
originErr = cmp.ServerCtx.Err()
cmp.logger.Error("terminating ConsumerGroup because a context error", elog.FieldErr(originErr))

err := <-handlerExit
if err != nil {
cmp.logger.Error("terminating ConsumerServer because an error", elog.FieldErr(err))
cmp.logger.Error("terminating ConsumerGroup because an error", elog.FieldErr(err))
} else {
cmp.logger.Info("message handler exited without any error")
}
Expand Down Expand Up @@ -195,20 +195,20 @@ func (cmp *Component) launchOnConsumerStart() error {

var originErr error
select {
case originErr := <-handlerExit:
case originErr = <-handlerExit:
if originErr != nil {
cmp.logger.Error("terminating ConsumerGroup because an error", elog.FieldErr(originErr))
cmp.logger.Error("terminating ConsumerServer because an error", elog.FieldErr(originErr))
} else {
cmp.logger.Info("message handler exited without any error, terminating ConsumerGroup")
cmp.logger.Info("message handler exited without any error, terminating ConsumerServer")
}
cmp.stopServer()
case <-cmp.ServerCtx.Done():
originErr := cmp.ServerCtx.Err()
cmp.logger.Error("terminating ConsumerGroup because a context error", elog.FieldErr(originErr))
originErr = cmp.ServerCtx.Err()
cmp.logger.Error("terminating ConsumerServer because a context error", elog.FieldErr(originErr))

err := <-handlerExit
if err != nil {
cmp.logger.Error("terminating ConsumerGroup because an error", elog.FieldErr(err))
cmp.logger.Error("terminating ConsumerServer because an error", elog.FieldErr(err))
} else {
cmp.logger.Info("message handler exited without any error")
}
Expand Down
2 changes: 1 addition & 1 deletion ekafka/test/e2e/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Test_ConsumeWithConsumer(t *testing.T) {
)
consumer := cmp.Consumer("c1")
for {
msg, err := consumer.ReadMessage(ctx)
msg, _, err := consumer.ReadMessage(ctx)
if err != nil {
consumerGroupErr <- err
return
Expand Down
2 changes: 1 addition & 1 deletion ekafka/test/e2e/consumerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func Test_ConsumeServer_OnConsumerStart(t *testing.T) {
consumerServerComponent.OnStart(
func(ctx context.Context, consumer *ekafka.Consumer) error {
for {
msg, err := consumer.ReadMessage(ctx)
msg, _, err := consumer.ReadMessage(ctx)
if err != nil {
return err
}
Expand Down

0 comments on commit 931f6ba

Please sign in to comment.