Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(executor/rabbitmq): Add support for RPC #798

Merged
merged 2 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions executors/rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Venom - Executor RabbitMQ

Step to use publish / subscribe on a RabbitMQ
Three types of execution are supported:
- **publisher**: publish a message to a queue or to an exchange.
- **subscriber**: bind to a queue or an exchange (using routing key) and wait for message(s) to be consumed.
- **client**: publish a message to a queue or to an exchange and wait for the response message to be received on the [reply-to](https://www.rabbitmq.com/docs/direct-reply-to) queue.

Steps to use publish / subscribe on a RabbitMQ:

## Input
In your yaml file, you can use:
Expand All @@ -12,7 +17,7 @@ In your yaml file, you can use:
- user optional (default guest)
- password optional (default guest)

- clientType mandatory (publisher or subscriber)
- clientType mandatory (publisher, subscriber or client)

# RabbitMQ Q configuration
- qName mandatory
Expand All @@ -22,10 +27,10 @@ In your yaml file, you can use:
- exchangeType optional (default "fanout")
- exchange optional (default "")

# For subscriber only
# For subscriber and client only
- messageLimit optional (default 1)

# For publisher only
# For publisher and client only
- messages
- durable optional (true or false) (default false)
- contentType optional
Expand Down Expand Up @@ -140,3 +145,34 @@ vars:
- result.messages.messages0.contentencoding ShouldEqual utf8
- result.messages.messages0.contenttype ShouldEqual application/json
```

### Client (pubsub RPC)
```yaml
name: TestSuite RabbitMQ
vars:
addrs: 'amqp://localhost:5672'
user:
password:
testcases:
- name: RabbitMQ request/reply
steps:
- type: rabbitmq
addrs: "{{.addrs}}"
user: "{{.user}}"
password: "{{.password}}"
clientType: client
exchange: exchange_test
routingKey: pubsub_test
messages:
- value: '{"a": "b"}'
contentType: application/json
contentEncoding: utf8
persistent: false
headers:
myCustomHeader: value
myCustomHeader2: value2
messageLimit: 1
assertions:
- result.bodyjson.bodyjson0 ShouldContainKey Status
- result.bodyjson.bodyjson0.Status ShouldEqual Succeeded
```
119 changes: 78 additions & 41 deletions executors/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Message struct {
Persistent bool `json:"persistent" yaml:"persistent"`
ContentType string `json:"content_type" yaml:"contentType"`
ContentEncoding string `json:"content_encoding" yaml:"contentEncoding"`
ReplyTo string `json:"reply_to" yaml:"replyTo"`
}

// Executor represents a Test Exec
Expand Down Expand Up @@ -103,7 +104,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
switch e.ClientType {
case "publisher":
workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir")
err := e.publishMessages(ctx, workdir)
err := e.publishMessages(ctx, workdir, nil, nil, false)
if err != nil {
result.Err = err.Error()
return nil, err
Expand All @@ -115,8 +116,35 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
result.Err = err.Error()
return nil, err
}
case "client":
var conn, ch, err = e.openChannel(ctx)
if err != nil {
result.Err = err.Error()
return nil, err
}
defer ch.Close()
defer conn.Close()
var delivery, consumererr = ch.Consume("amq.rabbitmq.reply-to", "", true, false, false, false, nil)
if consumererr != nil {
return nil, consumererr
}
venom.Info(ctx, "Reply consumer started.")

workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir")
err = e.publishMessages(ctx, workdir, conn, ch, true)
if err != nil {
result.Err = err.Error()
return nil, err
}

var d = <-delivery
body := []string{}
bodyJSON := []interface{}{}
body, bodyJSON = e.processMessage(ctx, d, true, body, bodyJSON)
result.Body = body
result.BodyJSON = bodyJSON
default:
return nil, fmt.Errorf("clientType %q must be publisher or subscriber", e.ClientType)
return nil, fmt.Errorf("clientType %q must be publisher or subscriber or client", e.ClientType)
}

elapsed := time.Since(start)
Expand All @@ -125,26 +153,19 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
return result, nil
}

func (e Executor) publishMessages(ctx context.Context, workdir string) error {
uri, err := amqp.ParseURI(e.Addrs)
if err != nil {
return err
}
uri.Username = e.User
uri.Password = e.Password

conn, err := amqp.Dial(uri.String())
if err != nil {
return err
}
venom.Debug(ctx, "connection opened")
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
return err
func (e Executor) publishMessages(ctx context.Context, workdir string, connection *amqp.Connection, channel *amqp.Channel, rpc bool) error {
var ch *amqp.Channel
var err error
if connection == nil || channel == nil {
ch, conn, err := e.openChannel(ctx)
if err != nil {
return err
}
defer conn.Close()
defer ch.Close()
} else {
ch = channel
}
venom.Debug(ctx, "channel opened")
defer ch.Close()

// If an exchange if defined
routingKey := e.RoutingKey
Expand Down Expand Up @@ -186,6 +207,10 @@ func (e Executor) publishMessages(ctx context.Context, workdir string) error {
if !e.Messages[i].Persistent {
deliveryMode = amqp.Transient
}
var replyTo string = e.Messages[i].ReplyTo
if rpc {
replyTo = "amq.rabbitmq.reply-to"
}
err = ch.Publish(
e.Exchange, // exchange
routingKey, // routing key
Expand All @@ -195,6 +220,7 @@ func (e Executor) publishMessages(ctx context.Context, workdir string) error {
DeliveryMode: deliveryMode,
ContentType: e.Messages[i].ContentType,
ContentEncoding: e.Messages[i].ContentEncoding,
ReplyTo: replyTo,
Body: []byte(e.Messages[i].Value),
Headers: e.Messages[i].Headers,
})
Expand All @@ -208,26 +234,51 @@ func (e Executor) publishMessages(ctx context.Context, workdir string) error {
return nil
}

func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, []interface{}, []amqp.Table, error) {
func (e Executor) openChannel(ctx context.Context) (*amqp.Connection, *amqp.Channel, error) {
uri, err := amqp.ParseURI(e.Addrs)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, err
}
uri.Username = e.User
uri.Password = e.Password

conn, err := amqp.Dial(uri.String())
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, err
}
venom.Debug(ctx, "connection opened")
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, err
}
venom.Debug(ctx, "channel opened")
return conn, ch, nil
}

func (e Executor) processMessage(ctx context.Context, msg amqp.Delivery, ok bool, body []string, bodyJSON []interface{}) ([]string, []interface{}) {
venom.Debug(ctx, "message: %t %s %s %s", ok, msg.RoutingKey, msg.MessageId, msg.ContentType)
venom.Debug(ctx, "receive: %s", string(msg.Body))
body = append(body, string(msg.Body))

bodyJSONArray := []interface{}{}
if err := venom.JSONUnmarshal(msg.Body, &bodyJSONArray); err != nil {
bodyJSONMap := map[string]interface{}{}
venom.JSONUnmarshal(msg.Body, &bodyJSONMap) //nolint
bodyJSON = append(bodyJSON, bodyJSONMap)
} else {
bodyJSON = append(bodyJSON, bodyJSONArray)
}

return body, bodyJSON
}

func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, []interface{}, []amqp.Table, error) {
conn, ch, err := e.openChannel(ctx)
if err != nil {
return nil, nil, nil, nil, err
}
defer conn.Close()
defer ch.Close()

q, err := ch.QueueDeclare(
Expand Down Expand Up @@ -287,21 +338,7 @@ func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{},

headers = append(headers, msg.Headers)
messages = append(messages, msg)

venom.Debug(ctx, "message: %t %s %s %s", ok, msg.RoutingKey, msg.MessageId, msg.ContentType)

venom.Debug(ctx, "receive: %s", string(msg.Body))
body = append(body, string(msg.Body))

bodyJSONArray := []interface{}{}
if err := venom.JSONUnmarshal(msg.Body, &bodyJSONArray); err != nil {
bodyJSONMap := map[string]interface{}{}
venom.JSONUnmarshal(msg.Body, &bodyJSONMap) //nolint
bodyJSON = append(bodyJSON, bodyJSONMap)
} else {
bodyJSON = append(bodyJSON, bodyJSONArray)
}

body, bodyJSON = e.processMessage(ctx, msg, ok, body, bodyJSON)
}

return body, bodyJSON, messages, headers, err
Expand Down