Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor client to return error on receiver.Receive #707

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions v2/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,10 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
}()

// Start Polling.
errChan := make(chan error)
wg := sync.WaitGroup{}
for i := 0; i < c.pollGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
var msg binding.Message
var respFn protocol.ResponseFn
Expand All @@ -241,13 +240,9 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
respFn = noRespFn
}

if err == io.EOF { // Normal close
return
}

if err != nil {
cecontext.LoggerFrom(ctx).Warn("Error while receiving a message: ", err)
continue
errChan <- err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not all errors cause all protocols to not respond, some would like the message to be returned via Invoke, this change does not seem like it will work for all protocols.

Is there a change to the amqp impl that would allow it to handle the error? Like, perhaps it needs to respond with a io.EOF when the the server is disconnected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review.

I think it does make send to send io.EOF in case connection is down, I'll look in that direction.

return
}

// Do not block on the invoker.
Expand All @@ -270,6 +265,16 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
}
}

select {
case chanErr := <-errChan:
if chanErr != io.EOF {
err = chanErr
}
case <-ctx.Done():
// TODO: it might be important to actually return error from context
cecontext.LoggerFrom(ctx).Info("Error from closed context: ", ctx.Err())
}
// wait for all invoker processes to finish
wg.Wait()

return err
Expand Down
8 changes: 7 additions & 1 deletion v2/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -266,7 +267,7 @@ func TestClientReceive(t *testing.T) {
for n, tc := range testCases {
for _, path := range []string{"", "/", "/unittest/"} {
t.Run(n+" at path "+path, func(t *testing.T) {

wg := &sync.WaitGroup{}
events := make(chan event.Event)

p, err := cehttp.New(tc.optsFn(0, "")...)
Expand All @@ -280,7 +281,9 @@ func TestClientReceive(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.TODO())
wg.Add(1)
go func() {
defer wg.Done()
err := c.StartReceiver(ctx, func(ctx context.Context, event event.Event) error {
go func() {
events <- event
Expand Down Expand Up @@ -352,6 +355,9 @@ func TestClientReceive(t *testing.T) {
if _, err := http.DefaultClient.Do(req); err == nil {
t.Fatalf("expected error to when sending request to stopped client")
}
// need to wait until receiver goroutines finish
// in case they result in test error
wg.Wait()
})
}
}
Expand Down