Skip to content

Commit

Permalink
Return an error when you try to send a message that's too large.
Browse files Browse the repository at this point in the history
This now works just like the message batch - you'll get an ErrMessageTooLarge
if you attempt to send a message that's too large for the link's configured
size.

NOTE: there's a patch to `internal/go-amqp/Sender.go` to match what's in go-amqp's
main so it returns a programmatically useful error when the message is too large.

Fixes Azure#20647
  • Loading branch information
richardpark-msft authored May 1, 2023
1 parent 9111616 commit aa8f10c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 6 deletions.
10 changes: 8 additions & 2 deletions sdk/messaging/azservicebus/internal/go-amqp/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
)
if len(msg.DeliveryTag) > maxDeliveryTagLength {
return nil, fmt.Errorf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag))
return nil, &Error{
Condition: ErrCondMessageSizeExceeded,
Description: fmt.Sprintf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag)),
}
}

s.mu.Lock()
Expand All @@ -114,7 +117,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
}

if s.l.maxMessageSize != 0 && uint64(s.buf.Len()) > s.l.maxMessageSize {
return nil, fmt.Errorf("encoded message size exceeds max of %d", s.l.maxMessageSize)
return nil, &Error{
Condition: ErrCondMessageSizeExceeded,
Description: fmt.Sprintf("encoded message size exceeds max of %d", s.l.maxMessageSize),
}
}

senderSettled := senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeSettled
Expand Down
3 changes: 2 additions & 1 deletion sdk/messaging/azservicebus/message_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

// ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.Add()
var ErrMessageTooLarge = errors.New("the message could not be added because it is too large for the batch")
// or if the message is being sent on its own and is too large for the link.
var ErrMessageTooLarge = errors.New("the message is too large")

type (
// MessageBatch represents a batch of messages to send to Service Bus in a single message
Expand Down
15 changes: 12 additions & 3 deletions sdk/messaging/azservicebus/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package azservicebus

import (
"context"
"errors"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
Expand Down Expand Up @@ -33,7 +34,7 @@ type MessageBatchOptions struct {
// NewMessageBatch can be used to create a batch that contain multiple
// messages. Sending a batch of messages is more efficient than sending the
// messages one at a time.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error) {
var batch *MessageBatch

Expand Down Expand Up @@ -61,7 +62,9 @@ type SendMessageOptions struct {
}

// SendMessage sends a Message to a queue or topic.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
// If the operation fails it can return:
// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size.
// - An [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) SendMessage(ctx context.Context, message *Message, options *SendMessageOptions) error {
return s.sendMessage(ctx, message)
}
Expand All @@ -74,7 +77,9 @@ type SendAMQPAnnotatedMessageOptions struct {
// SendAMQPAnnotatedMessage sends an AMQPMessage to a queue or topic.
// Using an AMQPMessage allows for advanced use cases, like payload encoding, as well as better
// interoperability with pure AMQP clients.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
// If the operation fails it can return:
// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size.
// - An [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) SendAMQPAnnotatedMessage(ctx context.Context, message *AMQPAnnotatedMessage, options *SendAMQPAnnotatedMessageOptions) error {
return s.sendMessage(ctx, message)
}
Expand Down Expand Up @@ -171,6 +176,10 @@ func (s *Sender) sendMessage(ctx context.Context, message amqpCompatibleMessage)
return lwid.Sender.Send(ctx, message.toAMQPMessage(), nil)
}, RetryOptions(s.retryOptions))

if amqpErr := (*amqp.Error)(nil); errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrCondMessageSizeExceeded {
return ErrMessageTooLarge
}

return internal.TransformError(err)
}

Expand Down
44 changes: 44 additions & 0 deletions sdk/messaging/azservicebus/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,3 +734,47 @@ func (rm receivedMessages) Less(i, j int) bool {
func (rm receivedMessages) Swap(i, j int) {
rm[i], rm[j] = rm[j], rm[i]
}

func Test_Sender_Send_MessageTooBig(t *testing.T) {
client, cleanup, queueName := setupLiveTest(t, &liveTestOptions{
ClientOptions: &ClientOptions{
RetryOptions: RetryOptions{
// This is a purposefully ridiculous wait time but we'll never hit it
// because exceeding the max message size is NOT a retryable error.
RetryDelay: time.Hour,
},
},
QueueProperties: &admin.QueueProperties{
EnablePartitioning: to.Ptr(true),
}})
defer cleanup()

sender, err := client.NewSender(queueName, nil)
require.NoError(t, err)

hugePayload := []byte{}

for i := 0; i < 1000*1000; i++ {
hugePayload = append(hugePayload, 100)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err = sender.SendMessage(ctx, &Message{
MessageID: to.Ptr("message with a message ID"),
Body: hugePayload,
}, nil)

require.ErrorIs(t, err, ErrMessageTooLarge)

ctx, cancel = context.WithTimeout(context.Background(), time.Minute)
defer cancel()

err = sender.SendAMQPAnnotatedMessage(ctx, &AMQPAnnotatedMessage{
Body: AMQPAnnotatedMessageBody{
Data: [][]byte{hugePayload},
},
}, nil)

require.ErrorIs(t, err, ErrMessageTooLarge)
}

0 comments on commit aa8f10c

Please sign in to comment.