Skip to content

Commit

Permalink
Merge pull request #17 from Clarilab/fix-recovery-error+accept-logger…
Browse files Browse the repository at this point in the history
…-interface

fix: recovery error + accept logger interface
  • Loading branch information
nicoandrewss authored Jan 12, 2024
2 parents 0ac57d0 + f6d0690 commit a059ecf
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: ">=1.21.0"

Expand Down
8 changes: 7 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func (e *AMQPError) Error() string {

// ErrRecoveryFailed occurs when the recovery failed after a connection loss.
type RecoveryFailedError struct {
err error
err error
connectionName string
}

// Error implements the Error method of the error interface.
Expand All @@ -38,6 +39,11 @@ func (e *RecoveryFailedError) Error() string {
return str
}

// ConnectionName returns the name of the connection that failed to recover.
func (e *RecoveryFailedError) ConnectionName() string {
return e.connectionName
}

// EventBusError is an async error containing the error returned from a handler and the event that it happened on.
// Its a wrapper around the eventhorizon.EventBusError with extra information about the handler.
type EventBusError struct {
Expand Down
3 changes: 1 addition & 2 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"math"
"sync"
"time"
Expand Down Expand Up @@ -64,7 +63,7 @@ type EventBus struct {
maxRecoveryRetries int64
queueDelays []time.Duration
logger *logger
loggers []*slog.Logger
loggers []clarimq.Logger
publishingCache clarimq.PublishingCache
tracer *tracygo.TracyGo
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Clarilab/eh-rabbitmq
go 1.21

require (
github.com/Clarilab/clarimq v1.1.0
github.com/Clarilab/clarimq v1.2.0
github.com/Clarilab/eh-tracygo v1.0.0
github.com/Clarilab/tracygo/v2 v2.1.0
github.com/google/uuid v1.4.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/Clarilab/clarimq v1.1.0 h1:2/gk7QSMXq/DBZLmjNw+TXtWtPiAnShzfTcb5OHpchw=
github.com/Clarilab/clarimq v1.1.0/go.mod h1:9WQEAIGXZdhTdhLP7fc/Pe+h+xQ6ZPI3QM5UhUFrqR8=
github.com/Clarilab/clarimq v1.1.1 h1:iRO3uoH4bNQRAAiHVq1QxUS15DSeH7rSeWeqWANMORc=
github.com/Clarilab/clarimq v1.1.1/go.mod h1:xMNpFq9qb6MvssW9H3567qeO1ohfnrVedXJ50z27NsM=
github.com/Clarilab/clarimq v1.2.0 h1:DO+wgdTzeW0wij9DJr/Jwk2uP4CG+nRMCgtfyDdCU/o=
github.com/Clarilab/clarimq v1.2.0/go.mod h1:xMNpFq9qb6MvssW9H3567qeO1ohfnrVedXJ50z27NsM=
github.com/Clarilab/eh-tracygo v1.0.0 h1:2KlaQfPO5ajqBinHK4Hu3vXauTG2q0W9jJ7Aim5pQFw=
github.com/Clarilab/eh-tracygo v1.0.0/go.mod h1:bC1/XWv6byi0+YzkvuoioeV8vPruyK0jHkFo5gN5+Qc=
github.com/Clarilab/tracygo/v2 v2.1.0 h1:5QU3NTTboXD9PK0RCvSe7hBnO/vafkX05S2IYe3m44s=
Expand Down
6 changes: 3 additions & 3 deletions logging.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package rabbitmq

import (
"log/slog"
"github.com/Clarilab/clarimq"
)

type logger struct {
loggers []*slog.Logger
loggers []clarimq.Logger
}

func newLogger(loggers []*slog.Logger) *logger {
func newLogger(loggers []clarimq.Logger) *logger {
return &logger{loggers}
}

Expand Down
3 changes: 1 addition & 2 deletions options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rabbitmq

import (
"log/slog"
"time"

"github.com/Clarilab/clarimq"
Expand All @@ -19,7 +18,7 @@ func WithEventCodec(codec eh.EventCodec) Option {
}

// WithLogging enables logging to the given loggers.
func WithLogging(loggers []*slog.Logger) Option {
func WithLogging(loggers ...clarimq.Logger) Option {
return func(b *EventBus) {
b.loggers = loggers
}
Expand Down
20 changes: 11 additions & 9 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,17 @@ func (b *EventBus) setupPublishConnection() (err error) { //nolint:nonamedreturn
const errMessage = "failed to setup publish connection: %w"

if b.publishConn == nil {
if b.publishConn, err = b.setupConnection(
if b.publishConn, err = b.establishConnection(
clarimq.WithConnectionOptionConnectionName(fmt.Sprintf("%s_publish_connection", b.appID)),
); err != nil {
return fmt.Errorf(errMessage, err)
}
}

b.watchConnectionErrors(b.publishConn)

if len(b.loggers) > 0 { // in case a user already applied loggers to the connection, but not the event bus.
b.publishConn.SetLoggers(b.loggers)
b.publishConn.SetLoggers(b.loggers...)
}

b.publishConn.SetReturnHandler(b.returnHandler)
Expand All @@ -70,32 +72,32 @@ func (b *EventBus) setupConsumeConnection() (err error) { //nolint:nonamedreturn
const errMessage = "failed to setup consume connection: %w"

if b.consumeConn == nil {
if b.consumeConn, err = b.setupConnection(
if b.consumeConn, err = b.establishConnection(
clarimq.WithConnectionOptionConnectionName(fmt.Sprintf("%s_consume_connection", b.appID)),
); err != nil {
return fmt.Errorf(errMessage, err)
}
}

b.watchConnectionErrors(b.consumeConn)

if len(b.loggers) > 0 { // in case a user already applied loggers to the connection, but not the event bus.
b.publishConn.SetLoggers(b.loggers)
b.publishConn.SetLoggers(b.loggers...)
}

b.consumeConn.SetMaxRecoveryRetries(int(b.maxRecoveryRetries))

return err
}

func (b *EventBus) setupConnection(options ...clarimq.ConnectionOption) (*clarimq.Connection, error) {
const errMessage = "failed to setup connection: %w"
func (b *EventBus) establishConnection(options ...clarimq.ConnectionOption) (*clarimq.Connection, error) {
const errMessage = "failed to establish connection: %w"

conn, err := clarimq.NewConnection(b.addr, options...)
if err != nil {
return nil, fmt.Errorf(errMessage, err)
}

b.watchConnectionErrors(conn)

return conn, nil
}

Expand All @@ -116,7 +118,7 @@ func (b *EventBus) watchConnectionErrors(conn *clarimq.Connection) {
b.errCh <- &err

case errors.As(err, &recoveryFailed):
b.errCh <- &RecoveryFailedError{err}
b.errCh <- &RecoveryFailedError{err, recoveryFailed.ConnectionName}

default:
b.errCh <- err
Expand Down

0 comments on commit a059ecf

Please sign in to comment.