Skip to content

Commit

Permalink
Merge pull request #226 from ChIoT-Tech/Issue214
Browse files Browse the repository at this point in the history
Provide a way to identify operations that should not be retried
  • Loading branch information
MattBrittan authored Jan 13, 2024
2 parents 69e5719 + 753d446 commit 2aef8db
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 8 deletions.
7 changes: 6 additions & 1 deletion autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,12 @@ connectionLoop:
c.debug.Printf("publishing message from queue with topic %s", pub2.Topic)
if _, err = cli.PublishWithOptions(ctx, &pub2, paho.PublishOptions{Method: paho.PublishMethod_AsyncSend}); err != nil {
c.errors.Printf("error publishing from queue: %s", err)
if errors.Is(err, paho.ErrNetworkErrorAfterStored) { // Message in session so remove from queue
if errors.Is(err, paho.ErrInvalidArguments) { // Some errors should not be retried
if err := entry.Remove(); err != nil {
c.errors.Printf("error removing queue entry: %s", err)
}
// Need a way to notify the user of this
} else if errors.Is(err, paho.ErrNetworkErrorAfterStored) { // Message in session so remove from queue
if err := entry.Remove(); err != nil {
c.errors.Printf("error removing queue entry: %s", err)
}
Expand Down
20 changes: 20 additions & 0 deletions autopaho/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package autopaho
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
Expand All @@ -27,6 +28,7 @@ import (
"testing"
"time"

"github.com/eclipse/paho.golang/autopaho/queue"
memqueue "github.com/eclipse/paho.golang/autopaho/queue/memory"
"github.com/eclipse/paho.golang/internal/testserver"
"github.com/eclipse/paho.golang/packets"
Expand Down Expand Up @@ -145,7 +147,20 @@ func TestQueuedMessages(t *testing.T) {
if err != nil {
t.Fatalf("expected NewConnection success: %s", err)
}

testFmt := "Test%d"
// Start with an invalid message; this will be queued but then rejected by paho.PublishWithOptions, and it's
// important that it's discarded (otherwise it would be retried continually) - issue #214
if err = cm.PublishViaQueue(ctx, &QueuePublish{
Publish: &paho.Publish{
QoS: 3,
Topic: fmt.Sprintf(testFmt, 0),
Properties: nil,
Payload: []byte(fmt.Sprintf(testFmt, 0)),
},
}); err != nil {
t.Fatalf("invalid publish failed")
}

// Transmit first 100 messages (should go into queue)
for i := 1; i <= 100; i++ {
Expand Down Expand Up @@ -221,6 +236,11 @@ func TestQueuedMessages(t *testing.T) {
t.Fatal("test server did not shutdown within expected time")
}

// Check that the queue is empty
if _, err := cm.queue.Peek(); !errors.Is(err, queue.ErrEmpty) {
t.Error("queue should be empty")
}

// Check that we received the expected messages, in the expected order
for i := 1; i <= 200; i++ {
exp := fmt.Sprintf(testFmt, i)
Expand Down
16 changes: 9 additions & 7 deletions paho/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ var (
ErrManualAcknowledgmentDisabled = errors.New("manual acknowledgments disabled")
ErrNetworkErrorAfterStored = errors.New("error after packet added to state") // Could not send packet but its stored (and response will be sent on chan at some point in the future)
ErrConnectionLost = errors.New("connection lost after request transmitted") // We don't know whether the server received the request or not

ErrInvalidArguments = errors.New("invalid argument") // If included (errors.Join) in an error, there is a problem with the arguments passed. Retrying on the same connection with the same arguments will not succeed.
)

type (
Expand Down Expand Up @@ -652,17 +654,17 @@ func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) {
for _, sub := range s.Subscriptions {
if strings.ContainsAny(sub.Topic, "#+") {
// Using a wildcard in a subscription when not supported
return nil, fmt.Errorf("cannot subscribe to %s, server does not support wildcards", sub.Topic)
return nil, fmt.Errorf("%w: cannot subscribe to %s, server does not support wildcards", ErrInvalidArguments, sub.Topic)
}
}
}
if !c.serverProps.SubIDAvailable && s.Properties != nil && s.Properties.SubscriptionIdentifier != nil {
return nil, fmt.Errorf("cannot send subscribe with subID set, server does not support subID")
return nil, fmt.Errorf("%w: cannot send subscribe with subID set, server does not support subID", ErrInvalidArguments)
}
if !c.serverProps.SharedSubAvailable {
for _, sub := range s.Subscriptions {
if strings.HasPrefix(sub.Topic, "$share") {
return nil, fmt.Errorf("cannont subscribe to %s, server does not support shared subscriptions", sub.Topic)
return nil, fmt.Errorf("%w: cannont subscribe to %s, server does not support shared subscriptions", ErrInvalidArguments, sub.Topic)
}
}
}
Expand Down Expand Up @@ -824,18 +826,18 @@ type PublishOptions struct {
// Warning: Publish may outlive the connection when QOS1+ (managed in `session_state`)
func (c *Client) PublishWithOptions(ctx context.Context, p *Publish, o PublishOptions) (*PublishResponse, error) {
if p.QoS > c.serverProps.MaximumQoS {
return nil, fmt.Errorf("cannot send Publish with QoS %d, server maximum QoS is %d", p.QoS, c.serverProps.MaximumQoS)
return nil, fmt.Errorf("%w: cannot send Publish with QoS %d, server maximum QoS is %d", ErrInvalidArguments, p.QoS, c.serverProps.MaximumQoS)
}
if p.Properties != nil && p.Properties.TopicAlias != nil {
if c.serverProps.TopicAliasMaximum > 0 && *p.Properties.TopicAlias > c.serverProps.TopicAliasMaximum {
return nil, fmt.Errorf("cannot send publish with TopicAlias %d, server topic alias maximum is %d", *p.Properties.TopicAlias, c.serverProps.TopicAliasMaximum)
return nil, fmt.Errorf("%w: cannot send publish with TopicAlias %d, server topic alias maximum is %d", ErrInvalidArguments, *p.Properties.TopicAlias, c.serverProps.TopicAliasMaximum)
}
}
if !c.serverProps.RetainAvailable && p.Retain {
return nil, fmt.Errorf("cannot send Publish with retain flag set, server does not support retained messages")
return nil, fmt.Errorf("%w: cannot send Publish with retain flag set, server does not support retained messages", ErrInvalidArguments)
}
if (p.Properties == nil || p.Properties.TopicAlias == nil) && p.Topic == "" {
return nil, fmt.Errorf("cannot send a publish with no TopicAlias and no Topic set")
return nil, fmt.Errorf("%w: cannot send a publish with no TopicAlias and no Topic set", ErrInvalidArguments)
}

if c.config.PublishHook != nil {
Expand Down

0 comments on commit 2aef8db

Please sign in to comment.