Skip to content

Commit

Permalink
Change autopaho to use backoff strategy instead of configurable value
Browse files Browse the repository at this point in the history
  • Loading branch information
ViToni committed Apr 18, 2024
1 parent dad6ecc commit 84be3e1
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 51 deletions.
19 changes: 14 additions & 5 deletions autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ type ClientConfig struct {
CleanStartOnInitialConnection bool // Clean Start flag, if true, existing session information will be cleared on the first connection (it will be false for subsequent connections)
SessionExpiryInterval uint32 // Session Expiry Interval in seconds (if 0 the Session ends when the Network Connection is closed)

ConnectRetryDelay time.Duration // How long to wait between connection attempts (defaults to 10s)
ConnectTimeout time.Duration // How long to wait for the connection process to complete (defaults to 10s)
WebSocketCfg *WebSocketConfig // Enables customisation of the websocket connection
// Deprecated: ConnectRetryDelay is deprecated and its functionality is replaced by ReconnectBackoffStrategy.
ConnectRetryDelay time.Duration // How long to wait between connection attempts (defaults to 10s)
ReconnectBackoffStrategy BackoffStrategy // How long to wait between connection attempts (defaults to 10s)
ConnectTimeout time.Duration // How long to wait for the connection process to complete (defaults to 10s)
WebSocketCfg *WebSocketConfig // Enables customisation of the websocket connection

Queue queue.Queue // Used to queue up publish messages (if nil an error will be returned if publish could not be transmitted)

Expand Down Expand Up @@ -242,8 +244,15 @@ func NewConnection(ctx context.Context, cfg ClientConfig) (*ConnectionManager, e
if cfg.Errors == nil {
cfg.Errors = log.NOOPLogger{}
}
if cfg.ConnectRetryDelay == 0 {
cfg.ConnectRetryDelay = 10 * time.Second
if cfg.ReconnectBackoffStrategy == nil {
// for backwards compatibility we check for ConnectRetryDelay first
// before using the default constant backoff strategy (which behaves
// identically to the previous behaviour)
if cfg.ConnectRetryDelay == 0 {
cfg.ReconnectBackoffStrategy = NewConstantBackoffStrategy(10 * time.Second)
} else {
cfg.ReconnectBackoffStrategy = NewConstantBackoffStrategy(cfg.ConnectRetryDelay)
}
}
if cfg.ConnectTimeout == 0 {
cfg.ConnectTimeout = 10 * time.Second
Expand Down
40 changes: 20 additions & 20 deletions autopaho/auto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ func TestDisconnect(t *testing.T) {

errCh := make(chan error, 2)
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoffStrategy: NewConstantBackoffStrategy(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
ctx, cancel := context.WithCancel(ctx)
conn, done, err := ts.Connect(ctx)
Expand Down Expand Up @@ -186,10 +186,10 @@ func TestReconnect(t *testing.T) {
pinger.SetDebug(paholog.NewTestLogger(t, "pinger:"))

config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoffStrategy: NewConstantBackoffStrategy(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
atCount += 1
if atCount == 2 { // fail on the initial reconnection attempt to exercise retry functionality
Expand Down Expand Up @@ -299,10 +299,10 @@ func TestBasicPubSub(t *testing.T) {
atCount := 0

config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoffStrategy: NewConstantBackoffStrategy(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
atCount += 1
if atCount > 1 { // force failure if a reconnection is attempted (the connection should not drop in this test)
Expand Down Expand Up @@ -444,10 +444,10 @@ func TestAuthenticate(t *testing.T) {
atCount := 0

config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoffStrategy: NewConstantBackoffStrategy(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
atCount += 1
if atCount == 2 { // fail on the initial reconnection attempt to exercise retry functionality
Expand Down Expand Up @@ -542,7 +542,7 @@ func TestClientConfig_buildConnectPacket(t *testing.T) {
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 5,
ConnectRetryDelay: 5 * time.Second,
ReconnectBackoffStrategy: NewConstantBackoffStrategy(5 * time.Second),
ConnectTimeout: 5 * time.Second,
CleanStartOnInitialConnection: true, // Should set Clean Start flag on first connection attempt
// extends the lower-level paho.ClientConfig
Expand Down Expand Up @@ -627,9 +627,9 @@ func TestClientConfig_buildConnectPacket(t *testing.T) {
func ExampleClientConfig_ConnectPacketBuilder() {
serverURL, _ := url.Parse("mqtt://mqtt_user:mqtt_pass@127.0.0.1:1883")
config := ClientConfig{
ServerUrls: []*url.URL{serverURL},
ConnectRetryDelay: 5 * time.Second,
ConnectTimeout: 5 * time.Second,
ServerUrls: []*url.URL{serverURL},
ReconnectBackoffStrategy: NewConstantBackoffStrategy(5 * time.Second),
ConnectTimeout: 5 * time.Second,
ClientConfig: paho.ClientConfig{
ClientID: "test",
},
Expand Down
2 changes: 1 addition & 1 deletion autopaho/examples/docker/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func main() {
KeepAlive: cfg.keepAlive,
CleanStartOnInitialConnection: false, // the default
SessionExpiryInterval: 60, // Session remains live 60 seconds after disconnect
ConnectRetryDelay: cfg.connectRetryDelay,
ReconnectBackoffStrategy: autopaho.NewConstantBackoffStrategy(cfg.connectRetryDelay),
OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) { fmt.Println("mqtt connection up") },
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
Debug: log.NOOPLogger{},
Expand Down
2 changes: 1 addition & 1 deletion autopaho/examples/docker/subscriber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {
KeepAlive: cfg.keepAlive,
CleanStartOnInitialConnection: false, // the default
SessionExpiryInterval: 60, // Session remains live 60 seconds after disconnect
ConnectRetryDelay: cfg.connectRetryDelay,
ReconnectBackoffStrategy: autopaho.NewConstantBackoffStrategy(cfg.connectRetryDelay),
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
fmt.Println("mqtt connection up")
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
Expand Down
10 changes: 5 additions & 5 deletions autopaho/examples/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ func main() {
}

genericCfg := autopaho.ClientConfig{
ServerUrls: []*url.URL{serverUrl},
KeepAlive: 30,
ConnectRetryDelay: 2 * time.Second,
ConnectTimeout: 5 * time.Second,
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
ServerUrls: []*url.URL{serverUrl},
KeepAlive: 30,
ReconnectBackoffStrategy: autopaho.NewConstantBackoffStrategy(2 * time.Second),
ConnectTimeout: 5 * time.Second,
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
ClientConfig: paho.ClientConfig{
OnClientError: func(err error) { fmt.Printf("requested disconnect: %s\n", err) },
OnServerDisconnect: func(d *paho.Disconnect) {
Expand Down
3 changes: 2 additions & 1 deletion autopaho/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func establishServerConnection(ctx context.Context, cfg ClientConfig, firstConne
// Note: We do not touch b.cli in order to avoid adding thread safety issues.
var err error

backoff := cfg.ReconnectBackoffStrategy.Backoff()
for {
for _, u := range cfg.ServerUrls {
connectionCtx, cancelConnCtx := context.WithTimeout(ctx, cfg.ConnectTimeout)
Expand Down Expand Up @@ -106,7 +107,7 @@ func establishServerConnection(ctx context.Context, cfg ClientConfig, firstConne

// Delay before attempting another connection
select {
case <-time.After(cfg.ConnectRetryDelay):
case <-time.After(backoff.Next()):
case <-ctx.Done():
return nil, nil
}
Expand Down
16 changes: 8 additions & 8 deletions autopaho/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ func TestDisconnectAfterOutgoingPublish(t *testing.T) {
defer session.Close()
connectCount := 0
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoffStrategy: NewConstantBackoffStrategy(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
ctx, cancel := context.WithCancel(ctx) // Note: go vet warning is invalid
conn, done, err := ts.Connect(ctx)
Expand Down Expand Up @@ -240,10 +240,10 @@ func TestQueueResume(t *testing.T) {
defer session.Close()
connectCount := 0
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoffStrategy: NewConstantBackoffStrategy(time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
ctx, cancel := context.WithCancel(ctx) // Note: go vet warning is invalid
conn, done, err := ts.Connect(ctx)
Expand Down
20 changes: 10 additions & 10 deletions autopaho/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ func TestQueuedMessages(t *testing.T) {

connectCount := 0
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: 500 * time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
Queue: q,
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ReconnectBackoffStrategy: NewConstantBackoffStrategy(500 * time.Millisecond), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
Queue: q,
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
if !allowConnection.Load() {
return nil, fmt.Errorf("some random error")
Expand Down Expand Up @@ -315,11 +315,11 @@ func TestPreloadPublish(t *testing.T) {
session.SetDebugLogger(paholog.NewTestLogger(t, "sessionDebug:"))
defer session.Close()
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 0,
ConnectRetryDelay: shortDelay, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
Queue: q,
ServerUrls: []*url.URL{server},
KeepAlive: 0,
ReconnectBackoffStrategy: NewConstantBackoffStrategy(shortDelay), // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
Queue: q,
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
var conn net.Conn
var err error
Expand Down

0 comments on commit 84be3e1

Please sign in to comment.