Skip to content

Commit

Permalink
autopaho may stop publishing from queue if a publish times out
Browse files Browse the repository at this point in the history
This could lead to the client stalling with no messages going out.
  • Loading branch information
MattBrittan authored Nov 14, 2023
2 parents da39978 + 699fa78 commit a8b46ef
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 10 deletions.
13 changes: 8 additions & 5 deletions autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,24 +520,27 @@ connectionLoop:
// and then return (at this point any pub1+ publish will be in the session so will be retried)
c.debug.Printf("publishing message from queue with topic %s", pub2.Topic)
if _, err = cli.PublishWithOptions(ctx, &pub2, paho.PublishOptions{Method: paho.PublishMethod_AsyncSend}); err != nil {
c.errors.Printf("error publishing from queue: %s", err)
if errors.Is(err, paho.ErrNetworkErrorAfterStored) { // Message in session so remove from queue
if rErr := entry.Remove(); rErr != nil {
c.errors.Printf("error removing queue entry: %s", rErr)
if err := entry.Remove(); err != nil {
c.errors.Printf("error removing queue entry: %s", err)
}
} else {
if err := entry.Leave(); err != nil { // the message was not sent, so leave it in the queue
c.errors.Printf("error leaving queue entry: %s", err)
}
}
c.errors.Printf("error publishing from queue: %s", err)

// Wait for connection to drop before continuing (small delay before the client processes this)
// The error might be fatal (connection will drop) or could be temporary (i.e. PacketTimeout exceeded)
// as a result we currently retry unless we know the connection has dropped, or it's time to exit
select {
case <-ctx.Done():
return ctx.Err()
case <-connDown:
continue connectionLoop
default: // retry
continue
}
continue connectionLoop
}
if err := entry.Remove(); err != nil { // successfully published
c.errors.Printf("error removing queue entry: %s", err)
Expand Down
135 changes: 135 additions & 0 deletions autopaho/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/eclipse/paho.golang/autopaho/queue/memory"
"github.com/eclipse/paho.golang/internal/testserver"
"github.com/eclipse/paho.golang/packets"
"github.com/eclipse/paho.golang/paho"
Expand Down Expand Up @@ -192,3 +195,135 @@ func TestQueuedMessages(t *testing.T) {
}
}
}

// TestPreloadPublish starts a connection with PUBLISH packets already in the queue and a slow server.
// Replicates issue #196 - this was caused by the wait for a receive maximum slot taking
// longer than paho.PacketTimeout
func TestPreloadPublish(t *testing.T) {
	t.Parallel()

	// Bring up server
	server, _ := url.Parse(dummyURL)
	serverLogger := paholog.NewTestLogger(t, "testServer:")
	logger := paholog.NewTestLogger(t, "test:")

	ts := testserver.New(serverLogger)

	// publishReceived is updated inside the server callback and read on the timeout path of this
	// test, so every access goes through sync/atomic to keep the test clean under -race.
	var publishReceived int32
	gotMessage := make(map[string]bool)
	got5Messages := make(chan struct{})

	ts.SetPacketReceivedCallback(func(cp *packets.ControlPacket) error {
		if cp.Type != packets.PUBLISH {
			return nil // Ignore packets other than PUBLISH
		}
		pub := cp.Content.(*packets.Publish)
		gotMessage[pub.Topic] = true
		if len(gotMessage) == 5 {
			gotMessage["z"] = true // stop the above from being true if called again
			close(got5Messages)
		}

		if atomic.LoadInt32(&publishReceived) == 0 { // test server process in one go routine, so this will block all initial PUBLISH requests
			time.Sleep(shortDelay) // delay ack
		}
		atomic.AddInt32(&publishReceived, 1)
		return nil
	})
	ts.SetConnectCallback(func(cp *packets.Connect, cap *packets.Connack) {
		// A small receive maximum means queued publishes must wait for a free slot
		rm := uint16(2)
		cap.Properties.ReceiveMaximum = &rm
	})

	// Preload the queue with five QoS 1 publish packets (topics "0" to "4")
	q := memory.New()
	for i := 0; i < 5; i++ {
		r, w := io.Pipe()

		// The index is passed as an argument so the goroutine never observes a later value of
		// the loop variable (relevant when building with Go versions before 1.22).
		go func(n int) {
			publish := packets.Publish{
				Topic:   strconv.Itoa(n),
				Payload: []byte("packet: " + strconv.Itoa(n)),
				QoS:     1,
			}
			// Hand any write failure to the reader; a nil error is a normal close (reader sees EOF).
			_, wErr := publish.WriteTo(w)
			_ = w.CloseWithError(wErr)
		}(i)

		if err := q.Enqueue(r); err != nil {
			t.Fatalf("failed to enqueue: %s", err)
		}
	}

	// custom session because we don't want the client to close it when the connection is lost
	var tsDone chan struct{} // Set on AttemptConnection and closed when that test server connection is done
	session := state.NewInMemory()
	session.SetErrorLogger(paholog.NewTestLogger(t, "sessionError:"))
	session.SetDebugLogger(paholog.NewTestLogger(t, "sessionDebug:"))
	defer session.Close()
	config := ClientConfig{
		ServerUrls:        []*url.URL{server},
		KeepAlive:         0,
		ConnectRetryDelay: shortDelay, // Retry connection very quickly!
		ConnectTimeout:    shortDelay, // Connection should come up very quickly
		Queue:             q,
		AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
			var conn net.Conn
			var err error
			conn, tsDone, err = ts.Connect(ctx)
			return conn, err
		},
		Debug:                         logger,
		PahoDebug:                     logger,
		PahoErrors:                    logger,
		CleanStartOnInitialConnection: false, // Want session to stay up (this is the default)
		SessionExpiryInterval:         600,   // If 0 then the state will be removed when the connection drops
		ClientConfig: paho.ClientConfig{
			ClientID:      "test",
			Session:       session,
			Router:        paho.NewStandardRouter(),
			PacketTimeout: 250 * time.Millisecond, // test server should be able to respond very quickly!
		},
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	cm, err := NewConnection(ctx, config)
	if err != nil {
		t.Fatalf("expected NewConnection success: %s", err)
	}

	// Wait for all messages to be received
	select {
	case <-got5Messages:
	case <-time.After(5 * longerDelay): // Need a bit longer...
		t.Fatalf("timeout awaiting messages (received %d)", atomic.LoadInt32(&publishReceived))
	}

	// Disconnect (buffered so the goroutine can always deliver its result and exit, even if we time out)
	disconnectErr := make(chan error, 1)
	go func() {
		disconnectErr <- cm.Disconnect(ctx)
	}()
	select {
	case err = <-disconnectErr:
		if err != nil {
			t.Fatalf("Disconnect returned error: %s", err)
		}
	case <-time.After(longerDelay):
		t.Fatal("Disconnect should return relatively quickly")
	}

	// Connection manager should be Done
	select {
	case <-cm.Done():
	case <-time.After(shortDelay):
		t.Fatal("connection manager should be done after Disconnect Called")
	}

	// The test server should have picked up the dropped connection
	select {
	case <-tsDone:
	case <-time.After(shortDelay):
		t.Fatal("test server did not shutdown within expected time")
	}
}
33 changes: 29 additions & 4 deletions internal/testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ func (i *Instance) Connect(ctx context.Context) (net.Conn, chan struct{}, error)
}
i.connPktDone = false // Connection packet should be the first thing we receive after each connection

// Note: net.Pipe is synchronous; an async pipe would probably better simulate a real connection
// Consider using something like github.com/grpc/grpc-go/test/bufconn/bufconn.go
// Output is buffered which means that there is some asynchronicity
userCon, ourCon := net.Pipe()
userCon, ourCon, err := netPipe(ctx)
if err != nil {
return nil, nil, err
}

// Ensure both connections support thread safe writes
userCon = packets.NewThreadSafeConn(userCon)
Expand Down Expand Up @@ -587,3 +587,28 @@ func (m *MIDs) Free(i uint16) {
func (m *MIDs) Clear() {
	// Replace the index with a newly allocated (all false) slice holding midMax entries.
	fresh := make([]bool, int(midMax))
	m.index = fresh
}

// netPipe returns the two ends of a real TCP connection established over the loopback interface.
// It stands in for net.Pipe, which is synchronous (a `Write` blocks until the peer calls `Read`) and
// therefore behaves differently from a genuine, buffered network link; that difference can produce
// confusing results. The first returned connection is the dialling side, the second is the accepted
// side. On failure both connections are nil and the error is returned.
func netPipe(ctx context.Context) (net.Conn, net.Conn, error) {
	// Port 0 is the wildcard port, so the OS selects a free one for us.
	listener, err := (&net.ListenConfig{}).Listen(ctx, "tcp", "127.0.0.1:0")
	if err != nil {
		return nil, nil, err
	}
	// The listener is only needed while the pair is being established.
	defer listener.Close()

	// Connect to whichever address the listener ended up bound to.
	dialer := new(net.Dialer)
	dialSide, err := dialer.DialContext(ctx, "tcp", listener.Addr().String())
	if err != nil {
		return nil, nil, err
	}

	// The dial has already completed, so Accept should return immediately.
	acceptSide, err := listener.Accept()
	if err != nil {
		dialSide.Close() // do not leak the half we already opened
		return nil, nil, err
	}
	return dialSide, acceptSide, nil
}
2 changes: 1 addition & 1 deletion paho/session/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (s *State) endClientGenerated(packetID uint16, recv *packets.ControlPacket)
// Outgoing publish messages will be in the store (replaced with PUBREL that is sent)
if cg.packetType == packets.PUBLISH || cg.packetType == packets.PUBREL {
if qErr := s.inflight.Release(); qErr != nil {
s.errors.Printf("quota release due to %d: %s", recv.PacketType(), qErr)
s.errors.Printf("quota release due to %s: %s", recv.PacketType(), qErr)
}
if err := s.clientStore.Delete(packetID); err != nil {
s.errors.Printf("failed to remove message %d from store: %s", packetID, err)
Expand Down

0 comments on commit a8b46ef

Please sign in to comment.