diff --git a/autopaho/auto.go b/autopaho/auto.go index aca176f..9c51709 100644 --- a/autopaho/auto.go +++ b/autopaho/auto.go @@ -520,24 +520,27 @@ connectionLoop: // and then return (at this point any pub1+ publish will be in the session so will be retried) c.debug.Printf("publishing message from queue with topic %s", pub2.Topic) if _, err = cli.PublishWithOptions(ctx, &pub2, paho.PublishOptions{Method: paho.PublishMethod_AsyncSend}); err != nil { + c.errors.Printf("error publishing from queue: %s", err) if errors.Is(err, paho.ErrNetworkErrorAfterStored) { // Message in session so remove from queue - if rErr := entry.Remove(); rErr != nil { - c.errors.Printf("error removing queue entry: %s", rErr) + if err := entry.Remove(); err != nil { + c.errors.Printf("error removing queue entry: %s", err) } } else { if err := entry.Leave(); err != nil { // the message was not sent, so leave it in the queue c.errors.Printf("error leaving queue entry: %s", err) } } - c.errors.Printf("error publishing from queue: %s", err) - // Wait for connection to drop before continuing (small delay before the client processes this) + // The error might be fatal (connection will drop) or could be temporary (i.e. PacketTimeout exceeded) + // as a result we currently retry unless we know the connection has dropped, or it's time to exit select { case <-ctx.Done(): return ctx.Err() case <-connDown: + continue connectionLoop + default: // retry + continue } - continue connectionLoop } if err := entry.Remove(); err != nil { // successfully published c.errors.Printf("error removing queue entry: %s", err) diff --git a/autopaho/queue_test.go b/autopaho/queue_test.go index 7031c99..aa87e52 100644 --- a/autopaho/queue_test.go +++ b/autopaho/queue_test.go @@ -4,12 +4,15 @@ import ( "bytes" "context" "fmt" + "io" "net" "net/url" + "strconv" "sync/atomic" "testing" "time" + "github.com/eclipse/paho.golang/autopaho/queue/memory" "github.com/eclipse/paho.golang/internal/testserver" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" @@ -192,3 +195,135 @@ func TestQueuedMessages(t *testing.T) { } } } + +// TestPreloadPublish begin connection with PUBLISH packets in queue and a slow server +// Replicates issue #196 - this was caused by the wait for receive maximum slot taking +// longer than paho.PacketTimeout +func TestPreloadPublish(t *testing.T) { + t.Parallel() + + // Bring up server + server, _ := url.Parse(dummyURL) + serverLogger := paholog.NewTestLogger(t, "testServer:") + logger := paholog.NewTestLogger(t, "test:") + + ts := testserver.New(serverLogger) + + var publishReceived int32 + gotMessage := make(map[string]bool) + got5Messages := make(chan struct{}) + + ts.SetPacketReceivedCallback(func(cp *packets.ControlPacket) error { + if cp.Type != packets.PUBLISH { + return nil // Ignore packets other than PUBLISH + } + pub := cp.Content.(*packets.Publish) + gotMessage[pub.Topic] = true + if len(gotMessage) == 5 { + gotMessage["z"] = true // stop the above from being true if called again + close(got5Messages) + } + + if publishReceived == 0 { // test server process in one go routine, so this will block all initial PUBLISH requests + time.Sleep(shortDelay) // delay ack + } + publishReceived++ + return nil + }) + ts.SetConnectCallback(func(cp *packets.Connect, cap *packets.Connack) { + rm := uint16(2) + cap.Properties.ReceiveMaximum = &rm + }) + + q := memory.New() + for i := 0; i < 5; i++ { + r, w := io.Pipe() + + go func() { + publish := packets.Publish{ + Topic: strconv.Itoa(i), + Payload: []byte("packet: " + strconv.Itoa(i)), + QoS: 1, + } + _, _ = publish.WriteTo(w) + w.Close() + }() + + if err := q.Enqueue(r); err != nil { + t.Fatalf("failed to enqueue: %s", err) + } + } + + // custom session because we don't want the client to close it when the connection is lost + var tsDone chan struct{} // Set on AttemptConnection and closed when that test server connection is done + session := state.NewInMemory() + session.SetErrorLogger(paholog.NewTestLogger(t, "sessionError:")) + session.SetDebugLogger(paholog.NewTestLogger(t, "sessionDebug:")) + defer session.Close() + config := ClientConfig{ + ServerUrls: []*url.URL{server}, + KeepAlive: 0, + ConnectRetryDelay: shortDelay, // Retry connection very quickly! + ConnectTimeout: shortDelay, // Connection should come up very quickly + Queue: q, + AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) { + var conn net.Conn + var err error + conn, tsDone, err = ts.Connect(ctx) + return conn, err + }, + Debug: logger, + PahoDebug: logger, + PahoErrors: logger, + CleanStartOnInitialConnection: false, // Want session to stay up (this is the default) + SessionExpiryInterval: 600, // If 0 then the state will be removed when the connection drops + ClientConfig: paho.ClientConfig{ + ClientID: "test", + Session: session, + Router: paho.NewStandardRouter(), + PacketTimeout: 250 * time.Millisecond, // test server should be able to respond very quickly! + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + cm, err := NewConnection(ctx, config) + if err != nil { + t.Fatalf("expected NewConnection success: %s", err) + } + + // Wait for all messages to be received + select { + case <-got5Messages: + case <-time.After(5 * longerDelay): // Need a bit longer... + t.Fatalf("timeout awaiting messages (received %d)", publishReceived) + } + + // Disconnect + disconnectErr := make(chan error) + go func() { + disconnectErr <- cm.Disconnect(ctx) + }() + select { + case err = <-disconnectErr: + if err != nil { + t.Fatalf("Disconnect returned error: %s", err) + } + case <-time.After(longerDelay): + t.Fatal("Disconnect should return relatively quickly") + } + + // Connection manager should be Done + select { + case <-cm.Done(): + case <-time.After(shortDelay): + t.Fatal("connection manager should be done after Disconnect Called") + } + + // The test server should have picked up the dropped connection + select { + case <-tsDone: + case <-time.After(shortDelay): + t.Fatal("test server did not shutdown within expected time") + } +} diff --git a/internal/testserver/testserver.go b/internal/testserver/testserver.go index a9d722b..8806165 100644 --- a/internal/testserver/testserver.go +++ b/internal/testserver/testserver.go @@ -148,10 +148,10 @@ func (i *Instance) Connect(ctx context.Context) (net.Conn, chan struct{}, error) } i.connPktDone = false // Connection packet should be the first thing we receive after each connection - // Note: net.Pipe is synchronous; an async pipe would probably better simulate a real connection - // Consider using something like github.com/grpc/grpc-go/test/bufconn/bufconn.go - // Output is buffered which means that there is some asynchronicity - userCon, ourCon := net.Pipe() + userCon, ourCon, err := netPipe(ctx) + if err != nil { + return nil, nil, err + } // Ensure both connections support thread safe writes userCon = packets.NewThreadSafeConn(userCon) @@ -587,3 +587,28 @@ func (m *MIDs) Free(i uint16) { func (m *MIDs) Clear() { m.index = make([]bool, int(midMax)) } + +// netPipe simulates a network link using a real connection. +// This is used over net.Pipe because net.Pipe is synchronous and this can create confusing results +// because it does not work like a real network connection (a call to `Write` will block until the other +// end calls `Read` whereas with a real connection there are buffers etc). +// There are many ways to do this, but using a real connection is simple and effective! +func netPipe(ctx context.Context) (net.Conn, net.Conn, error) { + var lc net.ListenConfig + l, err := lc.Listen(ctx, "tcp", "127.0.0.1:0") // Port 0 is wildcard port; OS will choose port for us + if err != nil { + return nil, nil, err + } + defer l.Close() + var d net.Dialer + userCon, err := d.DialContext(ctx, "tcp", l.Addr().String()) // Dial the port we just listened on + if err != nil { + return nil, nil, err + } + ourCon, err := l.Accept() // Should return immediately + if err != nil { + userCon.Close() + return nil, nil, err + } + return userCon, ourCon, nil +} diff --git a/paho/session/state/state.go b/paho/session/state/state.go index c5c5272..6ecd6a6 100644 --- a/paho/session/state/state.go +++ b/paho/session/state/state.go @@ -396,7 +396,7 @@ func (s *State) endClientGenerated(packetID uint16, recv *packets.ControlPacket) // Outgoing publish messages will be in the store (replaced with PUBREL that is sent) if cg.packetType == packets.PUBLISH || cg.packetType == packets.PUBREL { if qErr := s.inflight.Release(); qErr != nil { - s.errors.Printf("quota release due to %d: %s", recv.PacketType(), qErr) + s.errors.Printf("quota release due to %s: %s", recv.PacketType(), qErr) } if err := s.clientStore.Delete(packetID); err != nil { s.errors.Printf("failed to remove message %d from store: %s", packetID, err)