Skip to content

Commit

Permalink
refactor logging to use zerolog
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Dec 22, 2024
1 parent fda9fa2 commit cb2892d
Show file tree
Hide file tree
Showing 27 changed files with 315 additions and 271 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.23.0
require (
github.com/FZambia/eagle v0.1.0
github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b
github.com/centrifugal/centrifuge v0.33.5-0.20241222090232-3a3fceb66a3d
github.com/centrifugal/centrifuge v0.33.5-0.20241222164738-7c718f40f348
github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e
github.com/cristalhq/jwt/v5 v5.4.0
github.com/go-viper/mapstructure/v2 v2.2.1
Expand All @@ -14,7 +14,7 @@ require (
github.com/gorilla/securecookie v1.1.2
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/go-envparse v0.1.0
github.com/jackc/pgx/v5 v5.7.1
github.com/jackc/pgx/v5 v5.7.2
github.com/justinas/alice v1.2.0
github.com/mattn/go-isatty v0.0.20
github.com/nats-io/nats.go v1.38.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/centrifugal/centrifuge v0.33.5-0.20241222090232-3a3fceb66a3d h1:xr1lVKyxwcsfXtGBWLuiF0N7bRR5XuS3fN/lVuzImss=
github.com/centrifugal/centrifuge v0.33.5-0.20241222090232-3a3fceb66a3d/go.mod h1:hKy1+IjduZJge3EDS3NSSZdTpWd4qhz+AlANNfyv/jE=
github.com/centrifugal/centrifuge v0.33.5-0.20241222164738-7c718f40f348 h1:F2UZujbtzJwNxcXt6BoXfXFe5NIUkVjYjlvPnzLf8pE=
github.com/centrifugal/centrifuge v0.33.5-0.20241222164738-7c718f40f348/go.mod h1:hKy1+IjduZJge3EDS3NSSZdTpWd4qhz+AlANNfyv/jE=
github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e h1:+GbuEwJybDuHz6e8S17t/f0I4aTDnZjk37c0aGNFbwc=
github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -82,8 +82,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs=
github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA=
github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
Expand Down
45 changes: 23 additions & 22 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/centrifugal/centrifugo/v5/internal/subsource"

"github.com/centrifugal/centrifuge"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -164,7 +165,7 @@ func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishRes
resp := &PublishResponse{}

if ch == "" {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channel required for publish", nil))
log.Error().Err(errors.New("channel required for publish")).Msg("bad publish request")
resp.Error = ErrorBadRequest
return resp
}
Expand All @@ -182,7 +183,7 @@ func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishRes
}

if len(data) == 0 {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "data required for publish", nil))
log.Error().Err(errors.New("data required for publish")).Msg("bad publish request")
resp.Error = ErrorBadRequest
return resp
}
Expand Down Expand Up @@ -218,7 +219,7 @@ func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishRes
centrifuge.WithDelta(delta),
)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error publishing data to channel", map[string]any{"error": err.Error(), "channel": cmd.Channel}))
log.Error().Err(err).Str("channel", cmd.Channel).Msg("error publishing data to channel")
resp.Error = ErrorInternal
return resp
}
Expand All @@ -245,7 +246,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
}

if len(channels) == 0 {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channels required for broadcast", nil))
log.Error().Err(errors.New("channels required for broadcast")).Msg("bad broadcast request")
resp.Error = ErrorBadRequest
return resp
}
Expand All @@ -263,7 +264,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
}

if len(data) == 0 {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "data required for broadcast", nil))
log.Error().Err(errors.New("data required for broadcast")).Msg("bad broadcast request")
resp.Error = ErrorBadRequest
return resp
}
Expand All @@ -281,7 +282,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
if ch == "" {
respError := ErrorBadRequest
incError(h.config.Protocol, "broadcast_publish", respError.Code)
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channel can not be blank in broadcast", nil))
log.Error().Err(errors.New("channel can not be blank in broadcast")).Msg("bad broadcast request")
responses[i] = &PublishResponse{Error: respError}
return
}
Expand All @@ -290,14 +291,14 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
if err != nil {
respError := ErrorInternal
incError(h.config.Protocol, "broadcast_publish", respError.Code)
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error getting options for channel", map[string]any{"channel": ch, "error": err.Error()}))
log.Error().Err(err).Str("channel", ch).Msg("error getting options for channel")
responses[i] = &PublishResponse{Error: respError}
return
}
if !found {
respError := ErrorUnknownChannel
incError(h.config.Protocol, "broadcast_publish", respError.Code)
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "can't find namespace for channel", map[string]any{"channel": ch}))
log.Error().Err(errors.New("channel not found")).Str("channel", ch).Msg("error getting options for channel")
responses[i] = &PublishResponse{Error: respError}
return
}
Expand Down Expand Up @@ -331,7 +332,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
} else {
respError := ErrorInternal
incError(h.config.Protocol, "publish", respError.Code)
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error publishing data to channel during broadcast", map[string]any{"channel": ch, "error": err.Error()}))
log.Error().Err(err).Str("channel", ch).Msg("error publishing data to channel during broadcast")
resp.Error = respError
}
responses[i] = resp
Expand All @@ -353,7 +354,7 @@ func (h *Executor) Subscribe(_ context.Context, cmd *SubscribeRequest) *Subscrib
channel := cmd.Channel

if channel == "" {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channel required for subscribe", map[string]any{"channel": channel, "user": user}))
log.Error().Err(errors.New("channel required for subscribe")).Msg("bad subscribe request")
resp.Error = ErrorBadRequest
return resp
}
Expand Down Expand Up @@ -413,7 +414,7 @@ func (h *Executor) Subscribe(_ context.Context, cmd *SubscribeRequest) *Subscrib
centrifuge.WithSubscribeHistoryMetaTTL(chOpts.HistoryMetaTTL.ToDuration()),
)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing user to a channel", map[string]any{"channel": channel, "user": user, "error": err.Error()}))
log.Error().Err(err).Str("channel", channel).Str("user", user).Msg("error subscribing user to channel")
resp.Error = ErrorInternal
return resp
}
Expand Down Expand Up @@ -445,7 +446,7 @@ func (h *Executor) Unsubscribe(_ context.Context, cmd *UnsubscribeRequest) *Unsu

err := h.node.Unsubscribe(user, channel, centrifuge.WithUnsubscribeClient(cmd.Client), centrifuge.WithUnsubscribeSession(cmd.Session))
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unsubscribing user from a channel", map[string]any{"channel": channel, "user": user, "error": err.Error()}))
log.Error().Err(err).Str("channel", channel).Str("user", user).Msg("error unsubscribing user from channel")
resp.Error = ErrorInternal
return resp
}
Expand Down Expand Up @@ -477,7 +478,7 @@ func (h *Executor) Disconnect(_ context.Context, cmd *DisconnectRequest) *Discon
centrifuge.WithDisconnectSession(cmd.Session),
centrifuge.WithDisconnectClientWhitelist(cmd.Whitelist))
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error disconnecting user", map[string]any{"user": cmd.User, "error": err.Error()}))
log.Error().Err(err).Str("user", user).Msg("error disconnecting user")
resp.Error = ErrorInternal
return resp
}
Expand All @@ -501,7 +502,7 @@ func (h *Executor) Refresh(_ context.Context, cmd *RefreshRequest) *RefreshRespo
centrifuge.WithRefreshInfo(cmd.Info),
)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error refreshing user", map[string]any{"user": cmd.User, "error": err.Error()}))
log.Error().Err(err).Str("user", user).Msg("error refreshing user")
resp.Error = ErrorInternal
return resp
}
Expand Down Expand Up @@ -539,7 +540,7 @@ func (h *Executor) Presence(_ context.Context, cmd *PresenceRequest) *PresenceRe

presence, err := h.node.Presence(ch)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling presence", map[string]any{"error": err.Error()}))
log.Error().Err(err).Str("channel", ch).Msg("error getting presence for channel")
resp.Error = ErrorInternal
return resp
}
Expand Down Expand Up @@ -590,7 +591,7 @@ func (h *Executor) PresenceStats(_ context.Context, cmd *PresenceStatsRequest) *

stats, err := h.node.PresenceStats(cmd.Channel)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling presence stats", map[string]any{"error": err.Error()}))
log.Error().Err(err).Str("channel", ch).Msg("error getting presence stats for channel")
resp.Error = ErrorInternal
return resp
}
Expand Down Expand Up @@ -649,7 +650,7 @@ func (h *Executor) History(_ context.Context, cmd *HistoryRequest) *HistoryRespo
centrifuge.WithReverse(cmd.Reverse),
)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling history", map[string]any{"error": err.Error()}))
log.Error().Err(err).Str("channel", ch).Msg("error getting history for channel")
if errors.Is(err, centrifuge.ErrorUnrecoverablePosition) {
resp.Error = ErrorUnrecoverablePosition
return resp
Expand Down Expand Up @@ -715,7 +716,7 @@ func (h *Executor) HistoryRemove(_ context.Context, cmd *HistoryRemoveRequest) *

err = h.node.RemoveHistory(ch)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling history remove", map[string]any{"error": err.Error()}))
log.Error().Err(err).Str("channel", ch).Msg("error removing history for channel")
resp.Error = ErrorInternal
return resp
}
Expand All @@ -731,7 +732,7 @@ func (h *Executor) Info(_ context.Context, _ *InfoRequest) *InfoResponse {

info, err := h.node.Info()
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling info", map[string]any{"error": err.Error()}))
log.Error().Err(err).Msg("error calling info")
resp.Error = ErrorInternal
return resp
}
Expand Down Expand Up @@ -771,7 +772,7 @@ func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse {
resp := &RPCResponse{}

if cmd.Method == "" {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "rpc method required", map[string]any{}))
log.Error().Err(errors.New("rpc method required")).Msg("bad rpc request")
resp.Error = ErrorBadRequest
return resp
}
Expand All @@ -786,7 +787,7 @@ func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse {

data, err := handler(ctx, cmd.Params)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error sending rpc", map[string]any{"error": err.Error()}))
log.Error().Err(err).Str("method", cmd.Method).Msg("error calling rpc method")
resp.Error = toAPIErr(err)
return resp
}
Expand All @@ -807,7 +808,7 @@ func (h *Executor) Channels(ctx context.Context, cmd *ChannelsRequest) *Channels

channels, err := h.surveyCaller.Channels(ctx, cmd)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling channels", map[string]any{"error": err.Error()}))
log.Error().Err(err).Msg("error calling channels")
resp.Error = toAPIErr(err)
return resp
}
Expand Down
3 changes: 2 additions & 1 deletion internal/api/consuming.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/centrifugal/centrifugo/v5/internal/apiproto"

"github.com/centrifugal/centrifuge"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -34,7 +35,7 @@ func NewConsumingHandler(n *centrifuge.Node, apiExecutor *Executor, c ConsumingH
}

func (h *ConsumingHandler) logNonRetryableConsumingError(err error, method string) {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "non retryable error during consuming, skip message", map[string]any{"error": err.Error(), "method": method}))
log.Error().Err(err).Str("method", method).Msg("non retryable error during consuming, skip message")
}

func (h *ConsumingHandler) Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error {
Expand Down
Loading

0 comments on commit cb2892d

Please sign in to comment.