From ca53772c6c7e0fcb68235d8273643f7940fdb4c1 Mon Sep 17 00:00:00 2001 From: Kale Blankenship Date: Sat, 15 Jun 2019 10:18:47 -0700 Subject: [PATCH] Return server link creation error at time of Sender/Receiver creation. Server errors during link creation are reported by sending an Attach without a source or target and immediately sending a follow up Detach. Check for this situation and wait for the Detach so the error can be returned to the user immediately. --- client.go | 36 ++++++++++++ integration_test.go | 133 ++++++++++++++++++++++++++++++-------------- 2 files changed, 127 insertions(+), 42 deletions(-) diff --git a/client.go b/client.go index 0aed35e2..88193caa 100644 --- a/client.go +++ b/client.go @@ -935,6 +935,42 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) { return nil, errorErrorf("unexpected attach response: %#v", fr) } + // If the remote encounters an error during the attach it returns an Attach + // with no Source or Target. The remote then sends a Detach with an error. + // + // Note that if the application chooses not to create a terminus, the session + // endpoint will still create a link endpoint and issue an attach indicating + // that the link endpoint has no associated local terminus. In this case, the + // session endpoint MUST immediately detach the newly created link endpoint. + // + // http://docs.oasis-open.org/amqp/core/v1.0/csprd01/amqp-core-transport-v1.0-csprd01.html#doc-idp386144 + if resp.Source == nil && resp.Target == nil { + // wait for detach + select { + case <-s.done: + return nil, s.err + case fr = <-l.rx: + } + + detach, ok := fr.(*performDetach) + if !ok { + return nil, errorErrorf("unexpected frame while waiting for detach: %#v", fr) + } + + // send return detach + fr = &performDetach{ + Handle: l.handle, + Closed: true, + } + debug(1, "TX: %s", fr) + s.txFrame(fr, nil) + + if detach.Error == nil { + return nil, errorErrorf("received detach with no error specified") + } + return nil, detach.Error + } + if l.maxMessageSize == 0 || resp.MaxMessageSize < l.maxMessageSize { l.maxMessageSize = resp.MaxMessageSize } diff --git a/integration_test.go b/integration_test.go index bb8596de..56ab1aaf 100644 --- a/integration_test.go +++ b/integration_test.go @@ -586,7 +586,6 @@ func TestIntegrationSend_Concurrent(t *testing.T) { // Create a sender sender, err := session.NewSender( amqp.LinkTargetAddress(queueName), - amqp.LinkReceiverSettle(amqp.ModeSecond), ) if err != nil { t.Fatal(err) @@ -633,6 +632,7 @@ func TestIntegrationSend_Concurrent(t *testing.T) { }) } } + func TestIntegrationSessionHandleMax(t *testing.T) { queueName, _, cleanup := newTestQueue(t, "sessionwrap") defer cleanup() @@ -785,6 +785,54 @@ func TestIntegrationLinkName(t *testing.T) { } } +func TestIntegrationAttachError(t *testing.T) { + queueName, _, cleanup := newTestQueue(t, "linkName") + defer cleanup() + + tests := []struct { + name string + error string + }{ + { + name: "invalid-session-filter", + error: "A sessionful message receiver cannot be created on an entity that does not require sessions.", + }, + } + + for _, tt := range tests { + label := fmt.Sprintf("name %v", tt.name) + t.Run(label, func(t *testing.T) { + // Create client + client := newSBClient(t, label) + defer client.Close() + + // Open a session + session, err := client.NewSession() + if err != nil { + t.Fatal(err) + } + + // Creating link to a queue with a session filter should fail + r, err := session.NewReceiver( + amqp.LinkSourceAddress(queueName), + amqp.LinkSourceFilter("com.microsoft:session-filter", 0x00000137000000C, "invalid"), + ) + if err == nil { + testClose(t, r.Close) + } + + switch { + case err == nil && tt.error == "": + // success + case err == nil: + t.Fatalf("expected error to contain %q, but it was nil", tt.error) + case !strings.Contains(err.Error(), tt.error): + t.Errorf("expected error to contain %q, but it was %q", tt.error, err) + } + }) + } +} + func TestIntegrationClose(t *testing.T) { queueName, _, cleanup := newTestQueue(t, "close") defer cleanup() @@ -1118,47 +1166,48 @@ func TestIssue48_ReceiverModeSecond(t *testing.T) { checkLeaks() }) - label = "issue48-receiver-mode-second" - t.Run(label, func(t *testing.T) { - checkLeaks := leaktest.CheckTimeout(t, 60*time.Second) - - // Create client - client := newEHClient(t, "issue48") - defer client.Close() - - // Open a session - session, err := client.NewSession() - if err != nil { - t.Fatal(err) - } - - // Create a sender - sender, err := session.NewSender( - amqp.LinkTargetAddress(hubName), - amqp.LinkReceiverSettle(amqp.ModeSecond), - ) - if err != nil { - t.Fatalf("%+v\n", err) - } - - err = sender.Send(context.Background(), &amqp.Message{ - Format: 0x80013700, - Data: [][]byte{ - []byte("hello"), - []byte("there"), - }, - }) - if err == nil { - t.Fatal("Expected error, got nil") - } - if err, ok := err.(*amqp.Error); !ok || !azDescription.MatchString(err.Description) { - t.Fatalf("Unexpected error response: %+v", err) - } - - client.Close() // close before leak check - - checkLeaks() - }) + // TODO: Re-enable with broker that supports rcv-mode-second + // label = "issue48-receiver-mode-second" + // t.Run(label, func(t *testing.T) { + // checkLeaks := leaktest.CheckTimeout(t, 60*time.Second) + + // // Create client + // client := newEHClient(t, "issue48") + // defer client.Close() + + // // Open a session + // session, err := client.NewSession() + // if err != nil { + // t.Fatal(err) + // } + + // // Create a sender + // sender, err := session.NewSender( + // amqp.LinkTargetAddress(hubName), + // amqp.LinkReceiverSettle(amqp.ModeSecond), + // ) + // if err != nil { + // t.Fatalf("%+v\n", err) + // } + + // err = sender.Send(context.Background(), &amqp.Message{ + // Format: 0x80013700, + // Data: [][]byte{ + // []byte("hello"), + // []byte("there"), + // }, + // }) + // if err == nil { + // t.Fatal("Expected error, got nil") + // } + // if err, ok := err.(*amqp.Error); !ok || !azDescription.MatchString(err.Description) { + // t.Fatalf("Unexpected error response: %+v", err) + // } + + // client.Close() // close before leak check + + // checkLeaks() + // }) } func dump(i interface{}) {