Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Add deliverytag to Message type (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
lawrencegripper authored and vcabbage committed Sep 7, 2018
1 parent 516d330 commit 271d628
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
17 changes: 13 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ func (s *Sender) Send(ctx context.Context, msg *Message) error {
// send is separated from Send so that the mutex unlock can be deferred without
// locking the transfer confirmation that happens in Send.
func (s *Sender) send(ctx context.Context, msg *Message) (chan deliveryState, error) {
if len(msg.DeliveryTag) > maxDeliveryTagLength {
return nil, errorErrorf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag))
}

s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -377,10 +381,13 @@ func (s *Sender) send(ctx context.Context, msg *Message) (chan deliveryState, er
deliveryID = atomic.AddUint32(&s.link.session.nextDeliveryID, 1)
)

// use uint64 encoded as []byte as deliveryTag
deliveryTag := make([]byte, 8)
binary.BigEndian.PutUint64(deliveryTag, s.nextDeliveryTag)
s.nextDeliveryTag++
deliveryTag := msg.DeliveryTag
if len(deliveryTag) == 0 {
// use uint64 encoded as []byte as deliveryTag
deliveryTag = make([]byte, 8)
binary.BigEndian.PutUint64(deliveryTag, s.nextDeliveryTag)
s.nextDeliveryTag++
}

fr := performTransfer{
Handle: s.link.handle,
Expand Down Expand Up @@ -1086,6 +1093,8 @@ func (l *link) muxReceive(fr performTransfer) error {
if fr.MessageFormat != nil {
l.msg.Format = *fr.MessageFormat
}

l.msg.DeliveryTag = fr.DeliveryTag
}

// ensure maxMessageSize will not be exceeded
Expand Down
33 changes: 28 additions & 5 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestIntegrationRoundTrip(t *testing.T) {
defer testClose(t, sender.Close)

for i, data := range tt.data {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
err = sender.Send(ctx, amqp.NewMessage([]byte(data)))
cancel()
if err != nil {
Expand Down Expand Up @@ -163,6 +163,16 @@ func TestIntegrationRoundTrip(t *testing.T) {
return
}

if msg.DeliveryTag == nil {
receiveErr = fmt.Errorf("Error after %d receives: nil deliverytag received", i)
return
}

if msg.DeliveryTag != nil && len(msg.DeliveryTag) != 16 {
receiveErr = fmt.Errorf("Error after %d receives: deliverytag should be 16 length byte array representing a UUID. Got: %v", i, len(msg.DeliveryTag))
return
}

// Simulate processing after receiving. (This has revealed flow control bugs.)
time.Sleep(10 * time.Millisecond)

Expand Down Expand Up @@ -449,8 +459,9 @@ func TestIntegrationSend(t *testing.T) {
defer cleanup()

tests := []struct {
label string
data []string
label string
data []string
deliveryTag []byte
}{
{
label: "3 send, small payload",
Expand All @@ -459,6 +470,14 @@ func TestIntegrationSend(t *testing.T) {
"2Hi there!",
"2Ho there!",
},
deliveryTag: nil,
},
{
label: "1 send, deliverytagset",
data: []string{
"2Hey there - with tag!",
},
deliveryTag: []byte("37c4acb3"),
},
}

Expand Down Expand Up @@ -488,7 +507,11 @@ func TestIntegrationSend(t *testing.T) {

for i, data := range tt.data {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = sender.Send(ctx, amqp.NewMessage([]byte(data)))
msg := amqp.NewMessage([]byte(data))
if tt.deliveryTag != nil {
msg.DeliveryTag = tt.deliveryTag
}
err = sender.Send(ctx, msg)
cancel()
if err != nil {
t.Fatalf("Error after %d sends: %+v", i, err)
Expand All @@ -501,7 +524,7 @@ func TestIntegrationSend(t *testing.T) {
checkLeaks() // this is done here because queuesClient starts additional goroutines

// Wait for Azure to update stats
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)

q, err := queuesClient.Get(context.Background(), resourceGroup, namespace, queueName)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,8 @@ func (c *performClose) String() string {
return fmt.Sprintf("*performClose{Error: %s}", c.Error)
}

const maxDeliveryTagLength = 32

// Message is an AMQP message.
type Message struct {
// Message format code.
Expand All @@ -1644,6 +1646,9 @@ type Message struct {
// given version of a format is forwards compatible with all higher versions.
Format uint32

// The DeliveryTag can be up to 32 octets of binary data.
DeliveryTag []byte

// The header section carries standard delivery details about the transfer
// of a message through the AMQP network.
Header *MessageHeader
Expand Down

0 comments on commit 271d628

Please sign in to comment.