From c6d8d523bc2bedf4c3da82190225fc8e51090ccd Mon Sep 17 00:00:00 2001 From: Aitor Perez Cedres Date: Tue, 7 May 2024 13:09:36 +0100 Subject: [PATCH 1/2] Fix data races in the example Some users rely on this example as a starting point to their applications. This commit fixes a data race that could cause issues in any code that relied on the example as base. Related to #72 Signed-off-by: Aitor Perez Cedres --- example_client_test.go | 65 +++++++++++++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 16 deletions(-) diff --git a/example_client_test.go b/example_client_test.go index 34c7fc3..a93c509 100644 --- a/example_client_test.go +++ b/example_client_test.go @@ -8,9 +8,9 @@ package amqp091_test import ( "context" "errors" - "fmt" "log" "os" + "sync" "time" amqp "github.com/rabbitmq/amqp091-go" @@ -23,9 +23,10 @@ import ( // It doesn't automatically ack each message, but leaves that // to the parent process, since it is usage-dependent. // -// Try running this in one terminal, and `rabbitmq-server` in another. +// Try running this in one terminal, and rabbitmq-server in another. +// // Stop & restart RabbitMQ to see how the queue reacts. -func Example() { +func Example_publish() { queueName := "job_queue" addr := "amqp://guest:guest@localhost:5672/" queue := New(queueName, addr) @@ -39,12 +40,14 @@ loop: // Attempt to push a message every 2 seconds case <-time.After(time.Second * 2): if err := queue.Push(message); err != nil { - fmt.Printf("Push failed: %s\n", err) + log.Printf("Push failed: %s\n", err) } else { - fmt.Println("Push succeeded!") + log.Println("Push succeeded!") } case <-ctx.Done(): - queue.Close() + if err := queue.Close(); err != nil { + log.Printf("Close failed: %s\n", err) + } break loop } } @@ -55,7 +58,7 @@ func Example_consume() { addr := "amqp://guest:guest@localhost:5672/" queue := New(queueName, addr) - // Give the connection sometime to setup + // Give the connection sometime to set up <-time.After(time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) @@ -63,7 +66,7 @@ func Example_consume() { deliveries, err := queue.Consume() if err != nil { - fmt.Printf("Could not start consuming: %s\n", err) + log.Printf("Could not start consuming: %s\n", err) return } @@ -78,19 +81,22 @@ func Example_consume() { for { select { case <-ctx.Done(): - queue.Close() + err := queue.Close() + if err != nil { + log.Printf("Close failed: %s\n", err) + } return case amqErr := <-chClosedCh: // This case handles the event of closed channel e.g. abnormal shutdown - fmt.Printf("AMQP Channel closed due to: %s\n", amqErr) + log.Printf("AMQP Channel closed due to: %s\n", amqErr) deliveries, err = queue.Consume() if err != nil { // If the AMQP channel is not ready, it will continue the loop. Next // iteration will enter this case because chClosedCh is closed by the // library - fmt.Println("Error trying to consume, will try again") + log.Println("Error trying to consume, will try again") continue } @@ -101,16 +107,21 @@ func Example_consume() { case delivery := <-deliveries: // Ack a message every 2 seconds - fmt.Printf("Received message: %s\n", delivery.Body) + log.Printf("Received message: %s\n", delivery.Body) if err := delivery.Ack(false); err != nil { - fmt.Printf("Error acknowledging message: %s\n", err) + log.Printf("Error acknowledging message: %s\n", err) } <-time.After(time.Second * 2) } } } +// Client is the base struct for handling connection recovery, consumption and +// publishing. Note that this struct has an internal mutex to safeguard against +// data races. As you develop and iterate over this example, you may need to add +// further locks, or safeguards, to keep your application safe from data races type Client struct { + m *sync.Mutex queueName string logger *log.Logger connection *amqp.Connection @@ -143,6 +154,7 @@ var ( // attempts to connect to the server. func New(queueName, addr string) *Client { client := Client{ + m: &sync.Mutex{}, logger: log.New(os.Stdout, "", log.LstdFlags), queueName: queueName, done: make(chan bool), @@ -155,7 +167,10 @@ func New(queueName, addr string) *Client { // notifyConnClose, and then continuously attempt to reconnect. func (client *Client) handleReconnect(addr string) { for { + client.m.Lock() client.isReady = false + client.m.Unlock() + client.logger.Println("Attempting to connect") conn, err := client.connect(addr) @@ -194,7 +209,9 @@ func (client *Client) connect(addr string) (*amqp.Connection, error) { // and then continuously attempt to re-initialize both channels func (client *Client) handleReInit(conn *amqp.Connection) bool { for { + client.m.Lock() client.isReady = false + client.m.Unlock() err := client.init(conn) @@ -251,7 +268,9 @@ func (client *Client) init(conn *amqp.Connection) error { } client.changeChannel(ch) + client.m.Lock() client.isReady = true + client.m.Unlock() client.logger.Println("Setup!") return nil @@ -275,13 +294,16 @@ func (client *Client) changeChannel(channel *amqp.Channel) { client.channel.NotifyPublish(client.notifyConfirm) } -// Push will push data onto the queue, and wait for a confirm. -// This will block until the server sends a confirm. Errors are +// Push will push data onto the queue, and wait for a confirmation. +// This will block until the server sends a confirmation. Errors are // only returned if the push action itself fails, see UnsafePush. func (client *Client) Push(data []byte) error { + client.m.Lock() if !client.isReady { + client.m.Unlock() return errors.New("failed to push: not connected") } + client.m.Unlock() for { err := client.UnsafePush(data) if err != nil { @@ -306,9 +328,12 @@ func (client *Client) Push(data []byte) error { // No guarantees are provided for whether the server will // receive the message. func (client *Client) UnsafePush(data []byte) error { + client.m.Lock() if !client.isReady { + client.m.Unlock() return errNotConnected } + client.m.Unlock() ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -331,13 +356,16 @@ func (client *Client) UnsafePush(data []byte) error { // successfully processed, or delivery.Nack when it fails. // Ignoring this will cause data to build up on the server. func (client *Client) Consume() (<-chan amqp.Delivery, error) { + client.m.Lock() if !client.isReady { + client.m.Unlock() return nil, errNotConnected } + client.m.Unlock() if err := client.channel.Qos( 1, // prefetchCount - 0, // prefrechSize + 0, // prefetchSize false, // global ); err != nil { return nil, err @@ -356,6 +384,11 @@ func (client *Client) Consume() (<-chan amqp.Delivery, error) { // Close will cleanly shut down the channel and connection. func (client *Client) Close() error { + client.m.Lock() + // we read and write isReady in two locations, so we grab the lock and hold onto + // it until we are finished + defer client.m.Unlock() + if !client.isReady { return errAlreadyClosed } From 441c55d04b793befb9cf84dd924687fbc3e94919 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 7 May 2024 11:20:21 -0700 Subject: [PATCH 2/2] Finish up example Client --- _examples/client/client.go | 433 +++++++++++++++++++++++++++++++++ _examples/consumer/consumer.go | 5 + 2 files changed, 438 insertions(+) create mode 100644 _examples/client/client.go diff --git a/_examples/client/client.go b/_examples/client/client.go new file mode 100644 index 0000000..58cf883 --- /dev/null +++ b/_examples/client/client.go @@ -0,0 +1,433 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +package main + +import ( + "context" + "errors" + "log" + "os" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// This exports a Client object that wraps this library. It +// automatically reconnects when the connection fails, and +// blocks all pushes until the connection succeeds. It also +// confirms every outgoing message, so none are lost. +// It doesn't automatically ack each message, but leaves that +// to the parent process, since it is usage-dependent. +// +// Try running this in one terminal, and rabbitmq-server in another. +// +// Stop & restart RabbitMQ to see how the queue reacts. +func publish(done chan struct{}) { + queueName := "job_queue" + addr := "amqp://guest:guest@localhost:5672/" + queue := New(queueName, addr) + message := []byte("message") + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*20)) + defer cancel() +loop: + for { + select { + // Attempt to push a message every 2 seconds + case <-time.After(time.Second * 2): + if err := queue.Push(message); err != nil { + queue.errlog.Printf("push failed: %s\n", err) + } else { + queue.infolog.Println("push succeeded") + } + case <-ctx.Done(): + if err := queue.Close(); err != nil { + queue.errlog.Printf("close failed: %s\n", err) + } + break loop + } + } + + close(done) +} + +func consume(done chan struct{}) { + queueName := "job_queue" + addr := "amqp://guest:guest@localhost:5672/" + queue := New(queueName, addr) + + // Give the connection sometime to set up + <-time.After(time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*25) + defer cancel() + + deliveries, err := queue.Consume() + if err != nil { + queue.errlog.Printf("could not start consuming: %s\n", err) + return + } + + // This channel will receive a notification when a channel closed event + // happens. This must be different from Client.notifyChanClose because the + // library sends only one notification and Client.notifyChanClose already has + // a receiver in handleReconnect(). + // Recommended to make it buffered to avoid deadlocks + chClosedCh := make(chan *amqp.Error, 1) + queue.channel.NotifyClose(chClosedCh) + +loop: + for { + select { + case <-ctx.Done(): + err := queue.Close() + if err != nil { + queue.errlog.Printf("close failed: %s\n", err) + } + break loop + + case amqErr := <-chClosedCh: + // This case handles the event of closed channel e.g. abnormal shutdown + queue.errlog.Printf("AMQP Channel closed due to: %s\n", amqErr) + + deliveries, err = queue.Consume() + if err != nil { + // If the AMQP channel is not ready, it will continue the loop. Next + // iteration will enter this case because chClosedCh is closed by the + // library + queue.errlog.Println("error trying to consume, will try again") + continue + } + + // Re-set channel to receive notifications + // The library closes this channel after abnormal shutdown + chClosedCh = make(chan *amqp.Error, 1) + queue.channel.NotifyClose(chClosedCh) + + case delivery := <-deliveries: + // Ack a message every 2 seconds + queue.infolog.Printf("received message: %s\n", delivery.Body) + if err := delivery.Ack(false); err != nil { + queue.errlog.Printf("error acknowledging message: %s\n", err) + } + <-time.After(time.Second * 2) + } + } + + close(done) +} + +// Client is the base struct for handling connection recovery, consumption and +// publishing. Note that this struct has an internal mutex to safeguard against +// data races. As you develop and iterate over this example, you may need to add +// further locks, or safeguards, to keep your application safe from data races +type Client struct { + m *sync.Mutex + queueName string + infolog *log.Logger + errlog *log.Logger + connection *amqp.Connection + channel *amqp.Channel + done chan bool + notifyConnClose chan *amqp.Error + notifyChanClose chan *amqp.Error + notifyConfirm chan amqp.Confirmation + isReady bool +} + +const ( + // When reconnecting to the server after connection failure + reconnectDelay = 5 * time.Second + + // When setting up the channel after a channel exception + reInitDelay = 2 * time.Second + + // When resending messages the server didn't confirm + resendDelay = 5 * time.Second +) + +var ( + errNotConnected = errors.New("not connected to a server") + errAlreadyClosed = errors.New("already closed: not connected to the server") + errShutdown = errors.New("client is shutting down") +) + +// New creates a new consumer state instance, and automatically +// attempts to connect to the server. +func New(queueName, addr string) *Client { + client := Client{ + m: &sync.Mutex{}, + infolog: log.New(os.Stdout, "[INFO] ", log.LstdFlags|log.Lmsgprefix), + errlog: log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmsgprefix), + queueName: queueName, + done: make(chan bool), + } + go client.handleReconnect(addr) + return &client +} + +// handleReconnect will wait for a connection error on +// notifyConnClose, and then continuously attempt to reconnect. +func (client *Client) handleReconnect(addr string) { + for { + client.m.Lock() + client.isReady = false + client.m.Unlock() + + client.infolog.Println("attempting to connect") + + conn, err := client.connect(addr) + + if err != nil { + client.errlog.Println("failed to connect. Retrying...") + + select { + case <-client.done: + return + case <-time.After(reconnectDelay): + } + continue + } + + if done := client.handleReInit(conn); done { + break + } + } +} + +// connect will create a new AMQP connection +func (client *Client) connect(addr string) (*amqp.Connection, error) { + conn, err := amqp.Dial(addr) + + if err != nil { + return nil, err + } + + client.changeConnection(conn) + client.infolog.Println("connected") + return conn, nil +} + +// handleReInit will wait for a channel error +// and then continuously attempt to re-initialize both channels +func (client *Client) handleReInit(conn *amqp.Connection) bool { + for { + client.m.Lock() + client.isReady = false + client.m.Unlock() + + err := client.init(conn) + + if err != nil { + client.errlog.Println("failed to initialize channel, retrying...") + + select { + case <-client.done: + return true + case <-client.notifyConnClose: + client.infolog.Println("connection closed, reconnecting...") + return false + case <-time.After(reInitDelay): + } + continue + } + + select { + case <-client.done: + return true + case <-client.notifyConnClose: + client.infolog.Println("connection closed, reconnecting...") + return false + case <-client.notifyChanClose: + client.infolog.Println("channel closed, re-running init...") + } + } +} + +// init will initialize channel & declare queue +func (client *Client) init(conn *amqp.Connection) error { + ch, err := conn.Channel() + + if err != nil { + return err + } + + err = ch.Confirm(false) + + if err != nil { + return err + } + _, err = ch.QueueDeclare( + client.queueName, + false, // Durable + false, // Delete when unused + false, // Exclusive + false, // No-wait + nil, // Arguments + ) + + if err != nil { + return err + } + + client.changeChannel(ch) + client.m.Lock() + client.isReady = true + client.m.Unlock() + client.infolog.Println("client init done") + + return nil +} + +// changeConnection takes a new connection to the queue, +// and updates the close listener to reflect this. +func (client *Client) changeConnection(connection *amqp.Connection) { + client.connection = connection + client.notifyConnClose = make(chan *amqp.Error, 1) + client.connection.NotifyClose(client.notifyConnClose) +} + +// changeChannel takes a new channel to the queue, +// and updates the channel listeners to reflect this. +func (client *Client) changeChannel(channel *amqp.Channel) { + client.channel = channel + client.notifyChanClose = make(chan *amqp.Error, 1) + client.notifyConfirm = make(chan amqp.Confirmation, 1) + client.channel.NotifyClose(client.notifyChanClose) + client.channel.NotifyPublish(client.notifyConfirm) +} + +// Push will push data onto the queue, and wait for a confirmation. +// This will block until the server sends a confirmation. Errors are +// only returned if the push action itself fails, see UnsafePush. +func (client *Client) Push(data []byte) error { + client.m.Lock() + if !client.isReady { + client.m.Unlock() + return errors.New("failed to push: not connected") + } + client.m.Unlock() + for { + err := client.UnsafePush(data) + if err != nil { + client.errlog.Println("push failed. Retrying...") + select { + case <-client.done: + return errShutdown + case <-time.After(resendDelay): + } + continue + } + confirm := <-client.notifyConfirm + if confirm.Ack { + client.infolog.Printf("push confirmed [%d]", confirm.DeliveryTag) + return nil + } + } +} + +// UnsafePush will push to the queue without checking for +// confirmation. It returns an error if it fails to connect. +// No guarantees are provided for whether the server will +// receive the message. +func (client *Client) UnsafePush(data []byte) error { + client.m.Lock() + if !client.isReady { + client.m.Unlock() + return errNotConnected + } + client.m.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + return client.channel.PublishWithContext( + ctx, + "", // Exchange + client.queueName, // Routing key + false, // Mandatory + false, // Immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: data, + }, + ) +} + +// Consume will continuously put queue items on the channel. +// It is required to call delivery.Ack when it has been +// successfully processed, or delivery.Nack when it fails. +// Ignoring this will cause data to build up on the server. +func (client *Client) Consume() (<-chan amqp.Delivery, error) { + client.m.Lock() + if !client.isReady { + client.m.Unlock() + return nil, errNotConnected + } + client.m.Unlock() + + if err := client.channel.Qos( + 1, // prefetchCount + 0, // prefetchSize + false, // global + ); err != nil { + return nil, err + } + + return client.channel.Consume( + client.queueName, + "", // Consumer + false, // Auto-Ack + false, // Exclusive + false, // No-local + false, // No-Wait + nil, // Args + ) +} + +// Close will cleanly shut down the channel and connection. +func (client *Client) Close() error { + client.m.Lock() + // we read and write isReady in two locations, so we grab the lock and hold onto + // it until we are finished + defer client.m.Unlock() + + if !client.isReady { + return errAlreadyClosed + } + close(client.done) + err := client.channel.Close() + if err != nil { + return err + } + err = client.connection.Close() + if err != nil { + return err + } + + client.isReady = false + return nil +} + +func main() { + publishDone := make(chan struct{}) + consumeDone := make(chan struct{}) + + go publish(publishDone) + go consume(consumeDone) + + select { + case <-publishDone: + log.Println("publishing is done") + } + + select { + case <-consumeDone: + log.Println("consuming is done") + } + + log.Println("exiting") +} diff --git a/_examples/consumer/consumer.go b/_examples/consumer/consumer.go index 63a7b4f..bf9401c 100644 --- a/_examples/consumer/consumer.go +++ b/_examples/consumer/consumer.go @@ -1,3 +1,8 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// // This example declares a durable Exchange, an ephemeral (auto-delete) Queue, // binds the Queue to the Exchange with a binding key, and consumes every // message published to that Exchange with that routing key.