Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Return server link creation error at time of Sender/Receiver creation.
Browse files Browse the repository at this point in the history
Server errors during link creation are reported by sending an Attach
without a source or target and immediately sending a follow up Detach.
Check for this situation and wait for the Detach so the error can be
returned to the user immediately.
  • Loading branch information
vcabbage committed Jun 19, 2019
1 parent a69a1d5 commit ca53772
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 42 deletions.
36 changes: 36 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,42 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
return nil, errorErrorf("unexpected attach response: %#v", fr)
}

// If the remote encounters an error during the attach it returns an Attach
// with no Source or Target. The remote then sends a Detach with an error.
//
// Note that if the application chooses not to create a terminus, the session
// endpoint will still create a link endpoint and issue an attach indicating
// that the link endpoint has no associated local terminus. In this case, the
// session endpoint MUST immediately detach the newly created link endpoint.
//
// http://docs.oasis-open.org/amqp/core/v1.0/csprd01/amqp-core-transport-v1.0-csprd01.html#doc-idp386144
if resp.Source == nil && resp.Target == nil {
// wait for detach
select {
case <-s.done:
return nil, s.err
case fr = <-l.rx:
}

detach, ok := fr.(*performDetach)
if !ok {
return nil, errorErrorf("unexpected frame while waiting for detach: %#v", fr)
}

// send return detach
fr = &performDetach{
Handle: l.handle,
Closed: true,
}
debug(1, "TX: %s", fr)
s.txFrame(fr, nil)

if detach.Error == nil {
return nil, errorErrorf("received detach with no error specified")
}
return nil, detach.Error
}

if l.maxMessageSize == 0 || resp.MaxMessageSize < l.maxMessageSize {
l.maxMessageSize = resp.MaxMessageSize
}
Expand Down
133 changes: 91 additions & 42 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,6 @@ func TestIntegrationSend_Concurrent(t *testing.T) {
// Create a sender
sender, err := session.NewSender(
amqp.LinkTargetAddress(queueName),
amqp.LinkReceiverSettle(amqp.ModeSecond),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -633,6 +632,7 @@ func TestIntegrationSend_Concurrent(t *testing.T) {
})
}
}

func TestIntegrationSessionHandleMax(t *testing.T) {
queueName, _, cleanup := newTestQueue(t, "sessionwrap")
defer cleanup()
Expand Down Expand Up @@ -785,6 +785,54 @@ func TestIntegrationLinkName(t *testing.T) {
}
}

func TestIntegrationAttachError(t *testing.T) {
queueName, _, cleanup := newTestQueue(t, "linkName")
defer cleanup()

tests := []struct {
name string
error string
}{
{
name: "invalid-session-filter",
error: "A sessionful message receiver cannot be created on an entity that does not require sessions.",
},
}

for _, tt := range tests {
label := fmt.Sprintf("name %v", tt.name)
t.Run(label, func(t *testing.T) {
// Create client
client := newSBClient(t, label)
defer client.Close()

// Open a session
session, err := client.NewSession()
if err != nil {
t.Fatal(err)
}

// Creating link to a queue with a session filter should fail
r, err := session.NewReceiver(
amqp.LinkSourceAddress(queueName),
amqp.LinkSourceFilter("com.microsoft:session-filter", 0x00000137000000C, "invalid"),
)
if err == nil {
testClose(t, r.Close)
}

switch {
case err == nil && tt.error == "":
// success
case err == nil:
t.Fatalf("expected error to contain %q, but it was nil", tt.error)
case !strings.Contains(err.Error(), tt.error):
t.Errorf("expected error to contain %q, but it was %q", tt.error, err)
}
})
}
}

func TestIntegrationClose(t *testing.T) {
queueName, _, cleanup := newTestQueue(t, "close")
defer cleanup()
Expand Down Expand Up @@ -1118,47 +1166,48 @@ func TestIssue48_ReceiverModeSecond(t *testing.T) {
checkLeaks()
})

label = "issue48-receiver-mode-second"
t.Run(label, func(t *testing.T) {
checkLeaks := leaktest.CheckTimeout(t, 60*time.Second)

// Create client
client := newEHClient(t, "issue48")
defer client.Close()

// Open a session
session, err := client.NewSession()
if err != nil {
t.Fatal(err)
}

// Create a sender
sender, err := session.NewSender(
amqp.LinkTargetAddress(hubName),
amqp.LinkReceiverSettle(amqp.ModeSecond),
)
if err != nil {
t.Fatalf("%+v\n", err)
}

err = sender.Send(context.Background(), &amqp.Message{
Format: 0x80013700,
Data: [][]byte{
[]byte("hello"),
[]byte("there"),
},
})
if err == nil {
t.Fatal("Expected error, got nil")
}
if err, ok := err.(*amqp.Error); !ok || !azDescription.MatchString(err.Description) {
t.Fatalf("Unexpected error response: %+v", err)
}

client.Close() // close before leak check

checkLeaks()
})
// TODO: Re-enable with broker that supports rcv-mode-second
// label = "issue48-receiver-mode-second"
// t.Run(label, func(t *testing.T) {
// checkLeaks := leaktest.CheckTimeout(t, 60*time.Second)

// // Create client
// client := newEHClient(t, "issue48")
// defer client.Close()

// // Open a session
// session, err := client.NewSession()
// if err != nil {
// t.Fatal(err)
// }

// // Create a sender
// sender, err := session.NewSender(
// amqp.LinkTargetAddress(hubName),
// amqp.LinkReceiverSettle(amqp.ModeSecond),
// )
// if err != nil {
// t.Fatalf("%+v\n", err)
// }

// err = sender.Send(context.Background(), &amqp.Message{
// Format: 0x80013700,
// Data: [][]byte{
// []byte("hello"),
// []byte("there"),
// },
// })
// if err == nil {
// t.Fatal("Expected error, got nil")
// }
// if err, ok := err.(*amqp.Error); !ok || !azDescription.MatchString(err.Description) {
// t.Fatalf("Unexpected error response: %+v", err)
// }

// client.Close() // close before leak check

// checkLeaks()
// })
}

func dump(i interface{}) {
Expand Down

0 comments on commit ca53772

Please sign in to comment.