Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autopaho may stop publishing from queue if a publish times out #198

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,24 +520,27 @@ connectionLoop:
// and then return (at this point any pub1+ publish will be in the session so will be retried)
c.debug.Printf("publishing message from queue with topic %s", pub2.Topic)
if _, err = cli.PublishWithOptions(ctx, &pub2, paho.PublishOptions{Method: paho.PublishMethod_AsyncSend}); err != nil {
c.errors.Printf("error publishing from queue: %s", err)
if errors.Is(err, paho.ErrNetworkErrorAfterStored) { // Message in session so remove from queue
if rErr := entry.Remove(); rErr != nil {
c.errors.Printf("error removing queue entry: %s", rErr)
if err := entry.Remove(); err != nil {
c.errors.Printf("error removing queue entry: %s", err)
}
} else {
if err := entry.Leave(); err != nil { // the message was not sent, so leave it in the queue
c.errors.Printf("error leaving queue entry: %s", err)
}
}
c.errors.Printf("error publishing from queue: %s", err)

// Wait for connection to drop before continuing (small delay before the client processes this)
// The error might be fatal (connection will drop) or could be temporary (i.e. PacketTimeout exceeded)
// as a result we currently retry unless we know the connection has dropped, or it's time to exit
select {
case <-ctx.Done():
return ctx.Err()
case <-connDown:
continue connectionLoop
default: // retry
continue
}
continue connectionLoop
}
if err := entry.Remove(); err != nil { // successfully published
c.errors.Printf("error removing queue entry: %s", err)
Expand Down
135 changes: 135 additions & 0 deletions autopaho/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/eclipse/paho.golang/autopaho/queue/memory"
"github.com/eclipse/paho.golang/internal/testserver"
"github.com/eclipse/paho.golang/packets"
"github.com/eclipse/paho.golang/paho"
Expand Down Expand Up @@ -192,3 +195,135 @@ func TestQueuedMessages(t *testing.T) {
}
}
}

// TestPreloadPublish begin connection with PUBLISH packets in queue and a slow server
// Replicates issue #196 - this was caused by the wait for receive maximum slot taking
// longer than paho.PacketTimeout
func TestPreloadPublish(t *testing.T) {
t.Parallel()

// Bring up server
server, _ := url.Parse(dummyURL)
serverLogger := paholog.NewTestLogger(t, "testServer:")
logger := paholog.NewTestLogger(t, "test:")

ts := testserver.New(serverLogger)

var publishReceived int32
gotMessage := make(map[string]bool)
got5Messages := make(chan struct{})

ts.SetPacketReceivedCallback(func(cp *packets.ControlPacket) error {
if cp.Type != packets.PUBLISH {
return nil // Ignore packets other than PUBLISH
}
pub := cp.Content.(*packets.Publish)
gotMessage[pub.Topic] = true
if len(gotMessage) == 5 {
gotMessage["z"] = true // stop the above from being true if called again
close(got5Messages)
}

if publishReceived == 0 { // test server process in one go routine, so this will block all initial PUBLISH requests
time.Sleep(shortDelay) // delay ack
}
publishReceived++
return nil
})
ts.SetConnectCallback(func(cp *packets.Connect, cap *packets.Connack) {
rm := uint16(2)
cap.Properties.ReceiveMaximum = &rm
})

q := memory.New()
for i := 0; i < 5; i++ {
r, w := io.Pipe()

go func() {
publish := packets.Publish{
Topic: strconv.Itoa(i),
Payload: []byte("packet: " + strconv.Itoa(i)),
QoS: 1,
}
_, _ = publish.WriteTo(w)
w.Close()
}()

if err := q.Enqueue(r); err != nil {
t.Fatalf("failed to enqueue: %s", err)
}
}

// custom session because we don't want the client to close it when the connection is lost
var tsDone chan struct{} // Set on AttemptConnection and closed when that test server connection is done
session := state.NewInMemory()
session.SetErrorLogger(paholog.NewTestLogger(t, "sessionError:"))
session.SetDebugLogger(paholog.NewTestLogger(t, "sessionDebug:"))
defer session.Close()
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 0,
ConnectRetryDelay: shortDelay, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
Queue: q,
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
var conn net.Conn
var err error
conn, tsDone, err = ts.Connect(ctx)
return conn, err
},
Debug: logger,
PahoDebug: logger,
PahoErrors: logger,
CleanStartOnInitialConnection: false, // Want session to stay up (this is the default)
SessionExpiryInterval: 600, // If 0 then the state will be removed when the connection drops
ClientConfig: paho.ClientConfig{
ClientID: "test",
Session: session,
Router: paho.NewStandardRouter(),
PacketTimeout: 250 * time.Millisecond, // test server should be able to respond very quickly!
},
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
cm, err := NewConnection(ctx, config)
if err != nil {
t.Fatalf("expected NewConnection success: %s", err)
}

// Wait for all messages to be received
select {
case <-got5Messages:
case <-time.After(5 * longerDelay): // Need a bit longer...
t.Fatalf("timeout awaiting messages (received %d)", publishReceived)
}

// Disconnect
disconnectErr := make(chan error)
go func() {
disconnectErr <- cm.Disconnect(ctx)
}()
select {
case err = <-disconnectErr:
if err != nil {
t.Fatalf("Disconnect returned error: %s", err)
}
case <-time.After(longerDelay):
t.Fatal("Disconnect should return relatively quickly")
}

// Connection manager should be Done
select {
case <-cm.Done():
case <-time.After(shortDelay):
t.Fatal("connection manager should be done after Disconnect Called")
}

// The test server should have picked up the dropped connection
select {
case <-tsDone:
case <-time.After(shortDelay):
t.Fatal("test server did not shutdown within expected time")
}
}
33 changes: 29 additions & 4 deletions internal/testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ func (i *Instance) Connect(ctx context.Context) (net.Conn, chan struct{}, error)
}
i.connPktDone = false // Connection packet should be the first thing we receive after each connection

// Note: net.Pipe is synchronous; an async pipe would probably better simulate a real connection
// Consider using something like github.com/grpc/grpc-go/test/bufconn/bufconn.go
// Output is buffered which means that there is some asynchronicity
userCon, ourCon := net.Pipe()
userCon, ourCon, err := netPipe(ctx)
if err != nil {
return nil, nil, err
}

// Ensure both connections support thread safe writes
userCon = packets.NewThreadSafeConn(userCon)
Expand Down Expand Up @@ -587,3 +587,28 @@ func (m *MIDs) Free(i uint16) {
func (m *MIDs) Clear() {
m.index = make([]bool, int(midMax))
}

// netPipe simulates a network link using a real connection.
// This is used over net.Pipe because net.Pipe is synchronous and this can create confusing results
// because it does not work like a real network connection (a call to `Write` will block until the other
// end calls `Read` whereas with a real connection there are buffers etc).
// There are many ways to do this, but using a real connection is simple and effective!
func netPipe(ctx context.Context) (net.Conn, net.Conn, error) {
var lc net.ListenConfig
l, err := lc.Listen(ctx, "tcp", "127.0.0.1:0") // Port 0 is wildcard port; OS will choose port for us
if err != nil {
return nil, nil, err
}
defer l.Close()
var d net.Dialer
userCon, err := d.DialContext(ctx, "tcp", l.Addr().String()) // Dial the port we just listened on
if err != nil {
return nil, nil, err
}
ourCon, err := l.Accept() // Should return immediately
if err != nil {
userCon.Close()
return nil, nil, err
}
return userCon, ourCon, nil
}
2 changes: 1 addition & 1 deletion paho/session/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (s *State) endClientGenerated(packetID uint16, recv *packets.ControlPacket)
// Outgoing publish messages will be in the store (replaced with PUBREL that is sent)
if cg.packetType == packets.PUBLISH || cg.packetType == packets.PUBREL {
if qErr := s.inflight.Release(); qErr != nil {
s.errors.Printf("quota release due to %d: %s", recv.PacketType(), qErr)
s.errors.Printf("quota release due to %s: %s", recv.PacketType(), qErr)
}
if err := s.clientStore.Delete(packetID); err != nil {
s.errors.Printf("failed to remove message %d from store: %s", packetID, err)
Expand Down