Skip to content

Commit

Permalink
Merge pull request #12 from Clarilab/add-external-connections-option
Browse files Browse the repository at this point in the history
add external connections option
  • Loading branch information
nicoandrewss authored Nov 9, 2023
2 parents a968fd4 + 9bed02d commit 235d14f
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 68 deletions.
18 changes: 8 additions & 10 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ on:

jobs:
ci:
name: Vet, lint and test
name: Vet, Lint, Test and Vulnerability Check
runs-on: ubuntu-latest
steps:
- name: Checkout repo
Expand All @@ -36,14 +36,12 @@ jobs:
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin
golangci-lint run --out-format=github-actions
- name: Integration Test
run: make test_integration

govulncheck:
runs-on: ubuntu-latest
name: Run govulncheck
steps:
- id: govulncheck
- name: Vulnerability Check
uses: golang/govulncheck-action@v1
with:
go-package: ./...
go-package: ./...
go-version-input: ">=1.21.0"
check-latest: true

- name: Integration Test
run: make test_integration
3 changes: 2 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ version: "3.4"
services:
rabbitmq:
container_name: rabbitmq
image: 'rabbitmq:3.12.2-alpine'
image: 'rabbitmq:3.11.9-management'
ports:
- "5672:5672"
- "15672:15672"
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 5s
Expand Down
75 changes: 40 additions & 35 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"time"

"github.com/Clarilab/clarimq"
ehtracygo "github.com/Clarilab/eh-tracygo"
"github.com/Clarilab/tracygo/v2"
"github.com/google/uuid"
eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/codec/json"
)

const (
// InfiniteRetries is the value to retry without discarding an event.
// InfiniteRetries is the maximum number for recovery or event delivery retries.
InfiniteRetries int64 = math.MaxInt64

aggregateTypeKey string = "aggregate_type"
Expand All @@ -43,28 +43,30 @@ const (
// EventBus is a local event bus that delegates handling of published events
// to all matching registered handlers, in order of registration.
type EventBus struct {
appID string
exchangeName string
topic string
addr string
clientID string
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan error
ctx context.Context //nolint:containedctx // intended use
cancel context.CancelFunc
wg sync.WaitGroup
eventCodec eh.EventCodec
publishConn *clarimq.Connection
publisher *clarimq.Publisher
consumeConn *clarimq.Connection
consumerMu sync.RWMutex
useRetry bool
maxRetries int64
queueDelays []time.Duration
logger *logger
loggers []*slog.Logger
publishingCache clarimq.PublishingCache
appID string
exchangeName string
topic string
addr string
clientID string
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan error
ctx context.Context //nolint:containedctx // intended use
cancel context.CancelFunc
wg sync.WaitGroup
eventCodec eh.EventCodec
publishConn *clarimq.Connection
publisher *clarimq.Publisher
consumeConn *clarimq.Connection
consumerMu sync.RWMutex
useRetry bool
maxRetries int64
maxRecoveryRetries int64
queueDelays []time.Duration
logger *logger
loggers []*slog.Logger
publishingCache clarimq.PublishingCache
tracer *tracygo.TracyGo
}

// NewEventBus creates an EventBus, with optional settings.
Expand All @@ -74,16 +76,19 @@ func NewEventBus(addr, appID, clientID, exchange, topic string, options ...Optio
ctx, cancel := context.WithCancel(context.Background())

bus := &EventBus{
appID: appID,
exchangeName: exchange,
addr: addr,
topic: topic,
clientID: clientID,
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan error, errChBuffSize),
ctx: ctx,
cancel: cancel,
eventCodec: &json.EventCodec{},
appID: appID,
exchangeName: exchange,
addr: addr,
topic: topic,
clientID: clientID,
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan error, errChBuffSize),
ctx: ctx,
cancel: cancel,
eventCodec: &json.EventCodec{},
maxRetries: InfiniteRetries,
maxRecoveryRetries: InfiniteRetries,
tracer: tracygo.New(),
}

// Apply configuration options.
Expand Down Expand Up @@ -132,7 +137,7 @@ func (b *EventBus) PublishEvent(ctx context.Context, event eh.Event) error {
clarimq.WithPublishOptionDeliveryMode(clarimq.PersistentDelivery),
clarimq.WithPublishOptionExchange(b.exchangeName),
clarimq.WithPublishOptionMessageID(uuid.NewString()),
clarimq.WithPublishOptionTracing(ehtracygo.FromContext(ctx)),
clarimq.WithPublishOptionTracing(b.tracer.CorrelationIDromContext(ctx)),
clarimq.WithPublishOptionHeaders(
map[string]any{
aggregateTypeKey: event.AggregateType().String(),
Expand Down
120 changes: 120 additions & 0 deletions eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package rabbitmq_test

import (
"bytes"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"testing"
"time"

"github.com/Clarilab/clarimq"
rabbitmq "github.com/Clarilab/eh-rabbitmq"
"github.com/looplab/eventhorizon/eventbus"
"github.com/looplab/eventhorizon/uuid"
Expand All @@ -36,6 +40,8 @@ func Test_Integration_AddHandler(t *testing.T) { //nolint:paralleltest // must n
t.Fatal("there should be no error:", err)
}

t.Cleanup(func() { bus.Close() })

eventbus.TestAddHandler(t, bus)
}

Expand All @@ -49,11 +55,15 @@ func Test_Integration_EventBus(t *testing.T) { //nolint:paralleltest // must not
t.Fatal("there should be no error:", err)
}

t.Cleanup(func() { bus1.Close() })

bus2, _, err := newTestEventBus(appID)
if err != nil {
t.Fatal("there should be no error:", err)
}

t.Cleanup(func() { bus2.Close() })

t.Logf("using stream: %s_events", appID)

eventbus.AcceptanceTest(t, bus1, bus2, time.Second)
Expand All @@ -69,17 +79,87 @@ func Test_Integration_EventBusLoadTest(t *testing.T) { //nolint:paralleltest //
t.Fatal("there should be no error:", err)
}

t.Cleanup(func() { bus.Close() })

t.Logf("using stream: %s_events", appID)

eventbus.LoadTest(t, bus)
}

func Test_Integration_ExternalConnections(t *testing.T) { //nolint:paralleltest // must not run in parallel
amqpURI := "amqp://guest:guest@localhost:5672/"
waitTime := 5 * time.Second

t.Run("without external connections", func(t *testing.T) { //nolint:paralleltest // must not run in parallel
for i := 0; i < 3; i++ {
bus, err := rabbitmq.NewEventBus(
amqpURI,
"test-app",
uuid.New().String(),
"eh-rabbitmq-test",
"rabbit",
)
if err != nil {
t.Fatal("there should be no error:", err)
}

t.Cleanup(func() { bus.Close() })
}

time.Sleep(waitTime) // wait for connections to be fully established

connCount := getConnectionCount(t)

if connCount != 6 { // expecting 6 connections: 1 publish and 1 consume connections per event bus
t.Fatal("there should be 6 connections, got:", connCount)
}
})

t.Run("with external connections", func(t *testing.T) { //nolint:paralleltest // must not run in parallel
publishConn, err := clarimq.NewConnection(amqpURI)
if err != nil {
t.Fatal("there should be no error:", err)
}

consumeConn, err := clarimq.NewConnection(amqpURI)
if err != nil {
t.Fatal("there should be no error:", err)
}

for i := 0; i < 3; i++ {
bus, err := rabbitmq.NewEventBus(
"it does not matter what the uri is",
"test-app",
uuid.New().String(),
"eh-rabbitmq-test",
"rabbit",
rabbitmq.WithClariMQConnections(publishConn, consumeConn),
)
if err != nil {
t.Fatal("there should be no error:", err)
}

t.Cleanup(func() { bus.Close() })
}

time.Sleep(waitTime) // wait for connections to be fully established

connCount := getConnectionCount(t)

if connCount != 2 { // expecting 2 connections
t.Fatal("there should be 2 connections, got:", connCount)
}
})
}

func Benchmark_EventBus(b *testing.B) {
bus, appID, err := newTestEventBus("")
if err != nil {
b.Fatal("there should be no error:", err)
}

b.Cleanup(func() { bus.Close() })

b.Logf("using stream: %s_events", appID)

eventbus.Benchmark(b, bus)
Expand All @@ -103,3 +183,43 @@ func newTestEventBus(appID string) (*rabbitmq.EventBus, string, error) {

return bus, appID, nil
}

func getConnectionCount(t *testing.T) int {
t.Helper()

type rqmAPIResponse []struct{}

request, err := http.NewRequest(http.MethodGet, "http://localhost:15672/api/connections", nil)
if err != nil {
t.Fatal("there should be no error:", err)
}

request.SetBasicAuth("guest", "guest")

restClient := http.Client{}

resp, err := restClient.Do(request)
if err != nil {
t.Fatal("there should be no error:", err)
}

if resp.StatusCode != http.StatusOK {
t.Fatal("RabbitMQ management API should return status code 200")
}

buff := new(bytes.Buffer)

_, err = buff.ReadFrom(resp.Body)
if err != nil {
t.Fatal("there should be no error:", err)
}

var apiResp rqmAPIResponse

err = json.Unmarshal(buff.Bytes(), &apiResp)
if err != nil {
t.Fatal("there should be no error:", err)
}

return len(apiResp)
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ module github.com/Clarilab/eh-rabbitmq
go 1.21

require (
github.com/Clarilab/clarimq v0.3.1
github.com/Clarilab/clarimq v1.1.0
github.com/Clarilab/eh-tracygo v1.0.0
github.com/Clarilab/tracygo/v2 v2.1.0
github.com/google/uuid v1.4.0
github.com/looplab/eventhorizon v0.16.0
)

require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
github.com/rabbitmq/amqp091-go v1.9.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
)
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
github.com/Clarilab/clarimq v0.3.1 h1:qMb0I7zDXCGUyhJrjb+RuJI8iXQ8mhTz8Eqw4jZzFm8=
github.com/Clarilab/clarimq v0.3.1/go.mod h1:9WQEAIGXZdhTdhLP7fc/Pe+h+xQ6ZPI3QM5UhUFrqR8=
github.com/Clarilab/clarimq v1.1.0 h1:2/gk7QSMXq/DBZLmjNw+TXtWtPiAnShzfTcb5OHpchw=
github.com/Clarilab/clarimq v1.1.0/go.mod h1:9WQEAIGXZdhTdhLP7fc/Pe+h+xQ6ZPI3QM5UhUFrqR8=
github.com/Clarilab/eh-tracygo v1.0.0 h1:2KlaQfPO5ajqBinHK4Hu3vXauTG2q0W9jJ7Aim5pQFw=
github.com/Clarilab/eh-tracygo v1.0.0/go.mod h1:bC1/XWv6byi0+YzkvuoioeV8vPruyK0jHkFo5gN5+Qc=
github.com/Clarilab/tracygo/v2 v2.1.0 h1:5QU3NTTboXD9PK0RCvSe7hBnO/vafkX05S2IYe3m44s=
github.com/Clarilab/tracygo/v2 v2.1.0/go.mod h1:DTUarfWtuAsbaLsXnzlQDS5nkTzXcna7ZvLIqriw5Vk=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -20,8 +22,8 @@ github.com/looplab/eventhorizon v0.16.0/go.mod h1:ym7NXMZtXypXMQgEavLerP3LfESP+W
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
Expand Down
Loading

0 comments on commit 235d14f

Please sign in to comment.