Skip to content

Commit

Permalink
Merge pull request #20 from Clarilab/add-more-handlers-functionality
Browse files Browse the repository at this point in the history
Add more handlers functionality
  • Loading branch information
nicoandrewss authored Feb 2, 2024
2 parents b847aa8 + f6a0b30 commit 672d657
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 31 deletions.
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ var ErrCouldNotBeRouted = errors.New("message could not be routed")
// ErrFailedToPublishChannelClosed occurs when the channel accessed but is closed.
var ErrFailedToPublishChannelClosed = errors.New("amqp channel is closed")

// ErrErrHandlerNotRegistered is returned when calling RemoveHandler with a handler that is not registered.
var ErrHandlerNotRegistered = errors.New("handler not registered")

type AMQPError clarimq.AMQPError

func (e *AMQPError) Error() string {
Expand Down
54 changes: 48 additions & 6 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
// InfiniteRetries is the maximum number for recovery or event delivery retries.
InfiniteRetries int64 = math.MaxInt64

handlerType string = "eventbus"
aggregateTypeKey string = "aggregate_type"
eventTypeKey string = "event_type"

Expand All @@ -47,7 +48,7 @@ type EventBus struct {
topic string
addr string
clientID string
registered map[eh.EventHandlerType]struct{}
registered map[eh.EventHandlerType]*clarimq.Consumer
registeredMu sync.RWMutex
errCh chan error
ctx context.Context //nolint:containedctx // intended use
Expand Down Expand Up @@ -80,7 +81,7 @@ func NewEventBus(addr, appID, clientID, exchange, topic string, options ...Optio
addr: addr,
topic: topic,
clientID: clientID,
registered: map[eh.EventHandlerType]struct{}{},
registered: make(map[eh.EventHandlerType]*clarimq.Consumer),
errCh: make(chan error, errChBuffSize),
ctx: ctx,
cancel: cancel,
Expand Down Expand Up @@ -110,7 +111,7 @@ func NewEventBus(addr, appID, clientID, exchange, topic string, options ...Optio

// HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
func (*EventBus) HandlerType() eh.EventHandlerType {
return "eventbus"
return eh.EventHandlerType(handlerType)
}

// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
Expand Down Expand Up @@ -176,9 +177,6 @@ func (b *EventBus) AddHandler(ctx context.Context, matcher eh.EventMatcher, hand
return eh.ErrHandlerAlreadyAdded
}

// Register handler.
b.registered[handler.HandlerType()] = struct{}{}

consumer, err := b.declareConsumer(ctx, matcher, handler)
if err != nil {
return fmt.Errorf(errMessage, err)
Expand All @@ -188,9 +186,53 @@ func (b *EventBus) AddHandler(ctx context.Context, matcher eh.EventMatcher, hand
b.wg.Add(1)
go b.handle(consumer)

// Register handler.
b.registered[handler.HandlerType()] = consumer

return nil
}

// RemoveHandler removes a handler from the event bus by type.
func (b *EventBus) RemoveHandler(handlerType eh.EventHandlerType) error {
const errMessage = "failed to remove handler: %w"

// Check handler existence.
b.registeredMu.RLock()
if _, ok := b.registered[handlerType]; !ok {
b.registeredMu.RUnlock()

return fmt.Errorf(errMessage, ErrHandlerNotRegistered)
}

b.registeredMu.RUnlock()

b.registeredMu.Lock()
defer b.registeredMu.Unlock()

if err := b.registered[handlerType].Close(); err != nil {
return fmt.Errorf(errMessage, err)
}

// Unregister handler.
delete(b.registered, handlerType)

return nil
}

// RegisteredHandlers returns a slice of all registered handler types.
func (b *EventBus) RegisteredHandlers() []eh.EventHandlerType {
b.registeredMu.RLock()
defer b.registeredMu.RUnlock()

handlerTypes := make([]eh.EventHandlerType, 0, len(b.registered))

for handlerType := range b.registered {
handlerTypes = append(handlerTypes, handlerType)
}

return handlerTypes
}

// Close implements the Close method of the eventhorizon.EventBus interface.
func (b *EventBus) Close() error {
const errMessage = "failed to close event bus: %w"
Expand Down
144 changes: 128 additions & 16 deletions eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ package rabbitmq_test

import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
"testing"
"time"

"github.com/Clarilab/clarimq"
rabbitmq "github.com/Clarilab/eh-rabbitmq"
"github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/eventbus"
"github.com/looplab/eventhorizon/mocks"
"github.com/looplab/eventhorizon/uuid"
)

Expand All @@ -45,6 +49,55 @@ func Test_Integration_AddHandler(t *testing.T) { //nolint:paralleltest // must n
eventbus.TestAddHandler(t, bus)
}

func Test_Integration_RemoveHandler(t *testing.T) { //nolint:paralleltest // must not run in parallel
if testing.Short() {
t.Skip("skipping integration test")
}

bus, _, err := newTestEventBus("app-id")
if err != nil {
t.Fatal("there should be no error:", err)
}

t.Cleanup(func() { bus.Close() })

t.Run("happy path", func(t *testing.T) { //nolint:paralleltest // must not run in parallel
handler := mocks.NewEventHandler("handler-1")
queueName := fmt.Sprintf("%s_%s", "app-id", handler.HandlerType())

if err := bus.AddHandler(context.Background(), eventhorizon.MatchAll{}, handler); err != nil {
t.Fatal("there should be no error:", err)
}

if len(bus.RegisteredHandlers()) != 1 {
t.Fatal("there should be 1 registered handler")
}

if expectQueueConsumerCount(t, queueName, 1); err != nil {
t.Fatal("there should be 1 consumer on the queue")
}

if err := bus.RemoveHandler(eventhorizon.EventHandlerType("handler-1")); err != nil {
t.Fatal("there should be no error:", err)
}

if len(bus.RegisteredHandlers()) != 0 {
t.Fatal("there should be 0 registered handler")
}

if expectQueueConsumerCount(t, queueName, 0); err != nil {
t.Fatal("there should be 0 consumer on the queue")
}
})

t.Run("handler not registered", func(t *testing.T) { //nolint:paralleltest // must not run in parallel
err := bus.RemoveHandler(eventhorizon.EventHandlerType("handler-1"))
if !errors.Is(err, rabbitmq.ErrHandlerNotRegistered) {
t.Fatal("error should be: 'handler not registered'", err)
}
})
}

func Test_Integration_EventBus(t *testing.T) { //nolint:paralleltest // must not run in parallel
if testing.Short() {
t.Skip("skipping integration test")
Expand Down Expand Up @@ -108,10 +161,8 @@ func Test_Integration_ExternalConnections(t *testing.T) { //nolint:paralleltest

time.Sleep(waitTime) // wait for connections to be fully established

connCount := getConnectionCount(t)

if connCount != 6 { // expecting 6 connections: 1 publish and 1 consume connections per event bus
t.Fatal("there should be 6 connections, got:", connCount)
if err := expectConnectionCount(t, 6); err != nil {
t.Fatal("there should be 6 connections")
}
})

Expand Down Expand Up @@ -144,10 +195,8 @@ func Test_Integration_ExternalConnections(t *testing.T) { //nolint:paralleltest

time.Sleep(waitTime) // wait for connections to be fully established

connCount := getConnectionCount(t)

if connCount != 2 { // expecting 2 connections
t.Fatal("there should be 2 connections, got:", connCount)
if err := expectConnectionCount(t, 2); err != nil {
t.Fatal("there should be 2 connections")
}
})
}
Expand Down Expand Up @@ -184,7 +233,7 @@ func newTestEventBus(appID string) (*rabbitmq.EventBus, string, error) {
return bus, appID, nil
}

func getConnectionCount(t *testing.T) int {
func expectConnectionCount(t *testing.T, expected int) error {
t.Helper()

type rqmAPIResponse []struct{}
Expand All @@ -196,9 +245,76 @@ func getConnectionCount(t *testing.T) int {

request.SetBasicAuth("guest", "guest")

restClient := http.Client{}
var apiResp rqmAPIResponse

resp, err := restClient.Do(request)
do := func() bool {
return len(apiResp) == expected
}

return compare(t, request, &apiResp, do)
}

func expectQueueConsumerCount(t *testing.T, queueName string, expected int) error {
t.Helper()

type queue struct {
Name string `json:"name"`
Consumers int `json:"consumers"`
}

type rqmAPIResponse []queue

request, err := http.NewRequest(http.MethodGet, "http://localhost:15672/api/queues", nil)
if err != nil {
t.Fatal("there should be no error:", err)
}

request.SetBasicAuth("guest", "guest")

var apiResp rqmAPIResponse

do := func() bool {
for i := range apiResp {
return apiResp[i].Name == queueName && apiResp[i].Consumers == expected
}

return false
}

return compare(t, request, &apiResp, do)
}

func compare(t *testing.T, request *http.Request, result any, compareFn func() bool) error {
t.Helper()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

retries := 0

restClient := new(http.Client)

for range ticker.C {
retries++

pollRMQ(t, restClient, request, &result)

if compareFn() {
return nil
}

if retries >= 10 {
return errors.New("failed to compare after retry limit exceeded") //nolint:goerr113 // test code
}
}

return nil
}

func pollRMQ(t *testing.T, client *http.Client, request *http.Request, result any) {
t.Helper()

resp, err := client.Do(request)
if err != nil {
t.Fatal("there should be no error:", err)
}
Expand All @@ -214,12 +330,8 @@ func getConnectionCount(t *testing.T) int {
t.Fatal("there should be no error:", err)
}

var apiResp rqmAPIResponse

err = json.Unmarshal(buff.Bytes(), &apiResp)
err = json.Unmarshal(buff.Bytes(), &result)
if err != nil {
t.Fatal("there should be no error:", err)
}

return len(apiResp)
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ module github.com/Clarilab/eh-rabbitmq
go 1.21

require (
github.com/Clarilab/clarimq v1.2.0
github.com/Clarilab/clarimq v1.3.0
github.com/Clarilab/eh-tracygo v1.0.0
github.com/Clarilab/tracygo/v2 v2.1.0
github.com/google/uuid v1.5.0
github.com/google/uuid v1.6.0
github.com/looplab/eventhorizon v0.16.0
)

require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/rabbitmq/amqp091-go v1.9.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
)
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/Clarilab/clarimq v1.2.0 h1:DO+wgdTzeW0wij9DJr/Jwk2uP4CG+nRMCgtfyDdCU/o=
github.com/Clarilab/clarimq v1.2.0/go.mod h1:xMNpFq9qb6MvssW9H3567qeO1ohfnrVedXJ50z27NsM=
github.com/Clarilab/clarimq v1.3.0 h1:+xUz8NVIa0x/clEtfYp1ZWGLp3HDT2jfy8XFN5G1ZYo=
github.com/Clarilab/clarimq v1.3.0/go.mod h1:xMNpFq9qb6MvssW9H3567qeO1ohfnrVedXJ50z27NsM=
github.com/Clarilab/eh-tracygo v1.0.0 h1:2KlaQfPO5ajqBinHK4Hu3vXauTG2q0W9jJ7Aim5pQFw=
github.com/Clarilab/eh-tracygo v1.0.0/go.mod h1:bC1/XWv6byi0+YzkvuoioeV8vPruyK0jHkFo5gN5+Qc=
github.com/Clarilab/tracygo/v2 v2.1.0 h1:5QU3NTTboXD9PK0RCvSe7hBnO/vafkX05S2IYe3m44s=
Expand All @@ -8,8 +8,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
Expand All @@ -25,8 +25,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down

0 comments on commit 672d657

Please sign in to comment.