From 0e2a0628e8176e491a7e01a9046d046193d7521e Mon Sep 17 00:00:00 2001 From: Tomaz Zavrsnik Date: Tue, 11 Jun 2024 11:58:50 +0200 Subject: [PATCH 1/2] feat(executor/rabbitmq): Add support for RPC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduced new client type to use for testing RPC pattern using Direct Reply-to. Signed-off-by: Tomaž Završnik --- executors/rabbitmq/rabbitmq.go | 119 +++++++++++++++++++++------------ 1 file changed, 78 insertions(+), 41 deletions(-) diff --git a/executors/rabbitmq/rabbitmq.go b/executors/rabbitmq/rabbitmq.go index 2cffda41..1e367ea3 100644 --- a/executors/rabbitmq/rabbitmq.go +++ b/executors/rabbitmq/rabbitmq.go @@ -27,6 +27,7 @@ type Message struct { Persistent bool `json:"persistent" yaml:"persistent"` ContentType string `json:"content_type" yaml:"contentType"` ContentEncoding string `json:"content_encoding" yaml:"contentEncoding"` + ReplyTo string `json:"reply_to" yaml:"replyTo"` } // Executor represents a Test Exec @@ -103,7 +104,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro switch e.ClientType { case "publisher": workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir") - err := e.publishMessages(ctx, workdir) + err := e.publishMessages(ctx, workdir, nil, nil, false) if err != nil { result.Err = err.Error() return nil, err @@ -115,8 +116,35 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro result.Err = err.Error() return nil, err } + case "client": + var conn, ch, err = e.openChannel(ctx) + if err != nil { + result.Err = err.Error() + return nil, err + } + defer ch.Close() + defer conn.Close() + var delivery, consumererr = ch.Consume("amq.rabbitmq.reply-to", "", true, false, false, false, nil) + if consumererr != nil { + return nil, consumererr + } + venom.Info(ctx, "Reply consumer started.") + + workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir") + err = e.publishMessages(ctx, workdir, conn, ch, true) + if err != nil { + result.Err = err.Error() + return nil, err + } + + var d = <-delivery + body := []string{} + bodyJSON := []interface{}{} + body, bodyJSON = e.processMessage(ctx, d, true, body, bodyJSON) + result.Body = body + result.BodyJSON = bodyJSON default: - return nil, fmt.Errorf("clientType %q must be publisher or subscriber", e.ClientType) + return nil, fmt.Errorf("clientType %q must be publisher or subscriber or client", e.ClientType) } elapsed := time.Since(start) @@ -125,26 +153,19 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro return result, nil } -func (e Executor) publishMessages(ctx context.Context, workdir string) error { - uri, err := amqp.ParseURI(e.Addrs) - if err != nil { - return err - } - uri.Username = e.User - uri.Password = e.Password - - conn, err := amqp.Dial(uri.String()) - if err != nil { - return err - } - venom.Debug(ctx, "connection opened") - defer conn.Close() - ch, err := conn.Channel() - if err != nil { - return err +func (e Executor) publishMessages(ctx context.Context, workdir string, connection *amqp.Connection, channel *amqp.Channel, rpc bool) error { + var ch *amqp.Channel + var err error + if connection == nil || channel == nil { + ch, conn, err := e.openChannel(ctx) + if err != nil { + return err + } + defer conn.Close() + defer ch.Close() + } else { + ch = channel } - venom.Debug(ctx, "channel opened") - defer ch.Close() // If an exchange if defined routingKey := e.RoutingKey @@ -186,6 +207,10 @@ func (e Executor) publishMessages(ctx context.Context, workdir string) error { if !e.Messages[i].Persistent { deliveryMode = amqp.Transient } + var replyTo string = e.Messages[i].ReplyTo + if rpc { + replyTo = "amq.rabbitmq.reply-to" + } err = ch.Publish( e.Exchange, // exchange routingKey, // routing key @@ -195,6 +220,7 @@ func (e Executor) publishMessages(ctx context.Context, workdir string) error { DeliveryMode: deliveryMode, ContentType: e.Messages[i].ContentType, ContentEncoding: e.Messages[i].ContentEncoding, + ReplyTo: replyTo, Body: []byte(e.Messages[i].Value), Headers: e.Messages[i].Headers, }) @@ -208,26 +234,51 @@ func (e Executor) publishMessages(ctx context.Context, workdir string) error { return nil } -func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, []interface{}, []amqp.Table, error) { +func (e Executor) openChannel(ctx context.Context) (*amqp.Connection, *amqp.Channel, error) { uri, err := amqp.ParseURI(e.Addrs) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, err } uri.Username = e.User uri.Password = e.Password conn, err := amqp.Dial(uri.String()) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, err } venom.Debug(ctx, "connection opened") - defer conn.Close() ch, err := conn.Channel() if err != nil { - return nil, nil, nil, nil, err + return nil, nil, err } venom.Debug(ctx, "channel opened") + return conn, ch, nil +} + +func (e Executor) processMessage(ctx context.Context, msg amqp.Delivery, ok bool, body []string, bodyJSON []interface{}) ([]string, []interface{}) { + venom.Debug(ctx, "message: %t %s %s %s", ok, msg.RoutingKey, msg.MessageId, msg.ContentType) + venom.Debug(ctx, "receive: %s", string(msg.Body)) + body = append(body, string(msg.Body)) + + bodyJSONArray := []interface{}{} + if err := venom.JSONUnmarshal(msg.Body, &bodyJSONArray); err != nil { + bodyJSONMap := map[string]interface{}{} + venom.JSONUnmarshal(msg.Body, &bodyJSONMap) //nolint + bodyJSON = append(bodyJSON, bodyJSONMap) + } else { + bodyJSON = append(bodyJSON, bodyJSONArray) + } + + return body, bodyJSON +} + +func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, []interface{}, []amqp.Table, error) { + conn, ch, err := e.openChannel(ctx) + if err != nil { + return nil, nil, nil, nil, err + } + defer conn.Close() defer ch.Close() q, err := ch.QueueDeclare( @@ -287,21 +338,7 @@ func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, headers = append(headers, msg.Headers) messages = append(messages, msg) - - venom.Debug(ctx, "message: %t %s %s %s", ok, msg.RoutingKey, msg.MessageId, msg.ContentType) - - venom.Debug(ctx, "receive: %s", string(msg.Body)) - body = append(body, string(msg.Body)) - - bodyJSONArray := []interface{}{} - if err := venom.JSONUnmarshal(msg.Body, &bodyJSONArray); err != nil { - bodyJSONMap := map[string]interface{}{} - venom.JSONUnmarshal(msg.Body, &bodyJSONMap) //nolint - bodyJSON = append(bodyJSON, bodyJSONMap) - } else { - bodyJSON = append(bodyJSON, bodyJSONArray) - } - + body, bodyJSON = e.processMessage(ctx, msg, ok, body, bodyJSON) } return body, bodyJSON, messages, headers, err From 8d1337cc436fc54990c001723de3392312fe6fde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Zavr=C5=A1nik?= Date: Wed, 14 Aug 2024 21:33:46 +0200 Subject: [PATCH 2/2] docs(executor/rabbitmq): Update RabbitMQ executor docs. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tomaž Završnik --- executors/rabbitmq/README.md | 44 ++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/executors/rabbitmq/README.md b/executors/rabbitmq/README.md index 46585a6e..88d18c84 100644 --- a/executors/rabbitmq/README.md +++ b/executors/rabbitmq/README.md @@ -1,6 +1,11 @@ # Venom - Executor RabbitMQ -Step to use publish / subscribe on a RabbitMQ +Three types of execution are supported: +- **publisher**: publish a message to a queue or to an exchange. +- **subscriber**: bind to a queue or an exchange (using routing key) and wait for message(s) to be consumed. +- **client**: publish a message to a queue or to an exchange and wait for the response message to be received on the [reply-to](https://www.rabbitmq.com/docs/direct-reply-to) queue. + +Steps to use publish / subscribe on a RabbitMQ: ## Input In your yaml file, you can use: @@ -12,7 +17,7 @@ In your yaml file, you can use: - user optional (default guest) - password optional (default guest) - - clientType mandatory (publisher or subscriber) + - clientType mandatory (publisher, subscriber or client) # RabbitMQ Q configuration - qName mandatory @@ -22,10 +27,10 @@ In your yaml file, you can use: - exchangeType optional (default "fanout") - exchange optional (default "") - # For subscriber only + # For subscriber and client only - messageLimit optional (default 1) - # For publisher only + # For publisher and client only - messages - durable optional (true or false) (default false) - contentType optional @@ -140,3 +145,34 @@ vars: - result.messages.messages0.contentencoding ShouldEqual utf8 - result.messages.messages0.contenttype ShouldEqual application/json ``` + +### Client (pubsub RPC) +```yaml +name: TestSuite RabbitMQ +vars: + addrs: 'amqp://localhost:5672' + user: + password: +testcases: + - name: RabbitMQ request/reply + steps: + - type: rabbitmq + addrs: "{{.addrs}}" + user: "{{.user}}" + password: "{{.password}}" + clientType: client + exchange: exchange_test + routingKey: pubsub_test + messages: + - value: '{"a": "b"}' + contentType: application/json + contentEncoding: utf8 + persistent: false + headers: + myCustomHeader: value + myCustomHeader2: value2 + messageLimit: 1 + assertions: + - result.bodyjson.bodyjson0 ShouldContainKey Status + - result.bodyjson.bodyjson0.Status ShouldEqual Succeeded +```