From cb2892d8b0a40e9cd00839d4ba1ad1755b03f13d Mon Sep 17 00:00:00 2001 From: FZambia Date: Sun, 22 Dec 2024 21:36:43 +0200 Subject: [PATCH] refactor logging to use zerolog --- go.mod | 4 +- go.sum | 8 +- internal/api/api.go | 45 ++++----- internal/api/consuming.go | 3 +- internal/api/handler.go | 43 +++++---- internal/app/node.go | 4 +- internal/app/run.go | 16 ++-- internal/client/handler.go | 106 +++++++++++---------- internal/consuming/consuming.go | 12 +-- internal/consuming/kafka.go | 35 +++---- internal/consuming/postgresql.go | 17 ++-- internal/logging/logging.go | 96 +++++++++++-------- internal/middleware/connlimit.go | 4 +- internal/natsbroker/broker.go | 6 +- internal/proxy/connect_handler.go | 12 ++- internal/proxy/publish_handler.go | 8 +- internal/proxy/refresh_handler.go | 8 +- internal/proxy/rpc_handler.go | 12 ++- internal/proxy/sub_refresh_handler.go | 10 +- internal/proxy/subscribe_handler.go | 10 +- internal/proxy/subscribe_stream_handler.go | 10 +- internal/unigrpc/grpc.go | 9 +- internal/unihttpstream/handler.go | 16 ++-- internal/unisse/handler.go | 18 ++-- internal/uniws/handler.go | 16 ++-- internal/usage/usage.go | 40 ++++---- internal/wt/handler.go | 18 ++-- 27 files changed, 315 insertions(+), 271 deletions(-) diff --git a/go.mod b/go.mod index 8011df35ab..49c6dccaa5 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.0 require ( github.com/FZambia/eagle v0.1.0 github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b - github.com/centrifugal/centrifuge v0.33.5-0.20241222090232-3a3fceb66a3d + github.com/centrifugal/centrifuge v0.33.5-0.20241222164738-7c718f40f348 github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e github.com/cristalhq/jwt/v5 v5.4.0 github.com/go-viper/mapstructure/v2 v2.2.1 @@ -14,7 +14,7 @@ require ( github.com/gorilla/securecookie v1.1.2 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-envparse v0.1.0 - github.com/jackc/pgx/v5 v5.7.1 + github.com/jackc/pgx/v5 v5.7.2 github.com/justinas/alice v1.2.0 github.com/mattn/go-isatty v0.0.20 github.com/nats-io/nats.go v1.38.0 diff --git a/go.sum b/go.sum index 2c391e0baf..45b4072dfd 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/centrifugal/centrifuge v0.33.5-0.20241222090232-3a3fceb66a3d h1:xr1lVKyxwcsfXtGBWLuiF0N7bRR5XuS3fN/lVuzImss= -github.com/centrifugal/centrifuge v0.33.5-0.20241222090232-3a3fceb66a3d/go.mod h1:hKy1+IjduZJge3EDS3NSSZdTpWd4qhz+AlANNfyv/jE= +github.com/centrifugal/centrifuge v0.33.5-0.20241222164738-7c718f40f348 h1:F2UZujbtzJwNxcXt6BoXfXFe5NIUkVjYjlvPnzLf8pE= +github.com/centrifugal/centrifuge v0.33.5-0.20241222164738-7c718f40f348/go.mod h1:hKy1+IjduZJge3EDS3NSSZdTpWd4qhz+AlANNfyv/jE= github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e h1:+GbuEwJybDuHz6e8S17t/f0I4aTDnZjk37c0aGNFbwc= github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -82,8 +82,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= -github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= +github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= diff --git a/internal/api/api.go b/internal/api/api.go index c293a89916..ae2660224e 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -13,6 +13,7 @@ import ( "github.com/centrifugal/centrifugo/v5/internal/subsource" "github.com/centrifugal/centrifuge" + "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -164,7 +165,7 @@ func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishRes resp := &PublishResponse{} if ch == "" { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channel required for publish", nil)) + log.Error().Err(errors.New("channel required for publish")).Msg("bad publish request") resp.Error = ErrorBadRequest return resp } @@ -182,7 +183,7 @@ func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishRes } if len(data) == 0 { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "data required for publish", nil)) + log.Error().Err(errors.New("data required for publish")).Msg("bad publish request") resp.Error = ErrorBadRequest return resp } @@ -218,7 +219,7 @@ func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishRes centrifuge.WithDelta(delta), ) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error publishing data to channel", map[string]any{"error": err.Error(), "channel": cmd.Channel})) + log.Error().Err(err).Str("channel", cmd.Channel).Msg("error publishing data to channel") resp.Error = ErrorInternal return resp } @@ -245,7 +246,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc } if len(channels) == 0 { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channels required for broadcast", nil)) + log.Error().Err(errors.New("channels required for broadcast")).Msg("bad broadcast request") resp.Error = ErrorBadRequest return resp } @@ -263,7 +264,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc } if len(data) == 0 { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "data required for broadcast", nil)) + log.Error().Err(errors.New("data required for broadcast")).Msg("bad broadcast request") resp.Error = ErrorBadRequest return resp } @@ -281,7 +282,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc if ch == "" { respError := ErrorBadRequest incError(h.config.Protocol, "broadcast_publish", respError.Code) - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channel can not be blank in broadcast", nil)) + log.Error().Err(errors.New("channel can not be blank in broadcast")).Msg("bad broadcast request") responses[i] = &PublishResponse{Error: respError} return } @@ -290,14 +291,14 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc if err != nil { respError := ErrorInternal incError(h.config.Protocol, "broadcast_publish", respError.Code) - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error getting options for channel", map[string]any{"channel": ch, "error": err.Error()})) + log.Error().Err(err).Str("channel", ch).Msg("error getting options for channel") responses[i] = &PublishResponse{Error: respError} return } if !found { respError := ErrorUnknownChannel incError(h.config.Protocol, "broadcast_publish", respError.Code) - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "can't find namespace for channel", map[string]any{"channel": ch})) + log.Error().Err(errors.New("channel not found")).Str("channel", ch).Msg("error getting options for channel") responses[i] = &PublishResponse{Error: respError} return } @@ -331,7 +332,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc } else { respError := ErrorInternal incError(h.config.Protocol, "publish", respError.Code) - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error publishing data to channel during broadcast", map[string]any{"channel": ch, "error": err.Error()})) + log.Error().Err(err).Str("channel", ch).Msg("error publishing data to channel during broadcast") resp.Error = respError } responses[i] = resp @@ -353,7 +354,7 @@ func (h *Executor) Subscribe(_ context.Context, cmd *SubscribeRequest) *Subscrib channel := cmd.Channel if channel == "" { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channel required for subscribe", map[string]any{"channel": channel, "user": user})) + log.Error().Err(errors.New("channel required for subscribe")).Msg("bad subscribe request") resp.Error = ErrorBadRequest return resp } @@ -413,7 +414,7 @@ func (h *Executor) Subscribe(_ context.Context, cmd *SubscribeRequest) *Subscrib centrifuge.WithSubscribeHistoryMetaTTL(chOpts.HistoryMetaTTL.ToDuration()), ) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing user to a channel", map[string]any{"channel": channel, "user": user, "error": err.Error()})) + log.Error().Err(err).Str("channel", channel).Str("user", user).Msg("error subscribing user to channel") resp.Error = ErrorInternal return resp } @@ -445,7 +446,7 @@ func (h *Executor) Unsubscribe(_ context.Context, cmd *UnsubscribeRequest) *Unsu err := h.node.Unsubscribe(user, channel, centrifuge.WithUnsubscribeClient(cmd.Client), centrifuge.WithUnsubscribeSession(cmd.Session)) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unsubscribing user from a channel", map[string]any{"channel": channel, "user": user, "error": err.Error()})) + log.Error().Err(err).Str("channel", channel).Str("user", user).Msg("error unsubscribing user from channel") resp.Error = ErrorInternal return resp } @@ -477,7 +478,7 @@ func (h *Executor) Disconnect(_ context.Context, cmd *DisconnectRequest) *Discon centrifuge.WithDisconnectSession(cmd.Session), centrifuge.WithDisconnectClientWhitelist(cmd.Whitelist)) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error disconnecting user", map[string]any{"user": cmd.User, "error": err.Error()})) + log.Error().Err(err).Str("user", user).Msg("error disconnecting user") resp.Error = ErrorInternal return resp } @@ -501,7 +502,7 @@ func (h *Executor) Refresh(_ context.Context, cmd *RefreshRequest) *RefreshRespo centrifuge.WithRefreshInfo(cmd.Info), ) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error refreshing user", map[string]any{"user": cmd.User, "error": err.Error()})) + log.Error().Err(err).Str("user", user).Msg("error refreshing user") resp.Error = ErrorInternal return resp } @@ -539,7 +540,7 @@ func (h *Executor) Presence(_ context.Context, cmd *PresenceRequest) *PresenceRe presence, err := h.node.Presence(ch) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling presence", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("channel", ch).Msg("error getting presence for channel") resp.Error = ErrorInternal return resp } @@ -590,7 +591,7 @@ func (h *Executor) PresenceStats(_ context.Context, cmd *PresenceStatsRequest) * stats, err := h.node.PresenceStats(cmd.Channel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling presence stats", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("channel", ch).Msg("error getting presence stats for channel") resp.Error = ErrorInternal return resp } @@ -649,7 +650,7 @@ func (h *Executor) History(_ context.Context, cmd *HistoryRequest) *HistoryRespo centrifuge.WithReverse(cmd.Reverse), ) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling history", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("channel", ch).Msg("error getting history for channel") if errors.Is(err, centrifuge.ErrorUnrecoverablePosition) { resp.Error = ErrorUnrecoverablePosition return resp @@ -715,7 +716,7 @@ func (h *Executor) HistoryRemove(_ context.Context, cmd *HistoryRemoveRequest) * err = h.node.RemoveHistory(ch) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling history remove", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("channel", ch).Msg("error removing history for channel") resp.Error = ErrorInternal return resp } @@ -731,7 +732,7 @@ func (h *Executor) Info(_ context.Context, _ *InfoRequest) *InfoResponse { info, err := h.node.Info() if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling info", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error calling info") resp.Error = ErrorInternal return resp } @@ -771,7 +772,7 @@ func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse { resp := &RPCResponse{} if cmd.Method == "" { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "rpc method required", map[string]any{})) + log.Error().Err(errors.New("rpc method required")).Msg("bad rpc request") resp.Error = ErrorBadRequest return resp } @@ -786,7 +787,7 @@ func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse { data, err := handler(ctx, cmd.Params) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error sending rpc", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("method", cmd.Method).Msg("error calling rpc method") resp.Error = toAPIErr(err) return resp } @@ -807,7 +808,7 @@ func (h *Executor) Channels(ctx context.Context, cmd *ChannelsRequest) *Channels channels, err := h.surveyCaller.Channels(ctx, cmd) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling channels", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error calling channels") resp.Error = toAPIErr(err) return resp } diff --git a/internal/api/consuming.go b/internal/api/consuming.go index af157118c3..7fed555a14 100644 --- a/internal/api/consuming.go +++ b/internal/api/consuming.go @@ -7,6 +7,7 @@ import ( "github.com/centrifugal/centrifugo/v5/internal/apiproto" "github.com/centrifugal/centrifuge" + "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -34,7 +35,7 @@ func NewConsumingHandler(n *centrifuge.Node, apiExecutor *Executor, c ConsumingH } func (h *ConsumingHandler) logNonRetryableConsumingError(err error, method string) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "non retryable error during consuming, skip message", map[string]any{"error": err.Error(), "method": method})) + log.Error().Err(err).Str("method", method).Msg("non retryable error during consuming, skip message") } func (h *ConsumingHandler) Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error { diff --git a/internal/api/handler.go b/internal/api/handler.go index d1039b1115..060af20196 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -2,9 +2,12 @@ package api import ( "context" + "errors" "io" "net/http" + "github.com/rs/zerolog/log" + . "github.com/centrifugal/centrifugo/v5/internal/apiproto" "github.com/centrifugal/centrifuge" @@ -80,13 +83,13 @@ func (s *Handler) handleAPI(w http.ResponseWriter, r *http.Request) { data, err = io.ReadAll(r.Body) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error reading API request body", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error reading API request body") http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } if len(data) == 0 { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "no data in API request")) + log.Error().Err(errors.New("no data in API request")).Msg("bad request") http.Error(w, "Bad Request", http.StatusBadRequest) return } @@ -100,7 +103,7 @@ func (s *Handler) handleAPI(w http.ResponseWriter, r *http.Request) { for { command, decodeErr := decoder.Decode() if decodeErr != nil && decodeErr != io.EOF { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding API data", map[string]any{"error": decodeErr.Error()})) + log.Error().Err(decodeErr).Msg("error decoding API data") http.Error(w, "Bad Request", http.StatusBadRequest) return } @@ -115,7 +118,7 @@ func (s *Handler) handleAPI(w http.ResponseWriter, r *http.Request) { span := trace.SpanFromContext(r.Context()) span.SetStatus(codes.Error, err.Error()) } - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling API command", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error handling API command") http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } @@ -125,7 +128,7 @@ func (s *Handler) handleAPI(w http.ResponseWriter, r *http.Request) { } err = encoder.Encode(rep) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error encoding API reply", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error encoding API reply") http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } @@ -158,7 +161,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_PUBLISH: cmd, err := decoder.DecodePublish(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding publish params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding publish params") rep.Error = ErrorBadRequest return rep, nil } @@ -176,7 +179,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_BROADCAST: cmd, err := decoder.DecodeBroadcast(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding broadcast params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding broadcast params") rep.Error = ErrorBadRequest return rep, nil } @@ -194,7 +197,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_SUBSCRIBE: cmd, err := decoder.DecodeSubscribe(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding subscribe params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding subscribe params") rep.Error = ErrorBadRequest return rep, nil } @@ -212,7 +215,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_UNSUBSCRIBE: cmd, err := decoder.DecodeUnsubscribe(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding unsubscribe params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding unsubscribe params") rep.Error = ErrorBadRequest return rep, nil } @@ -230,7 +233,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_DISCONNECT: cmd, err := decoder.DecodeDisconnect(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding disconnect params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding disconnect params") rep.Error = ErrorBadRequest return rep, nil } @@ -248,7 +251,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_PRESENCE: cmd, err := decoder.DecodePresence(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding presence params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding presence params") rep.Error = ErrorBadRequest return rep, nil } @@ -266,7 +269,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_PRESENCE_STATS: cmd, err := decoder.DecodePresenceStats(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding presence stats params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding presence stats params") rep.Error = ErrorBadRequest return rep, nil } @@ -284,7 +287,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_HISTORY: cmd, err := decoder.DecodeHistory(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding history params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding history params") rep.Error = ErrorBadRequest return rep, nil } @@ -302,7 +305,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_HISTORY_REMOVE: cmd, err := decoder.DecodeHistoryRemove(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding history remove params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding history remove params") rep.Error = ErrorBadRequest return rep, nil } @@ -333,7 +336,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_RPC: cmd, err := decoder.DecodeRPC(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding rpc params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding rpc params") rep.Error = ErrorBadRequest return rep, nil } @@ -351,7 +354,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_REFRESH: cmd, err := decoder.DecodeRefresh(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding refresh params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding refresh params") rep.Error = ErrorBadRequest return rep, nil } @@ -369,7 +372,7 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e case Command_CHANNELS: cmd, err := decoder.DecodeChannels(params) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding channels params", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error decoding channels params") rep.Error = ErrorBadRequest return rep, nil } @@ -396,17 +399,17 @@ func (s *Handler) handleAPICommand(ctx context.Context, cmd *Command) (*Reply, e } func (s *Handler) handleReadDataError(r *http.Request, w http.ResponseWriter, err error) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error reading API request body", map[string]any{"error": err.Error(), "path": r.URL.Path})) + log.Error().Err(err).Str("path", r.URL.Path).Msg("error reading API request body") http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) } func (s *Handler) handleUnmarshalError(r *http.Request, w http.ResponseWriter, err error) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding API request body", map[string]any{"error": err.Error(), "path": r.URL.Path})) + log.Error().Err(err).Str("path", r.URL.Path).Msg("error decoding API request body") http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) } func (s *Handler) handleMarshalError(r *http.Request, w http.ResponseWriter, err error) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error encoding API response", map[string]any{"error": err.Error(), "path": r.URL.Path})) + log.Error().Err(err).Str("path", r.URL.Path).Msg("error encoding API response") http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) } diff --git a/internal/app/node.go b/internal/app/node.go index baaf990eb3..e75d48573a 100644 --- a/internal/app/node.go +++ b/internal/app/node.go @@ -11,7 +11,7 @@ import ( "github.com/centrifugal/centrifuge" ) -func centrifugeNodeConfig(version string, cfgContainer *config.Container) centrifuge.Config { +func centrifugeNodeConfig(version string, cfgContainer *config.Container, logHandler centrifuge.LogHandler) centrifuge.Config { appCfg := cfgContainer.Config() cfg := centrifuge.Config{} cfg.Version = version @@ -35,7 +35,7 @@ func centrifugeNodeConfig(version string, cfgContainer *config.Container) centri cfg.HistoryMetaTTL = appCfg.Channel.HistoryMetaTTL.ToDuration() cfg.ClientConnectIncludeServerTime = appCfg.Client.ConnectIncludeServerTime cfg.LogLevel = logging.CentrifugeLogLevel(strings.ToLower(appCfg.LogLevel)) - cfg.LogHandler = logging.NewCentrifugeLogHandler().Handle + cfg.LogHandler = logHandler if appCfg.Client.ConnectCodeToUnidirectionalDisconnect.Enabled { uniCodeTransforms := make(map[uint32]centrifuge.Disconnect) for _, transform := range appCfg.Client.ConnectCodeToUnidirectionalDisconnect.Transforms { diff --git a/internal/app/run.go b/internal/app/run.go index 8c70ab5b9a..ce9d990784 100644 --- a/internal/app/run.go +++ b/internal/app/run.go @@ -43,9 +43,13 @@ func Run(cmd *cobra.Command, configFile string) { if err != nil { log.Fatal().Msgf("error getting config: %v", err) } - logFileHandler := logging.Setup(cfg) - if logFileHandler != nil { - defer func() { _ = logFileHandler.Close() }() + + ctx, serviceCancel := context.WithCancel(context.Background()) + defer serviceCancel() + + centrifugeLogHandler, logCloseFn := logging.Setup(cfg) + if logCloseFn != nil { + defer logCloseFn() } if cfgMeta.FileNotFound { log.Warn().Msg("config file not found, continue using environment and flag options") @@ -97,7 +101,7 @@ func Run(cmd *cobra.Command, configFile string) { log.Fatal().Msgf("error building proxy map: %v", err) } - nodeCfg := centrifugeNodeConfig(build.Version, cfgContainer) + nodeCfg := centrifugeNodeConfig(build.Version, cfgContainer, centrifugeLogHandler) node, err := centrifuge.New(nodeCfg) if err != nil { @@ -173,7 +177,7 @@ func Run(cmd *cobra.Command, configFile string) { UseOpenTelemetry: useConsumingOpentelemetry, }) - consumingServices, err := consuming.New(node.ID(), node, consumingHandler, cfg.Consumers) + consumingServices, err := consuming.New(node.ID(), consumingHandler, cfg.Consumers) if err != nil { log.Fatal().Msgf("error initializing consumers: %v", err) } @@ -235,8 +239,6 @@ func Run(cmd *cobra.Command, configFile string) { log.Fatal().Msgf("error running node: %v", err) } - ctx, serviceCancel := context.WithCancel(context.Background()) - defer serviceCancel() serviceManager.Run(ctx) var grpcAPIServer *grpc.Server diff --git a/internal/client/handler.go b/internal/client/handler.go index b39230ad81..39c6360810 100644 --- a/internal/client/handler.go +++ b/internal/client/handler.go @@ -6,6 +6,10 @@ import ( "errors" "unicode" + "github.com/centrifugal/centrifugo/v5/internal/logging" + + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/clientcontext" "github.com/centrifugal/centrifugo/v5/internal/clientstorage" "github.com/centrifugal/centrifugo/v5/internal/config" @@ -135,7 +139,7 @@ func (h *Handler) Setup() error { personalChannel := h.cfgContainer.PersonalChannel(userID) presenceStats, err := h.node.PresenceStats(personalChannel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error calling presence stats", map[string]any{"error": err.Error(), "client": client.ID(), "user": client.UserID()})) + log.Error().Err(err).Str("channel", personalChannel).Str("user", userID).Str("client", client.ID()).Msg("error calling presence stats") client.Disconnect(centrifuge.DisconnectServerError) return } @@ -146,7 +150,7 @@ func (h *Handler) Setup() error { centrifuge.WithDisconnectClientWhitelist([]string{client.ID()}), ) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error sending disconnect", map[string]any{"error": err.Error(), "client": client.ID(), "user": client.UserID()})) + log.Error().Err(err).Str("user", userID).Str("client", client.ID()).Msg("error disconnecting user") client.Disconnect(centrifuge.DisconnectServerError) return } @@ -306,16 +310,16 @@ func (h *Handler) OnClientConnecting( return centrifuge.ConnectReply{}, centrifuge.ErrorTokenExpired } if errors.Is(err, jwtverify.ErrInvalidToken) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "invalid connection token", map[string]any{"error": err.Error(), "client": e.ClientID})) + log.Info().Err(err).Str("client", e.ClientID).Msg("invalid connection token") return centrifuge.ConnectReply{}, centrifuge.DisconnectInvalidToken } - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "internal server error", map[string]any{"error": err.Error(), "client": e.ClientID})) + log.Error().Err(err).Str("client", e.ClientID).Msg("error verifying connection token") return centrifuge.ConnectReply{}, err } if token.UserID == "" && cfg.Client.DisallowAnonymousConnectionTokens { - if h.node.LogEnabled(centrifuge.LogLevelDebug) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "anonymous connection tokens disallowed", map[string]any{"client": e.ClientID})) + if logging.Enabled(logging.DebugLevel) { + log.Debug().Str("client", e.ClientID).Msg("anonymous connection tokens disallowed") } return centrifuge.ConnectReply{}, centrifuge.DisconnectPermissionDenied } @@ -366,11 +370,11 @@ func (h *Handler) OnClientConnecting( personalChannel := h.cfgContainer.PersonalChannel(credentials.UserID) _, _, chOpts, found, err := h.cfgContainer.ChannelOptions(personalChannel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "subscribe channel options error", map[string]any{"error": err.Error(), "channel": personalChannel})) + log.Error().Err(err).Str("channel", personalChannel).Msg("error getting personal channel options") return centrifuge.ConnectReply{}, err } if !found { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "subscribe unknown personal channel", map[string]any{"channel": personalChannel})) + log.Info().Str("channel", personalChannel).Msg("subscribe unknown personal channel") return centrifuge.ConnectReply{}, centrifuge.ErrorUnknownChannel } subscriptions[personalChannel] = centrifuge.SubscribeOptions{ @@ -392,11 +396,11 @@ func (h *Handler) OnClientConnecting( for _, ch := range e.Channels { _, rest, chOpts, found, err := h.cfgContainer.ChannelOptions(ch) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channel options error", map[string]any{"error": err.Error(), "channel": ch})) + log.Error().Err(err).Str("channel", ch).Msg("error getting channel options") return centrifuge.ConnectReply{}, err } if !found { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "subscribe unknown channel", map[string]any{"channel": ch})) + log.Info().Str("channel", ch).Msg("subscribe unknown channel") return centrifuge.ConnectReply{}, centrifuge.DisconnectBadRequest } @@ -437,8 +441,8 @@ func (h *Handler) OnClientConnecting( } } } else { - if h.node.LogEnabled(centrifuge.LogLevelDebug) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "ignoring subscription to a channel", map[string]any{"channel": ch, "client": e.ClientID, "user": userID})) + if logging.Enabled(logging.DebugLevel) { + log.Debug().Str("channel", ch).Str("client", e.ClientID).Str("user", userID).Msg("ignoring subscription to a channel") } } } @@ -502,14 +506,14 @@ func (h *Handler) OnRefresh(c Client, e centrifuge.RefreshEvent, refreshProxyHan return centrifuge.RefreshReply{Expired: true}, RefreshExtra{}, nil } if errors.Is(err, jwtverify.ErrInvalidToken) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "invalid refresh token", map[string]any{"error": err.Error(), "user": c.UserID(), "client": c.ID()})) + log.Info().Err(err).Str("client", c.ID()).Str("user", c.UserID()).Msg("invalid refresh token") return centrifuge.RefreshReply{}, RefreshExtra{}, centrifuge.DisconnectInvalidToken } - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error verifying refresh token", map[string]any{"error": err.Error(), "user": c.UserID(), "client": c.ID()})) + log.Error().Err(err).Str("client", c.ID()).Str("user", c.UserID()).Msg("error verifying refresh token") return centrifuge.RefreshReply{}, RefreshExtra{}, err } if token.UserID != c.UserID() { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "refresh token user mismatch", map[string]any{"tokenUser": token.UserID, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("token_user", token.UserID).Str("user", c.UserID()).Str("client", c.ID()).Msg("refresh token user mismatch") return centrifuge.RefreshReply{}, RefreshExtra{}, centrifuge.DisconnectInvalidToken } if token.Meta != nil { @@ -540,11 +544,11 @@ func (h *Handler) OnSubRefresh(c Client, subRefreshProxyHandler proxy.SubRefresh if e.Token == "" && subRefreshProxyHandler != nil { _, _, chOpts, found, err := h.cfgContainer.ChannelOptions(e.Channel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "sub refresh channel options error", map[string]any{"error": err.Error(), "channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Error().Err(err).Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("error getting channel options") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, err } if !found { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "sub refresh unknown channel", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("sub refresh unknown channel") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, centrifuge.ErrorUnknownChannel } r, _, err := subRefreshProxyHandler(c, e, chOpts, getPerCallData(c)) @@ -560,26 +564,26 @@ func (h *Handler) OnSubRefresh(c Client, subRefreshProxyHandler proxy.SubRefresh return centrifuge.SubRefreshReply{Expired: true}, SubRefreshExtra{}, nil } if errors.Is(err, jwtverify.ErrInvalidToken) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "invalid subscription refresh token", map[string]any{"error": err.Error(), "client": c.ID(), "user": c.UserID()})) + log.Info().Err(err).Str("client", c.ID()).Str("user", c.UserID()).Str("channel", e.Channel).Msg("invalid subscription refresh token") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, centrifuge.DisconnectInvalidToken } - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error verifying subscription refresh token", map[string]any{"error": err.Error(), "client": c.ID(), "user": c.UserID()})) + log.Error().Err(err).Str("client", c.ID()).Str("user", c.UserID()).Str("channel", e.Channel).Msg("error verifying subscription refresh token") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, err } if e.Channel == "" { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "sub refresh empty channel", map[string]any{"tokenChannel": token.Channel, "client": c.ID(), "user": c.UserID()})) + log.Info().Str("token_channel", token.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("sub refresh empty channel") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, centrifuge.DisconnectInvalidToken } if e.Channel != token.Channel { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "sub refresh token channel mismatch", map[string]any{"channel": e.Channel, "tokenChannel": token.Channel, "client": c.ID(), "user": c.UserID()})) + log.Info().Str("token_channel", token.Channel).Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("sub refresh token channel mismatch") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, centrifuge.DisconnectInvalidToken } if token.UserID != c.UserID() { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "sub refresh token user mismatch", map[string]any{"channel": e.Channel, "tokenUser": token.UserID, "client": c.ID(), "user": c.UserID()})) + log.Info().Str("channel", e.Channel).Str("token_user", token.UserID).Str("client", c.ID()).Str("user", c.UserID()).Msg("sub refresh token user mismatch") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, centrifuge.DisconnectInvalidToken } if token.Client != "" && c.ID() != token.Client { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "token client mismatch", map[string]any{"channel": e.Channel, "client": c.ID(), "user": c.UserID()})) + log.Info().Str("channel", e.Channel).Str("token_client", token.Client).Str("client", c.ID()).Str("user", c.UserID()).Msg("sub refresh token client mismatch") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, centrifuge.DisconnectInvalidToken } return centrifuge.SubRefreshReply{ @@ -612,11 +616,11 @@ func (h *Handler) validChannelName(rest string, chOpts configtypes.ChannelOption func (h *Handler) validateChannelName(c Client, rest string, chOpts configtypes.ChannelOptions, channel string) error { ok, err := h.validChannelName(rest, chOpts, channel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "error checking channel name", map[string]any{"channel": channel, "error": err.Error(), "client": c.ID(), "user": c.UserID()})) + log.Info().Err(err).Str("channel", channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("error checking channel name") return centrifuge.ErrorInternal } if !ok { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "invalid channel name", map[string]any{"channel": channel, "client": c.ID(), "user": c.UserID()})) + log.Info().Str("channel", channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("invalid channel name") return centrifuge.ErrorBadRequest } return nil @@ -630,17 +634,17 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr cfg := h.cfgContainer.Config() if e.Channel == "" { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "subscribe empty channel", map[string]any{"user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("subscribe empty channel") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorUnknownChannel } _, rest, chOpts, found, err := h.cfgContainer.ChannelOptions(e.Channel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "subscribe channel options error", map[string]any{"error": err.Error(), "channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Err(err).Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("error getting channel options") return centrifuge.SubscribeReply{}, SubscribeExtra{}, err } if !found { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "subscribe unknown channel", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("subscribe unknown channel") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorUnknownChannel } if err = h.validateChannelName(c, rest, chOpts, e.Channel); err != nil { @@ -664,7 +668,7 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr isUserLimitedChannel := chOpts.UserLimitedChannels && h.cfgContainer.IsUserLimited(e.Channel) if isPrivateChannel && e.Token == "" { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "attempt to subscribe on private channel without token", map[string]any{"channel": e.Channel, "client": c.ID(), "user": c.UserID()})) + log.Warn().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("attempt to subscribe on private channel without token") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorPermissionDenied } @@ -679,22 +683,22 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorTokenExpired } if errors.Is(err, jwtverify.ErrInvalidToken) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "invalid subscription token", map[string]any{"error": err.Error(), "client": c.ID(), "user": c.UserID()})) + log.Info().Err(err).Str("client", c.ID()).Str("user", c.UserID()).Str("channel", e.Channel).Msg("invalid subscription token") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorPermissionDenied } - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error verifying subscription token", map[string]any{"error": err.Error(), "client": c.ID(), "user": c.UserID()})) + log.Error().Err(err).Str("client", c.ID()).Str("user", c.UserID()).Str("channel", e.Channel).Msg("error verifying subscription token") return centrifuge.SubscribeReply{}, SubscribeExtra{}, err } if e.Channel != token.Channel { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "token channel mismatch", map[string]any{"channel": e.Channel, "tokenChannel": token.Channel, "client": c.ID(), "user": c.UserID()})) + log.Info().Str("channel", e.Channel).Str("token_channel", token.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("token channel mismatch") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.DisconnectInvalidToken } if token.UserID != c.UserID() { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "token user mismatch", map[string]any{"channel": e.Channel, "tokenUser": token.UserID, "client": c.ID(), "user": c.UserID()})) + log.Info().Str("channel", e.Channel).Str("token_user", token.UserID).Str("client", c.ID()).Str("user", c.UserID()).Msg("token user mismatch") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.DisconnectInvalidToken } if token.Client != "" && c.ID() != token.Client { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "token client mismatch", map[string]any{"channel": e.Channel, "client": c.ID(), "user": c.UserID()})) + log.Info().Str("channel", e.Channel).Str("token_client", token.Client).Str("client", c.ID()).Str("user", c.UserID()).Msg("token client mismatch") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.DisconnectInvalidToken } options = token.Options @@ -705,7 +709,7 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr options.Source = subsource.UserLimited } else if (chOpts.SubscribeProxyEnabled) && !isUserLimitedChannel { if subscribeProxyHandler == nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "subscribe proxy not enabled", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("subscribe proxy not enabled") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorNotAvailable } r, _, err := subscribeProxyHandler(c, e, chOpts, getPerCallData(c)) @@ -715,7 +719,7 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr return r, SubscribeExtra{}, err } else if (chOpts.SubscribeStreamProxyEnabled) && !isUserLimitedChannel { if subscribeStreamHandlerFunc == nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "stream proxy not enabled", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("stream proxy not enabled") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorNotAvailable } r, publishFunc, cancelFunc, err := subscribeStreamHandlerFunc(c, chOpts.SubscribeStreamBidirectional, e, chOpts, getPerCallData(c)) @@ -738,7 +742,7 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr } if !allowed { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "attempt to subscribe without sufficient permission", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("attempt to subscribe without sufficient permission") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorPermissionDenied } @@ -766,11 +770,11 @@ func (h *Handler) OnPublish(c Client, e centrifuge.PublishEvent, publishProxyHan _, rest, chOpts, found, err := h.cfgContainer.ChannelOptions(e.Channel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "publish channel options error", map[string]any{"error": err.Error(), "channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Error().Err(err).Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("error getting channel options") return centrifuge.PublishReply{}, err } if !found { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "publish to unknown channel", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("publish unknown channel") return centrifuge.PublishReply{}, centrifuge.ErrorUnknownChannel } if err = h.validateChannelName(c, rest, chOpts, e.Channel); err != nil { @@ -781,7 +785,7 @@ func (h *Handler) OnPublish(c Client, e centrifuge.PublishEvent, publishProxyHan if chOpts.PublishProxyEnabled { if publishProxyHandler == nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "publish proxy not enabled", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("publish proxy not enabled") return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable } return publishProxyHandler(c, e, chOpts, getPerCallData(c)) @@ -811,7 +815,7 @@ func (h *Handler) OnPublish(c Client, e centrifuge.PublishEvent, publishProxyHan } if !allowed { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "attempt to publish without sufficient permission", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("attempt to publish without sufficient permission") return centrifuge.PublishReply{}, centrifuge.ErrorPermissionDenied } @@ -822,7 +826,7 @@ func (h *Handler) OnPublish(c Client, e centrifuge.PublishEvent, publishProxyHan centrifuge.WithDelta(chOpts.DeltaPublish), ) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "publish error", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID(), "error": err.Error()})) + log.Error().Err(err).Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("error publishing message") } return centrifuge.PublishReply{Result: &result}, err } @@ -842,11 +846,11 @@ func (h *Handler) hasAccessToPresence(c Client, channel string, chOpts configtyp func (h *Handler) OnPresence(c Client, e centrifuge.PresenceEvent) (centrifuge.PresenceReply, error) { _, rest, chOpts, found, err := h.cfgContainer.ChannelOptions(e.Channel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "presence channel options error", map[string]any{"error": err.Error(), "channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Error().Err(err).Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("error getting channel options") return centrifuge.PresenceReply{}, err } if !found { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "presence for unknown channel", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("presence for unknown channel") return centrifuge.PresenceReply{}, centrifuge.ErrorUnknownChannel } if err = h.validateChannelName(c, rest, chOpts, e.Channel); err != nil { @@ -859,7 +863,7 @@ func (h *Handler) OnPresence(c Client, e centrifuge.PresenceEvent) (centrifuge.P allowed := h.hasAccessToPresence(c, e.Channel, chOpts, false) if !allowed { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "attempt to call presence without sufficient permission", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("attempt to call presence without sufficient permission") return centrifuge.PresenceReply{}, centrifuge.ErrorPermissionDenied } @@ -870,11 +874,11 @@ func (h *Handler) OnPresence(c Client, e centrifuge.PresenceEvent) (centrifuge.P func (h *Handler) OnPresenceStats(c Client, e centrifuge.PresenceStatsEvent) (centrifuge.PresenceStatsReply, error) { _, rest, chOpts, found, err := h.cfgContainer.ChannelOptions(e.Channel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "presence stats channel options error", map[string]any{"error": err.Error(), "channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Error().Err(err).Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("error getting channel options") return centrifuge.PresenceStatsReply{}, err } if !found { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "presence stats for unknown channel", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("presence stats for unknown channel") return centrifuge.PresenceStatsReply{}, centrifuge.ErrorUnknownChannel } if err = h.validateChannelName(c, rest, chOpts, e.Channel); err != nil { @@ -887,7 +891,7 @@ func (h *Handler) OnPresenceStats(c Client, e centrifuge.PresenceStatsEvent) (ce allowed := h.hasAccessToPresence(c, e.Channel, chOpts, false) if !allowed { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "attempt to call presence stats without sufficient permission", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("attempt to call presence stats without sufficient permission") return centrifuge.PresenceStatsReply{}, centrifuge.ErrorPermissionDenied } @@ -909,11 +913,11 @@ func (h *Handler) hasAccessToHistory(c Client, channel string, chOpts configtype func (h *Handler) OnHistory(c Client, e centrifuge.HistoryEvent) (centrifuge.HistoryReply, error) { _, rest, chOpts, found, err := h.cfgContainer.ChannelOptions(e.Channel) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "history channel options error", map[string]any{"error": err.Error(), "channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Error().Err(err).Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("error getting channel options") return centrifuge.HistoryReply{}, err } if !found { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "history for unknown channel", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("history for unknown channel") return centrifuge.HistoryReply{}, centrifuge.ErrorUnknownChannel } if err = h.validateChannelName(c, rest, chOpts, e.Channel); err != nil { @@ -926,7 +930,7 @@ func (h *Handler) OnHistory(c Client, e centrifuge.HistoryEvent) (centrifuge.His allowed := h.hasAccessToHistory(c, e.Channel, chOpts, false) if !allowed { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "attempt to call history without sufficient permission", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + log.Info().Str("channel", e.Channel).Str("client", c.ID()).Str("user", c.UserID()).Msg("attempt to call history without sufficient permission") return centrifuge.HistoryReply{}, centrifuge.ErrorPermissionDenied } diff --git a/internal/consuming/consuming.go b/internal/consuming/consuming.go index 8be36a4984..cc28f65d55 100644 --- a/internal/consuming/consuming.go +++ b/internal/consuming/consuming.go @@ -8,7 +8,6 @@ import ( "github.com/centrifugal/centrifugo/v5/internal/configtypes" "github.com/centrifugal/centrifugo/v5/internal/service" - "github.com/centrifugal/centrifuge" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" ) @@ -20,12 +19,7 @@ type Dispatcher interface { Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error } -type Logger interface { - LogEnabled(level centrifuge.LogLevel) bool - Log(node centrifuge.LogEntry) -} - -func New(nodeID string, logger Logger, dispatcher Dispatcher, configs []ConsumerConfig) ([]service.Service, error) { +func New(nodeID string, dispatcher Dispatcher, configs []ConsumerConfig) ([]service.Service, error) { metrics := newCommonMetrics(prometheus.DefaultRegisterer) var services []service.Service @@ -35,14 +29,14 @@ func New(nodeID string, logger Logger, dispatcher Dispatcher, configs []Consumer continue } if config.Type == configtypes.ConsumerTypePostgres { - consumer, err := NewPostgresConsumer(config.Name, logger, dispatcher, config.Postgres, metrics) + consumer, err := NewPostgresConsumer(config.Name, dispatcher, config.Postgres, metrics) if err != nil { return nil, fmt.Errorf("error initializing PostgreSQL consumer (%s): %w", config.Name, err) } log.Info().Str("consumer_name", config.Name).Msg("running consumer") services = append(services, consumer) } else if config.Type == configtypes.ConsumerTypeKafka { - consumer, err := NewKafkaConsumer(config.Name, nodeID, logger, dispatcher, config.Kafka, metrics) + consumer, err := NewKafkaConsumer(config.Name, nodeID, dispatcher, config.Kafka, metrics) if err != nil { return nil, fmt.Errorf("error initializing Kafka consumer (%s): %w", config.Name, err) } diff --git a/internal/consuming/kafka.go b/internal/consuming/kafka.go index adc8279114..765c4a0b70 100644 --- a/internal/consuming/kafka.go +++ b/internal/consuming/kafka.go @@ -15,7 +15,7 @@ import ( "github.com/centrifugal/centrifugo/v5/internal/apiproto" "github.com/centrifugal/centrifugo/v5/internal/configtypes" - "github.com/centrifugal/centrifuge" + "github.com/rs/zerolog/log" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/sasl/aws" @@ -40,7 +40,6 @@ type KafkaConsumer struct { name string client *kgo.Client nodeID string - logger Logger dispatcher Dispatcher config KafkaConfig consumers map[topicPartition]*partitionConsumer @@ -83,7 +82,7 @@ type KafkaJSONEvent struct { } func NewKafkaConsumer( - name string, nodeID string, logger Logger, dispatcher Dispatcher, config KafkaConfig, metrics *commonMetrics, + name string, nodeID string, dispatcher Dispatcher, config KafkaConfig, metrics *commonMetrics, ) (*KafkaConsumer, error) { if len(config.Brokers) == 0 { return nil, errors.New("brokers required") @@ -103,7 +102,6 @@ func NewKafkaConsumer( consumer := &KafkaConsumer{ name: name, nodeID: nodeID, - logger: logger, dispatcher: dispatcher, config: config, consumers: make(map[topicPartition]*partitionConsumer), @@ -219,11 +217,11 @@ func (c *KafkaConsumer) Run(ctx context.Context) error { // consumers to skip calling CommitMarkedOffsets on revoke. Otherwise, we get // "UNKNOWN_MEMBER_ID" error (since group already left). if err := c.client.CommitMarkedOffsets(closeCtx); err != nil { - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "commit marked offsets error on shutdown", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("consumer", c.name).Msg("error committing marked offsets on shutdown") } err := c.leaveGroup(closeCtx, c.client) if err != nil { - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error leaving consumer group", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("consumer", c.name).Msg("error leaving consumer group") } c.client.CloseAllowingRebalance() } @@ -234,18 +232,18 @@ func (c *KafkaConsumer) Run(ctx context.Context) error { if errors.Is(err, context.Canceled) { return ctx.Err() } - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error polling Kafka", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("consumer", c.name).Msg("error polling Kafka") } // Upon returning from polling loop we are re-initializing consumer client. c.client.CloseAllowingRebalance() c.client = nil - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "start re-initializing Kafka consumer client", map[string]any{})) + log.Info().Str("consumer", c.name).Msg("re-initializing Kafka consumer client") err = c.reInitClient(ctx) if err != nil { // Only context.Canceled may be returned. return err } - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "Kafka consumer client re-initialized", map[string]any{})) + log.Info().Str("consumer", c.name).Msg("Kafka consumer client re-initialized") } } @@ -270,10 +268,7 @@ func (c *KafkaConsumer) pollUntilFatal(ctx context.Context) error { return ctx.Err() } errs = append(errs, fetchErr.Err) - c.logger.Log(centrifuge.NewLogEntry( - centrifuge.LogLevelError, "error while polling Kafka", - map[string]any{"error": fetchErr.Err.Error(), "topic": fetchErr.Topic, "partition": fetchErr.Partition}), - ) + log.Error().Err(fetchErr.Err).Str("topic", fetchErr.Topic).Int32("partition", fetchErr.Partition).Msg("error while polling Kafka") } return fmt.Errorf("poll error: %w", errors.Join(errs...)) } @@ -361,7 +356,7 @@ func (c *KafkaConsumer) reInitClient(ctx context.Context) error { if err != nil { retries++ backoffDuration = getNextBackoffDuration(backoffDuration, retries) - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error initializing Kafka client", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("consumer", c.name).Msg("error initializing Kafka client") select { case <-ctx.Done(): return ctx.Err() @@ -397,7 +392,6 @@ func (c *KafkaConsumer) assigned(ctx context.Context, cl *kgo.Client, assigned m pc := &partitionConsumer{ partitionCtx: partitionCtx, dispatcher: c.dispatcher, - logger: c.logger, cl: cl, topic: topic, partition: partition, @@ -422,7 +416,7 @@ func (c *KafkaConsumer) revoked(ctx context.Context, cl *kgo.Client, revoked map // Do not try to CommitMarkedOffsets since on shutdown we call it manually. default: if err := cl.CommitMarkedOffsets(ctx); err != nil { - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "commit error on revoked partitions", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("consumer", c.name).Msg("error committing marked offsets on revoke") } } } @@ -451,7 +445,6 @@ func (c *KafkaConsumer) killConsumers(lost map[string][]int32) { type partitionConsumer struct { partitionCtx context.Context dispatcher Dispatcher - logger Logger cl *kgo.Client topic string partition int32 @@ -482,7 +475,7 @@ func (pc *partitionConsumer) processPublicationDataRecord(ctx context.Context, r var err error delta, err = strconv.ParseBool(getHeaderValue(record, pc.config.PublicationDataMode.DeltaHeader)) if err != nil { - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error parsing delta header value, skip message", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition})) + log.Error().Err(err).Str("topic", record.Topic).Int32("partition", record.Partition).Msg("error parsing delta header value, skip message") return nil } } @@ -499,7 +492,7 @@ func (pc *partitionConsumer) processAPICommandRecord(ctx context.Context, record var e KafkaJSONEvent err := json.Unmarshal(record.Value, &e) if err != nil { - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unmarshalling event from Kafka, skip message", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition})) + log.Error().Err(err).Str("consumer_name", pc.name).Str("topic", record.Topic).Int32("partition", record.Partition).Msg("error unmarshalling event from Kafka, skip message") return nil } return pc.dispatcher.Dispatch(ctx, e.Method, e.Payload) @@ -526,7 +519,7 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) { err := pc.processRecord(pc.partitionCtx, record) if err == nil { if retries > 0 { - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing message after errors", map[string]any{"topic": record.Topic, "partition": record.Partition})) + log.Info().Str("consumer_name", pc.name).Str("topic", record.Topic).Int32("partition", record.Partition).Msg("OK processing message after errors") } pc.metrics.processedTotal.WithLabelValues(pc.name).Inc() pc.cl.MarkCommitRecords(record) @@ -535,7 +528,7 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) { retries++ backoffDuration = getNextBackoffDuration(backoffDuration, retries) pc.metrics.errorsTotal.WithLabelValues(pc.name).Inc() - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed record", map[string]any{"error": err.Error(), "next_attempt_in": backoffDuration.String(), "topic": record.Topic, "partition": record.Partition})) + log.Error().Err(err).Str("consumer_name", pc.name).Str("topic", record.Topic).Int32("partition", record.Partition).Dur("next_attempt_in", backoffDuration).Msg("error processing consumed record") select { case <-time.After(backoffDuration): case <-pc.partitionCtx.Done(): diff --git a/internal/consuming/postgresql.go b/internal/consuming/postgresql.go index 3faeb890c8..6fb8450be5 100644 --- a/internal/consuming/postgresql.go +++ b/internal/consuming/postgresql.go @@ -7,9 +7,10 @@ import ( "strconv" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/configtypes" - "github.com/centrifugal/centrifuge" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/sync/errgroup" @@ -21,7 +22,7 @@ const ( ) func NewPostgresConsumer( - name string, logger Logger, dispatcher Dispatcher, config PostgresConfig, metrics *commonMetrics, + name string, dispatcher Dispatcher, config PostgresConfig, metrics *commonMetrics, ) (*PostgresConsumer, error) { if config.DSN == "" { return nil, errors.New("dsn is required") @@ -62,7 +63,6 @@ func NewPostgresConsumer( return &PostgresConsumer{ name: name, pool: pool, - logger: logger, dispatcher: dispatcher, config: config, lockPrefix: "centrifugo_partition_lock_" + name, @@ -75,7 +75,6 @@ type PostgresConfig = configtypes.PostgresConsumerConfig type PostgresConsumer struct { name string pool *pgxpool.Pool - logger Logger config PostgresConfig dispatcher Dispatcher lockPrefix string @@ -113,12 +112,12 @@ func (c *PostgresConsumer) listenForNotifications(ctx context.Context, triggerCh } partition, err := strconv.Atoi(notification.Payload) if err != nil { - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error converting postgresql notification", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("consumer_name", c.name).Msg("error converting postgresql notification") continue } if partition > len(triggerChannels)-1 { - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "outbox partition is larger than configured number", map[string]any{"partition": partition})) + log.Error().Str("consumer_name", c.name).Int("partition", partition).Msg("outbox partition is larger than configured number") continue } select { @@ -182,7 +181,7 @@ func (c *PostgresConsumer) processOnce(ctx context.Context, partition int) (int, dispatchErr = c.dispatcher.Dispatch(context.Background(), event.Method, event.Payload) if dispatchErr != nil { // Stop here, all processed events will be removed, and we will start from this one. - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed event", map[string]any{"error": dispatchErr.Error(), "method": event.Method})) + log.Error().Err(dispatchErr).Str("consumer_name", c.name).Str("method", event.Method).Msg("error processing consumed event") break } else { numProcessedRows++ @@ -230,7 +229,7 @@ func (c *PostgresConsumer) Run(ctx context.Context) error { if errors.Is(err, context.Canceled) { return ctx.Err() } - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error listening outbox notifications", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("consumer_name", c.name).Msg("error listening outbox notifications") select { case <-time.After(time.Second): case <-ctx.Done(): @@ -263,7 +262,7 @@ func (c *PostgresConsumer) Run(ctx context.Context) error { retries++ backoffDuration = getNextBackoffDuration(backoffDuration, retries) c.metrics.errorsTotal.WithLabelValues(c.name).Inc() - c.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing postgresql outbox", map[string]any{"error": err.Error(), "partition": i})) + log.Error().Err(err).Str("consumer_name", c.name).Int("partition", i).Msg("error processing postgresql outbox") select { case <-ctx.Done(): return ctx.Err() diff --git a/internal/logging/logging.go b/internal/logging/logging.go index 216f26d7a1..2ac1cb8ad1 100644 --- a/internal/logging/logging.go +++ b/internal/logging/logging.go @@ -14,6 +14,14 @@ import ( "github.com/rs/zerolog/log" ) +const ( + TraceLevel = zerolog.TraceLevel + DebugLevel = zerolog.DebugLevel + InfoLevel = zerolog.InfoLevel + WarnLevel = zerolog.WarnLevel + ErrorLevel = zerolog.ErrorLevel +) + var logLevelMatches = map[string]zerolog.Level{ "NONE": zerolog.NoLevel, "TRACE": zerolog.TraceLevel, @@ -24,41 +32,6 @@ var logLevelMatches = map[string]zerolog.Level{ "FATAL": zerolog.FatalLevel, } -func configureConsoleWriter() { - if isTerminalAttached() { - log.Logger = log.Output(zerolog.ConsoleWriter{ - Out: os.Stdout, - TimeFormat: "2006-01-02 15:04:05", - FormatLevel: logutils.ConsoleFormatLevel(), - FormatErrFieldName: logutils.ConsoleFormatErrFieldName(), - FormatErrFieldValue: logutils.ConsoleFormatErrFieldValue(), - }) - } -} - -func isTerminalAttached() bool { - //goland:noinspection GoBoolExpressions – Goland is not smart enough here. - return isatty.IsTerminal(os.Stdout.Fd()) && runtime.GOOS != "windows" -} - -func Setup(cfg config.Config) *os.File { - configureConsoleWriter() - logLevel, ok := logLevelMatches[strings.ToUpper(cfg.LogLevel)] - if !ok { - logLevel = zerolog.InfoLevel - } - zerolog.SetGlobalLevel(logLevel) - if cfg.LogFile != "" { - f, err := os.OpenFile(cfg.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - log.Fatal().Msgf("error opening log file: %v", err) - } - log.Logger = log.Output(f) - return f - } - return nil -} - func CentrifugeLogLevel(level string) centrifuge.LogLevel { if l, ok := logStringToLevel[strings.ToLower(level)]; ok { return l @@ -66,7 +39,7 @@ func CentrifugeLogLevel(level string) centrifuge.LogLevel { return centrifuge.LogLevelInfo } -// LogStringToLevel matches level string to Centrifuge LogLevel. +// logStringToLevel matches level string to Centrifuge LogLevel. var logStringToLevel = map[string]centrifuge.LogLevel{ "trace": centrifuge.LogLevelTrace, "debug": centrifuge.LogLevelDebug, @@ -75,19 +48,19 @@ var logStringToLevel = map[string]centrifuge.LogLevel{ "none": centrifuge.LogLevelNone, } -type LogHandler struct { +type centrifugeLogHandler struct { entries chan centrifuge.LogEntry } -func NewCentrifugeLogHandler() *LogHandler { - h := &LogHandler{ +func newCentrifugeLogHandler() *centrifugeLogHandler { + h := ¢rifugeLogHandler{ entries: make(chan centrifuge.LogEntry, 64), } go h.readEntries() return h } -func (h *LogHandler) readEntries() { +func (h *centrifugeLogHandler) readEntries() { for entry := range h.entries { var l *zerolog.Event switch entry.Level { @@ -112,10 +85,51 @@ func (h *LogHandler) readEntries() { } } -func (h *LogHandler) Handle(entry centrifuge.LogEntry) { +func (h *centrifugeLogHandler) Handle(entry centrifuge.LogEntry) { select { case h.entries <- entry: default: return } } + +func configureConsoleWriter() { + if isTerminalAttached() { + log.Logger = log.Output(zerolog.ConsoleWriter{ + Out: os.Stdout, + TimeFormat: "2006-01-02 15:04:05", + FormatLevel: logutils.ConsoleFormatLevel(), + FormatErrFieldName: logutils.ConsoleFormatErrFieldName(), + FormatErrFieldValue: logutils.ConsoleFormatErrFieldValue(), + }) + } +} + +func isTerminalAttached() bool { + return isatty.IsTerminal(os.Stdout.Fd()) && runtime.GOOS != "windows" +} + +func Setup(cfg config.Config) (centrifuge.LogHandler, func()) { + configureConsoleWriter() + logLevel, ok := logLevelMatches[strings.ToUpper(cfg.LogLevel)] + if !ok { + logLevel = zerolog.InfoLevel + } + zerolog.SetGlobalLevel(logLevel) + if cfg.LogFile != "" { + f, err := os.OpenFile(cfg.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + log.Fatal().Msgf("error opening log file: %v", err) + } + log.Logger = log.Output(f) + return newCentrifugeLogHandler().Handle, func() { + _ = f.Close() + } + } + return newCentrifugeLogHandler().Handle, nil +} + +// Enabled checks if a specific logging level is enabled +func Enabled(level zerolog.Level) bool { + return level >= zerolog.GlobalLevel() +} diff --git a/internal/middleware/connlimit.go b/internal/middleware/connlimit.go index c085180ad4..9b2a2311f8 100644 --- a/internal/middleware/connlimit.go +++ b/internal/middleware/connlimit.go @@ -5,6 +5,8 @@ import ( "sync/atomic" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/config" "github.com/centrifugal/centrifuge" @@ -52,7 +54,7 @@ func (l *ConnLimit) Middleware(h http.Handler) http.Handler { now := time.Now().UnixNano() prevLoggedAt := atomic.LoadInt64(&connLimitReachedLoggedAt) if prevLoggedAt == 0 || now-prevLoggedAt > connLimitReachedLogThrottle { - l.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "node connection limit reached", map[string]any{"limit": connLimit})) + log.Warn().Int("limit", connLimit).Msg("node connection limit reached") atomic.StoreInt64(&connLimitReachedLoggedAt, now) } w.WriteHeader(http.StatusServiceUnavailable) diff --git a/internal/natsbroker/broker.go b/internal/natsbroker/broker.go index b2ee6a724a..801ff06431 100644 --- a/internal/natsbroker/broker.go +++ b/internal/natsbroker/broker.go @@ -115,7 +115,7 @@ func (b *NatsBroker) Run(h centrifuge.BrokerEventHandler) error { return err } b.nc = nc - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, fmt.Sprintf("Nats Broker connected to: %s", url))) + log.Info().Str("broker", "nats").Str("url", url).Msg("broker running") return nil } @@ -278,7 +278,7 @@ func (b *NatsBroker) handleClientMessage(subject string, data []byte, sub *nats. var push protocol.Push err := push.UnmarshalVT(data) if err != nil { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "can't unmarshal push from Nats", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error unmarshal push from Nats") return } @@ -302,7 +302,7 @@ func (b *NatsBroker) handleClientMessage(subject string, data []byte, sub *nats. } else if push.Leave != nil { _ = b.eventHandler.HandleLeave(push.Channel, infoFromProto(push.Leave.Info)) } else { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "unknown push from Nats", map[string]any{"push": fmt.Sprintf("%v", &push)})) + log.Warn().Str("push", fmt.Sprintf("%v", &push)).Msg("unknown push from Nats") } } diff --git a/internal/proxy/connect_handler.go b/internal/proxy/connect_handler.go index cfefffc9cd..ddb49ced97 100644 --- a/internal/proxy/connect_handler.go +++ b/internal/proxy/connect_handler.go @@ -6,6 +6,8 @@ import ( "encoding/json" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/clientstorage" "github.com/centrifugal/centrifugo/v5/internal/config" "github.com/centrifugal/centrifugo/v5/internal/proxyproto" @@ -76,7 +78,7 @@ func (h *ConnectHandler) Handle(node *centrifuge.Node) ConnectingHandlerFunc { h.summary.Observe(duration) h.histogram.Observe(duration) h.errors.Inc() - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error proxying connect", map[string]any{"client": e.ClientID, "error": err.Error()})) + log.Error().Err(err).Str("client", e.ClientID).Msg("error proxying connect") return centrifuge.ConnectReply{}, ConnectExtra{}, err } h.summary.Observe(duration) @@ -97,7 +99,7 @@ func (h *ConnectHandler) Handle(node *centrifuge.Node) ConnectingHandlerFunc { if result.B64Info != "" { decodedInfo, err := base64.StdEncoding.DecodeString(result.B64Info) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 info", map[string]any{"client": e.ClientID, "error": err.Error()})) + log.Error().Err(err).Str("client", e.ClientID).Msg("error decoding base64 info") return centrifuge.ConnectReply{}, ConnectExtra{}, centrifuge.ErrorInternal } info = decodedInfo @@ -109,7 +111,7 @@ func (h *ConnectHandler) Handle(node *centrifuge.Node) ConnectingHandlerFunc { if result.B64Data != "" { decodedData, err := base64.StdEncoding.DecodeString(result.B64Data) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 data", map[string]any{"client": e.ClientID, "error": err.Error()})) + log.Error().Err(err).Str("client", e.ClientID).Msg("error decoding base64 data") return centrifuge.ConnectReply{}, ConnectExtra{}, centrifuge.ErrorInternal } data = decodedData @@ -137,7 +139,7 @@ func (h *ConnectHandler) Handle(node *centrifuge.Node) ConnectingHandlerFunc { return centrifuge.ConnectReply{}, ConnectExtra{}, err } if !found { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "unknown channel in connect result channels", map[string]any{"client": e.ClientID, "channel": ch})) + log.Warn().Str("client", e.ClientID).Str("channel", ch).Msg("unknown channel in connect result channels") return centrifuge.ConnectReply{}, ConnectExtra{}, centrifuge.ErrorUnknownChannel } reply.Subscriptions[ch] = centrifuge.SubscribeOptions{ @@ -162,7 +164,7 @@ func (h *ConnectHandler) Handle(node *centrifuge.Node) ConnectingHandlerFunc { return centrifuge.ConnectReply{}, ConnectExtra{}, err } if !found { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "unknown channel in connect result subs", map[string]any{"client": e.ClientID, "channel": ch})) + log.Warn().Str("client", e.ClientID).Str("channel", ch).Msg("unknown channel in connect result subs") return centrifuge.ConnectReply{}, ConnectExtra{}, centrifuge.ErrorUnknownChannel } var chInfo []byte diff --git a/internal/proxy/publish_handler.go b/internal/proxy/publish_handler.go index fd21a59db2..ab1514b05d 100644 --- a/internal/proxy/publish_handler.go +++ b/internal/proxy/publish_handler.go @@ -4,6 +4,8 @@ import ( "encoding/base64" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/configtypes" "github.com/centrifugal/centrifugo/v5/internal/proxyproto" @@ -59,7 +61,7 @@ func (h *PublishHandler) Handle(node *centrifuge.Node) PublishHandlerFunc { proxyEnabled := chOpts.PublishProxyEnabled proxyName := chOpts.PublishProxyName if !proxyEnabled { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "publish proxy not configured for a channel", map[string]any{"channel": e.Channel})) + log.Info().Str("channel", e.Channel).Msg("publish proxy not enabled for a channel") return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable } p = h.config.Proxies[proxyName] @@ -97,7 +99,7 @@ func (h *PublishHandler) Handle(node *centrifuge.Node) PublishHandlerFunc { summary.Observe(duration) histogram.Observe(duration) errors.Inc() - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error proxying publish", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Str("channel", e.Channel).Msg("error proxying publish") return centrifuge.PublishReply{}, err } summary.Observe(duration) @@ -121,7 +123,7 @@ func (h *PublishHandler) Handle(node *centrifuge.Node) PublishHandlerFunc { } else if publishRep.Result.B64Data != "" { decodedData, err := base64.StdEncoding.DecodeString(publishRep.Result.B64Data) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 data", map[string]any{"client": client.ID(), "error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Msg("error decoding base64 data") return centrifuge.PublishReply{}, centrifuge.ErrorInternal } data = decodedData diff --git a/internal/proxy/refresh_handler.go b/internal/proxy/refresh_handler.go index 30c2438900..55b1bab64b 100644 --- a/internal/proxy/refresh_handler.go +++ b/internal/proxy/refresh_handler.go @@ -5,6 +5,8 @@ import ( "encoding/json" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/proxyproto" "github.com/centrifugal/centrifuge" @@ -68,7 +70,7 @@ func (h *RefreshHandler) Handle(node *centrifuge.Node) RefreshHandlerFunc { h.summary.Observe(duration) h.histogram.Observe(duration) h.errors.Inc() - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error proxying refresh", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error proxying refresh") // In case of an error give connection one more minute to live and // then try to check again. This way we gracefully handle temporary // problems on application backend side. @@ -84,7 +86,7 @@ func (h *RefreshHandler) Handle(node *centrifuge.Node) RefreshHandlerFunc { result := refreshRep.Result if result == nil { // User will be disconnected. - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "no refresh result found", map[string]any{})) + log.Error().Msg("no refresh result found") return centrifuge.RefreshReply{ Expired: true, }, RefreshExtra{}, nil @@ -100,7 +102,7 @@ func (h *RefreshHandler) Handle(node *centrifuge.Node) RefreshHandlerFunc { if result.B64Info != "" { decodedInfo, err := base64.StdEncoding.DecodeString(result.B64Info) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 info", map[string]any{"client": client.ID(), "error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Msg("error decoding base64 info") return centrifuge.RefreshReply{}, RefreshExtra{}, centrifuge.ErrorInternal } info = decodedInfo diff --git a/internal/proxy/rpc_handler.go b/internal/proxy/rpc_handler.go index df795cf14c..37eb7e0e59 100644 --- a/internal/proxy/rpc_handler.go +++ b/internal/proxy/rpc_handler.go @@ -4,6 +4,8 @@ import ( "encoding/base64" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/config" "github.com/centrifugal/centrifugo/v5/internal/proxyproto" @@ -58,17 +60,17 @@ func (h *RPCHandler) Handle(node *centrifuge.Node) RPCHandlerFunc { rpcOpts, ok, err := cfgContainer.RpcOptions(e.Method) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error getting RPC options", map[string]any{"method": e.Method, "error": err.Error()})) + log.Error().Err(err).Str("method", e.Method).Msg("error getting RPC options") return centrifuge.RPCReply{}, centrifuge.ErrorInternal } if !ok { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "rpc options not found", map[string]any{"method": e.Method})) + log.Info().Str("method", e.Method).Msg("rpc options not found") return centrifuge.RPCReply{}, centrifuge.ErrorMethodNotFound } proxyEnabled := rpcOpts.ProxyEnabled proxyName := rpcOpts.ProxyName if !proxyEnabled { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "rpc proxy not configured for a method", map[string]any{"method": e.Method})) + log.Info().Str("method", e.Method).Msg("rpc proxy not enabled for a method") return centrifuge.RPCReply{}, centrifuge.ErrorNotAvailable } p = h.config.Proxies[proxyName] @@ -106,7 +108,7 @@ func (h *RPCHandler) Handle(node *centrifuge.Node) RPCHandlerFunc { summary.Observe(duration) histogram.Observe(duration) errors.Inc() - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error proxying RPC", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error proxying RPC") return centrifuge.RPCReply{}, err } summary.Observe(duration) @@ -124,7 +126,7 @@ func (h *RPCHandler) Handle(node *centrifuge.Node) RPCHandlerFunc { if rpcData.B64Data != "" { decodedData, err := base64.StdEncoding.DecodeString(rpcData.B64Data) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 data", map[string]any{"client": client.ID(), "error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Msg("error decoding base64 data") return centrifuge.RPCReply{}, centrifuge.ErrorInternal } data = decodedData diff --git a/internal/proxy/sub_refresh_handler.go b/internal/proxy/sub_refresh_handler.go index f2c20f381b..a9a19fac33 100644 --- a/internal/proxy/sub_refresh_handler.go +++ b/internal/proxy/sub_refresh_handler.go @@ -4,6 +4,8 @@ import ( "encoding/base64" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/configtypes" "github.com/centrifugal/centrifugo/v5/internal/proxyproto" @@ -62,7 +64,7 @@ func (h *SubRefreshHandler) Handle(node *centrifuge.Node) SubRefreshHandlerFunc proxyEnabled := chOpts.SubRefreshProxyEnabled proxyName := chOpts.SubRefreshProxyName if !proxyEnabled { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "sub refresh proxy not configured for a channel", map[string]any{"channel": e.Channel})) + log.Info().Str("channel", e.Channel).Msg("sub refresh proxy not configured for a channel") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, centrifuge.ErrorNotAvailable } p = h.config.Proxies[proxyName] @@ -94,7 +96,7 @@ func (h *SubRefreshHandler) Handle(node *centrifuge.Node) SubRefreshHandlerFunc summary.Observe(duration) histogram.Observe(duration) errors.Inc() - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error proxying sub refresh", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Str("channel", e.Channel).Msg("error proxying sub refresh") // In case of an error give connection one more minute to live and // then try to check again. This way we gracefully handle temporary // problems on application backend side. @@ -110,7 +112,7 @@ func (h *SubRefreshHandler) Handle(node *centrifuge.Node) SubRefreshHandlerFunc result := refreshRep.Result if result == nil { // Subscription will be unsubscribed. - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "no sub refresh result found", map[string]any{})) + log.Error().Msg("no sub refresh result found") return centrifuge.SubRefreshReply{ Expired: true, }, SubRefreshExtra{}, nil @@ -126,7 +128,7 @@ func (h *SubRefreshHandler) Handle(node *centrifuge.Node) SubRefreshHandlerFunc if result.B64Info != "" { decodedInfo, err := base64.StdEncoding.DecodeString(result.B64Info) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 info", map[string]any{"client": client.ID(), "error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Msg("error decoding base64 info") return centrifuge.SubRefreshReply{}, SubRefreshExtra{}, centrifuge.ErrorInternal } info = decodedInfo diff --git a/internal/proxy/subscribe_handler.go b/internal/proxy/subscribe_handler.go index d9267b9922..64347c5941 100644 --- a/internal/proxy/subscribe_handler.go +++ b/internal/proxy/subscribe_handler.go @@ -4,6 +4,8 @@ import ( "encoding/base64" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/configtypes" "github.com/centrifugal/centrifugo/v5/internal/proxyproto" "github.com/centrifugal/centrifugo/v5/internal/subsource" @@ -63,7 +65,7 @@ func (h *SubscribeHandler) Handle(node *centrifuge.Node) SubscribeHandlerFunc { proxyEnabled := chOpts.SubscribeProxyEnabled proxyName := chOpts.SubscribeProxyName if !proxyEnabled { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "subscribe proxy not configured for a channel", map[string]any{"channel": e.Channel})) + log.Info().Str("channel", e.Channel).Msg("subscribe proxy not enabled for a channel") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorNotAvailable } p = h.config.Proxies[proxyName] @@ -101,7 +103,7 @@ func (h *SubscribeHandler) Handle(node *centrifuge.Node) SubscribeHandlerFunc { summary.Observe(duration) histogram.Observe(duration) errors.Inc() - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error proxying subscribe", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Str("channel", e.Channel).Msg("error proxying subscribe") return centrifuge.SubscribeReply{}, SubscribeExtra{}, err } summary.Observe(duration) @@ -129,7 +131,7 @@ func (h *SubscribeHandler) Handle(node *centrifuge.Node) SubscribeHandlerFunc { if subscribeRep.Result.B64Info != "" { decodedInfo, err := base64.StdEncoding.DecodeString(subscribeRep.Result.B64Info) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 info", map[string]any{"client": client.ID(), "error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Msg("error decoding base64 info") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorInternal } info = decodedInfo @@ -139,7 +141,7 @@ func (h *SubscribeHandler) Handle(node *centrifuge.Node) SubscribeHandlerFunc { if subscribeRep.Result.B64Data != "" { decodedData, err := base64.StdEncoding.DecodeString(subscribeRep.Result.B64Data) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 data", map[string]any{"client": client.ID(), "error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Msg("error decoding base64 data") return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorInternal } data = decodedData diff --git a/internal/proxy/subscribe_stream_handler.go b/internal/proxy/subscribe_stream_handler.go index 6de5755177..98a347c6d8 100644 --- a/internal/proxy/subscribe_stream_handler.go +++ b/internal/proxy/subscribe_stream_handler.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/configtypes" "github.com/centrifugal/centrifugo/v5/internal/proxyproto" "github.com/centrifugal/centrifugo/v5/internal/subsource" @@ -74,7 +76,7 @@ func (h *SubscribeStreamHandler) Handle(node *centrifuge.Node) SubscribeStreamHa proxyEnabled := chOpts.SubscribeStreamProxyEnabled proxyName := chOpts.SubscribeStreamProxyName if !proxyEnabled { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "subscribe stream proxy not configured for a channel", map[string]any{"channel": e.Channel})) + log.Info().Str("channel", e.Channel).Msg("subscribe stream proxy not enabled for a channel") return centrifuge.SubscribeReply{}, nil, nil, centrifuge.ErrorNotAvailable } p = h.config.Proxies[proxyName] @@ -154,7 +156,7 @@ func (h *SubscribeStreamHandler) Handle(node *centrifuge.Node) SubscribeStreamHa summary.Observe(duration) histogram.Observe(duration) errCounter.Inc() - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error from subscribe stream proxy", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Str("channel", e.Channel).Msg("error from subscribe stream proxy") //proxyCallErrorCount.WithLabelValues(proxyName, "subscribe", "internal").Inc() return centrifuge.SubscribeReply{}, nil, nil, err } @@ -189,7 +191,7 @@ func (h *SubscribeStreamHandler) Handle(node *centrifuge.Node) SubscribeStreamHa if subscribeRep.Result.B64Info != "" { decodedInfo, err := base64.StdEncoding.DecodeString(subscribeRep.Result.B64Info) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 info", map[string]any{"client": client.ID(), "error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Msg("error decoding base64 info") return centrifuge.SubscribeReply{}, nil, nil, centrifuge.ErrorInternal } info = decodedInfo @@ -199,7 +201,7 @@ func (h *SubscribeStreamHandler) Handle(node *centrifuge.Node) SubscribeStreamHa if subscribeRep.Result.B64Data != "" { decodedData, err := base64.StdEncoding.DecodeString(subscribeRep.Result.B64Data) if err != nil { - node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 data", map[string]any{"client": client.ID(), "error": err.Error()})) + log.Error().Err(err).Str("client", client.ID()).Msg("error decoding base64 data") return centrifuge.SubscribeReply{}, nil, nil, centrifuge.ErrorInternal } data = decodedData diff --git a/internal/unigrpc/grpc.go b/internal/unigrpc/grpc.go index 05dd7122a9..d191281798 100644 --- a/internal/unigrpc/grpc.go +++ b/internal/unigrpc/grpc.go @@ -3,6 +3,9 @@ package unigrpc import ( "time" + "github.com/centrifugal/centrifugo/v5/internal/logging" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/unigrpc/unistream" "github.com/centrifugal/centrifuge" @@ -59,10 +62,10 @@ func (s *Service) Consume(req *unistream.ConnectRequest, stream unistream.Centri } defer func() { _ = closeFn() }() - if s.node.LogEnabled(centrifuge.LogLevelDebug) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", map[string]any{"transport": transport.Name(), "client": c.ID()})) + if logging.Enabled(logging.DebugLevel) { + log.Debug().Str("transport", transport.Name()).Str("client", c.ID()).Msg("client connection established") defer func(started time.Time) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", map[string]any{"duration": time.Since(started).String(), "transport": transport.Name(), "client": c.ID()})) + log.Debug().Str("transport", transport.Name()).Str("client", c.ID()).Dur("duration", time.Since(started)).Msg("client connection completed") }(time.Now()) } diff --git a/internal/unihttpstream/handler.go b/internal/unihttpstream/handler.go index 581c04eb92..110976f486 100644 --- a/internal/unihttpstream/handler.go +++ b/internal/unihttpstream/handler.go @@ -6,10 +6,12 @@ import ( "net/http" "time" + "github.com/centrifugal/centrifugo/v5/internal/logging" "github.com/centrifugal/centrifugo/v5/internal/tools" "github.com/centrifugal/centrifuge" "github.com/centrifugal/protocol" + "github.com/rs/zerolog/log" ) type Handler struct { @@ -40,7 +42,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, maxBytesSize) connectRequestData, err := io.ReadAll(r.Body) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "error reading uni http stream request body", map[string]any{"error": err.Error()})) + log.Info().Err(err).Str("transport", "uni_http_stream").Msg("error reading uni http stream request body") if len(connectRequestData) >= int(maxBytesSize) { w.WriteHeader(http.StatusRequestEntityTooLarge) return @@ -49,8 +51,8 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } err = json.Unmarshal(connectRequestData, &req) if err != nil { - if h.node.LogEnabled(centrifuge.LogLevelDebug) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "malformed connect request", map[string]any{"error": err.Error()})) + if logging.Enabled(logging.DebugLevel) { + log.Error().Err(err).Str("transport", "uni_http_stream").Msg("malformed connect request") } return } @@ -62,16 +64,16 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { transport := newStreamTransport(r, h.pingPong) c, closeFn, err := centrifuge.NewClient(r.Context(), h.node, transport) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error create client", map[string]any{"error": err.Error(), "transport": "uni_http_stream"})) + log.Error().Err(err).Str("transport", "uni_http_stream").Msg("error create client") return } defer func() { _ = closeFn() }() defer close(transport.closedCh) // need to execute this after client closeFn. - if h.node.LogEnabled(centrifuge.LogLevelDebug) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", map[string]any{"transport": transport.Name(), "client": c.ID()})) + if logging.Enabled(logging.DebugLevel) { + log.Debug().Str("transport", "uni_http_stream").Str("client", c.ID()).Msg("client connection established") defer func(started time.Time) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", map[string]any{"duration": time.Since(started).String(), "transport": transport.Name(), "client": c.ID()})) + log.Debug().Str("transport", "uni_http_stream").Str("client", c.ID()).Dur("duration", time.Since(started)).Msg("client connection completed") }(time.Now()) } diff --git a/internal/unisse/handler.go b/internal/unisse/handler.go index b582e8304c..5c5c85a4a5 100644 --- a/internal/unisse/handler.go +++ b/internal/unisse/handler.go @@ -6,10 +6,12 @@ import ( "net/http" "time" + "github.com/centrifugal/centrifugo/v5/internal/logging" "github.com/centrifugal/centrifugo/v5/internal/tools" "github.com/centrifugal/centrifuge" "github.com/centrifugal/protocol" + "github.com/rs/zerolog/log" ) type Handler struct { @@ -39,7 +41,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if connectRequestString != "" { err := json.Unmarshal([]byte(connectRequestString), &req) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "malformed connect request", map[string]any{"error": err.Error()})) + log.Info().Err(err).Str("transport", "uni_sse").Msg("error unmarshalling connect request") return } } else { @@ -50,7 +52,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, int64(maxBytesSize)) connectRequestData, err := io.ReadAll(r.Body) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "error reading uni sse request body", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("error reading uni sse request body") if len(connectRequestData) >= maxBytesSize { w.WriteHeader(http.StatusRequestEntityTooLarge) return @@ -59,8 +61,8 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } err = json.Unmarshal(connectRequestData, &req) if err != nil { - if h.node.LogEnabled(centrifuge.LogLevelDebug) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "malformed connect request", map[string]any{"error": err.Error()})) + if logging.Enabled(logging.DebugLevel) { + log.Debug().Err(err).Str("transport", "uni_sse").Msg("malformed connect request") } return } @@ -72,16 +74,16 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { transport := newEventsourceTransport(r, h.pingPong) c, closeFn, err := centrifuge.NewClient(r.Context(), h.node, transport) if err != nil { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error create client", map[string]any{"error": err.Error(), "transport": "uni_sse"})) + log.Error().Err(err).Str("transport", "uni_sse").Msg("error create client") return } defer func() { _ = closeFn() }() defer close(transport.closedCh) // need to execute this after client closeFn. - if h.node.LogEnabled(centrifuge.LogLevelDebug) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", map[string]any{"transport": transport.Name(), "client": c.ID()})) + if logging.Enabled(logging.DebugLevel) { + log.Debug().Str("transport", "uni_sse").Str("client", c.ID()).Msg("client connection established") defer func(started time.Time) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", map[string]any{"duration": time.Since(started).String(), "transport": transport.Name(), "client": c.ID()})) + log.Debug().Str("transport", "uni_sse").Str("client", c.ID()).Dur("duration", time.Since(started)).Msg("client connection completed") }(time.Now()) } diff --git a/internal/uniws/handler.go b/internal/uniws/handler.go index bf2a199359..f3e251d246 100644 --- a/internal/uniws/handler.go +++ b/internal/uniws/handler.go @@ -6,6 +6,10 @@ import ( "sync" "time" + "github.com/centrifugal/centrifugo/v5/internal/logging" + + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifuge" "github.com/centrifugal/protocol" "github.com/gorilla/websocket" @@ -71,14 +75,14 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { conn, err := s.upgrade.Upgrade(rw, r, nil) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "websocket upgrade error", map[string]any{"error": err.Error()})) + log.Error().Err(err).Str("transport", "uni_ws").Msg("websocket upgrade error") return } if compression { err := conn.SetCompressionLevel(compressionLevel) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "websocket error setting compression level", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("websocket error setting compression level") } } @@ -131,15 +135,15 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { c, closeFn, err := centrifuge.NewClient(NewCancelContext(r.Context(), ctxCh), s.node, transport) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error creating client", map[string]any{"transport": transport.Name()})) + log.Error().Err(err).Str("transport", "uni_ws").Msg("error creating client") return } defer func() { _ = closeFn() }() - if s.node.LogEnabled(centrifuge.LogLevelDebug) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", map[string]any{"client": c.ID(), "transport": transport.Name()})) + if logging.Enabled(logging.DebugLevel) { + log.Debug().Str("transport", "uni_ws").Str("client", c.ID()).Msg("client connection established") defer func(started time.Time) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", map[string]any{"client": c.ID(), "transport": transport.Name(), "duration": time.Since(started).String()})) + log.Debug().Str("transport", "uni_ws").Str("client", c.ID()).Dur("duration", time.Since(started)).Msg("client connection completed") }(time.Now()) } diff --git a/internal/usage/usage.go b/internal/usage/usage.go index 42724ae61b..18a759c36f 100644 --- a/internal/usage/usage.go +++ b/internal/usage/usage.go @@ -16,6 +16,8 @@ import ( "sync" "time" + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifugo/v5/internal/build" "github.com/centrifugal/centrifugo/v5/internal/config" "github.com/centrifugal/centrifugo/v5/internal/configtypes" @@ -140,7 +142,7 @@ func (s *Sender) isDev() bool { func (s *Sender) Run(ctx context.Context) error { firstTimeSend := time.Now().Add(initialDelay) if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: schedule next send", map[string]any{"delay": initialDelay.String()})) + log.Debug().Dur("delay", initialDelay).Msg("usage stats: schedule next send") } // Wait 1/4 of a delay to randomize hourly ticks on different nodes. @@ -151,7 +153,7 @@ func (s *Sender) Run(ctx context.Context) error { } if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: start periodic ticks", map[string]any{})) + log.Debug().Msg("usage stats: start periodic ticks") } for { @@ -160,19 +162,19 @@ func (s *Sender) Run(ctx context.Context) error { return ctx.Err() case <-time.After(tickInterval): if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: updating max values", map[string]any{})) + log.Debug().Msg("usage stats: tick") } err := s.updateMaxValues() if err != nil { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "usage stats: error updating max values", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("usage stats: error updating max values") } continue } if time.Now().Before(firstTimeSend) { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: too early to send first time", map[string]any{})) + log.Debug().Msg("usage stats: too early to send first time") } continue } @@ -186,25 +188,25 @@ func (s *Sender) Run(ctx context.Context) error { if lastSentAt > 0 && time.Now().Unix() <= lastSentAt+int64(sendInterval.Seconds()) { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: too early to send", map[string]any{})) + log.Debug().Msg("usage stats: too early to send") } continue } if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: sending usage stats", map[string]any{})) + log.Debug().Msg("usage stats: sending usage stats") } metrics, err := s.prepareMetrics() if err != nil { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "usage stats: error preparing metrics", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("usage stats: error preparing metrics") } continue } err = s.sendUsageStats(metrics, build.UsageStatsEndpoint, build.UsageStatsToken) if err != nil { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "usage stats: error sending", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("usage stats: error sending") } continue } @@ -231,7 +233,7 @@ func (s *Sender) broadcastLastSentAt() { if err := s.node.Notify(LastSentUpdateNotificationOp, data, ""); err != nil { // Issue a single retry. if err = s.node.Notify(LastSentUpdateNotificationOp, data, ""); err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "usage stats: error broadcasting stats lastSentAt", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("usage stats: error broadcasting stats last sent at value") } } } @@ -243,14 +245,14 @@ func (s *Sender) UpdateLastSentAt(data []byte) { var envelope lastSentAtEnvelope err := json.Unmarshal(data, &envelope) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "usage stats: error decoding lastSentAtEnvelope", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("usage stats: error decoding lastSentAtEnvelope") return } s.mu.Lock() defer s.mu.Unlock() if envelope.LastSentAt > s.lastSentAt { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: updating last sent to value from another node", map[string]any{})) + log.Debug().Msg("usage stats: updating last sent to value from another node") } s.lastSentAt = envelope.LastSentAt s.resetMaxValues() @@ -555,9 +557,9 @@ func (s *Sender) sendUsageStats(metrics []*metric, statsEndpoint, statsToken str } if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: sending usage stats", map[string]any{"payload": string(data)})) + log.Debug().Str("payload", string(data)).Msg("usage stats: sending usage stats") } else { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelTrace, "usage stats: sending usage stats", map[string]any{"payload": string(data)})) + log.Trace().Str("payload", string(data)).Msg("usage stats: sending usage stats") } client := &http.Client{ @@ -566,14 +568,14 @@ func (s *Sender) sendUsageStats(metrics []*metric, statsEndpoint, statsToken str if statsEndpoint == "" { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: skip sending due to empty endpoint", map[string]any{})) + log.Debug().Msg("usage stats: skip sending due to empty endpoint") } return nil } if statsToken == "" { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: skip sending due to empty token", map[string]any{})) + log.Debug().Msg("usage stats: skip sending due to empty token") } return nil } @@ -590,7 +592,7 @@ func (s *Sender) sendUsageStats(metrics []*metric, statsEndpoint, statsToken str req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(data)) if err != nil { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: can't create send request", map[string]any{"error": err.Error()})) + log.Debug().Err(err).Msg("usage stats: can't create send request") } continue } @@ -600,7 +602,7 @@ func (s *Sender) sendUsageStats(metrics []*metric, statsEndpoint, statsToken str resp, err := client.Do(req) if err != nil { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: error sending request", map[string]any{"error": err.Error()})) + log.Debug().Err(err).Msg("usage stats: error sending request") } continue } @@ -609,7 +611,7 @@ func (s *Sender) sendUsageStats(metrics []*metric, statsEndpoint, statsToken str if resp.StatusCode != http.StatusOK { if s.isDev() { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "usage stats: unexpected response status code", map[string]any{"status": resp.StatusCode})) + log.Debug().Int("status", resp.StatusCode).Msg("usage stats: unexpected response status code") } continue } diff --git a/internal/wt/handler.go b/internal/wt/handler.go index 08577161a5..bc2f31d40e 100644 --- a/internal/wt/handler.go +++ b/internal/wt/handler.go @@ -5,6 +5,10 @@ import ( "net/http" "time" + "github.com/centrifugal/centrifugo/v5/internal/logging" + + "github.com/rs/zerolog/log" + "github.com/centrifugal/centrifuge" "github.com/centrifugal/protocol" "github.com/quic-go/webtransport-go" @@ -29,7 +33,7 @@ const bidiStreamAcceptTimeout = 10 * time.Second func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { conn, err := s.server.Upgrade(rw, r) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "error upgrading to webtransport", map[string]any{"error": err.Error()})) + log.Info().Str("transport", transportName).Err(err).Msg("error upgrading to webtransport") rw.WriteHeader(http.StatusBadRequest) return } @@ -39,7 +43,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { stream, err := conn.AcceptStream(acceptCtx) if err != nil { acceptCtxCancel() - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "stream accept error", map[string]any{"error": err.Error()})) + log.Error().Err(err).Msg("stream accept error") rw.WriteHeader(http.StatusBadRequest) return } @@ -53,15 +57,15 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { transport := newWebtransportTransport(protoType, conn, stream, s.pingPong) c, closeFn, err := centrifuge.NewClient(r.Context(), s.node, transport) if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error creating client", map[string]any{"transport": transportName})) + log.Error().Err(err).Msg("error creating client") return } defer func() { _ = closeFn() }() - if s.node.LogEnabled(centrifuge.LogLevelDebug) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", map[string]any{"client": c.ID(), "transport": transportName})) + if logging.Enabled(logging.DebugLevel) { + log.Debug().Str("transport", transportName).Str("client", c.ID()).Msg("client connection established") defer func(started time.Time) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", map[string]any{"client": c.ID(), "transport": transportName, "duration": time.Since(started).String()})) + log.Debug().Str("transport", transportName).Str("client", c.ID()).Dur("duration", time.Since(started)).Msg("client connection completed") }(time.Now()) } @@ -77,7 +81,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { for { cmd, cmdSize, err := decoder.Decode() if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding command", map[string]any{"client": c.ID(), "transport": transportName, "error": err.Error()})) + log.Error().Err(err).Str("transport", transportName).Str("client", c.ID()).Msg("error decoding command") return } ok := c.HandleCommand(cmd, cmdSize)