From 10d5101230819e4d7e76ac2f1c66f357286bc23c Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Thu, 6 Apr 2023 10:31:16 -0400 Subject: [PATCH 1/4] Add fine grained metrics+logging for handling, processing, and grab lock times in snow handler --- snow/networking/handler/handler.go | 51 +++++++++++++++++++++++------- snow/networking/handler/metrics.go | 41 ++++++++++++++++++------ 2 files changed, 72 insertions(+), 20 deletions(-) diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 6d45b36f176f..6d788e40e91a 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -423,6 +423,8 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { op = msg.Op() body = msg.Message() startTime = h.clock.Time() + // Check if the chain is in normal operation at the start of message execution (may change during execution) + isNormalOp = h.ctx.State.Get().State == snow.NormalOp ) h.ctx.Log.Debug("forwarding sync message to consensus", zap.Stringer("nodeID", nodeID), @@ -435,23 +437,30 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { ) h.resourceTracker.StartProcessing(nodeID, startTime) h.ctx.Lock.Lock() + lockAcquiredTime := h.clock.Time() defer func() { h.ctx.Lock.Unlock() var ( - endTime = h.clock.Time() - histogram = h.metrics.messages[op] - processingTime = endTime.Sub(startTime) + endTime = h.clock.Time() + messageHistograms = h.metrics.messages[op] + acquireLockTime = lockAcquiredTime.Sub(startTime) + processingTime = endTime.Sub(lockAcquiredTime) + handlingTime = endTime.Sub(startTime) ) h.resourceTracker.StopProcessing(nodeID, endTime) - histogram.Observe(float64(processingTime)) + messageHistograms.handlingTime.Observe(float64(handlingTime)) + messageHistograms.acquireLockTime.Observe(float64(acquireLockTime)) + messageHistograms.processingTime.Observe(float64(processingTime)) msg.OnFinishedHandling() h.ctx.Log.Debug("finished handling sync message", zap.Stringer("messageOp", op), ) - if processingTime > syncProcessingTimeWarnLimit && h.ctx.State.Get().State == snow.NormalOp { + if handlingTime > syncProcessingTimeWarnLimit && isNormalOp { h.ctx.Log.Warn("handling sync message took longer than expected", + zap.Duration("handlingTime", handlingTime), zap.Duration("processingTime", processingTime), + zap.Duration("acquireLockTime", acquireLockTime), zap.Stringer("nodeID", nodeID), zap.Stringer("messageOp", op), zap.Any("message", body), @@ -756,11 +765,14 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error { h.resourceTracker.StartProcessing(nodeID, startTime) defer func() { var ( - endTime = h.clock.Time() - histogram = h.metrics.messages[op] + endTime = h.clock.Time() + messageHistograms = h.metrics.messages[op] ) h.resourceTracker.StopProcessing(nodeID, endTime) - histogram.Observe(float64(endTime.Sub(startTime))) + // Processing an async message does not grab a lock, so handling/processing + // times are identical and we skip observing [acquireLockTime]. + messageHistograms.handlingTime.Observe(float64(endTime.Sub(startTime))) + messageHistograms.processingTime.Observe(float64(endTime.Sub(startTime))) msg.OnFinishedHandling() h.ctx.Log.Debug("finished handling async message", zap.Stringer("messageOp", op), @@ -835,6 +847,8 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error { op = msg.Op() body = msg.Message() startTime = h.clock.Time() + // Check if the chain is in normal operation at the start of message execution (may change during execution) + isNormalOp = h.ctx.State.Get().State == snow.NormalOp ) h.ctx.Log.Debug("forwarding chan message to consensus", zap.Stringer("messageOp", op), @@ -844,18 +858,33 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error { zap.Any("message", body), ) h.ctx.Lock.Lock() + lockAcquiredTime := h.clock.Time() defer func() { h.ctx.Lock.Unlock() var ( - endTime = h.clock.Time() - histogram = h.metrics.messages[op] + endTime = h.clock.Time() + messageHistograms = h.metrics.messages[op] + acquireLockTime = lockAcquiredTime.Sub(startTime) + processingTime = endTime.Sub(lockAcquiredTime) + handlingTime = endTime.Sub(startTime) ) - histogram.Observe(float64(endTime.Sub(startTime))) + messageHistograms.handlingTime.Observe(float64(handlingTime)) + messageHistograms.acquireLockTime.Observe(float64(acquireLockTime)) + messageHistograms.processingTime.Observe(float64(processingTime)) msg.OnFinishedHandling() h.ctx.Log.Debug("finished handling chan message", zap.Stringer("messageOp", op), ) + if handlingTime > syncProcessingTimeWarnLimit && isNormalOp { + h.ctx.Log.Warn("handling chan message took longer than expected", + zap.Duration("handlingTime", handlingTime), + zap.Duration("processingTime", processingTime), + zap.Duration("acquireLockTime", acquireLockTime), + zap.Stringer("messageOp", op), + zap.Any("message", body), + ) + } }() state := h.ctx.State.Get() diff --git a/snow/networking/handler/metrics.go b/snow/networking/handler/metrics.go index 65e8f8f0de76..a1bb4a2d3a23 100644 --- a/snow/networking/handler/metrics.go +++ b/snow/networking/handler/metrics.go @@ -16,7 +16,13 @@ import ( type metrics struct { expired prometheus.Counter asyncExpired prometheus.Counter - messages map[message.Op]metric.Averager + messages map[message.Op]*messageProcessing +} + +type messageProcessing struct { + handlingTime metric.Averager + processingTime metric.Averager + acquireLockTime metric.Averager } func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) { @@ -37,16 +43,33 @@ func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) { reg.Register(asyncExpired), ) - messages := make(map[message.Op]metric.Averager, len(message.ConsensusOps)) + messages := make(map[message.Op]*messageProcessing, len(message.ConsensusOps)) for _, op := range message.ConsensusOps { opStr := op.String() - messages[op] = metric.NewAveragerWithErrs( - namespace, - opStr, - fmt.Sprintf("time (in ns) of processing a %s", opStr), - reg, - &errs, - ) + messageProcessing := &messageProcessing{ + handlingTime: metric.NewAveragerWithErrs( + namespace, + opStr, + fmt.Sprintf("time (in ns) of handling a %s", opStr), + reg, + &errs, + ), + processingTime: metric.NewAveragerWithErrs( + namespace, + fmt.Sprintf("%s_processing", opStr), + fmt.Sprintf("time (in ns) of processing a %s", opStr), + reg, + &errs, + ), + acquireLockTime: metric.NewAveragerWithErrs( + namespace, + fmt.Sprintf("%s_lock", opStr), + fmt.Sprintf("time (in ns) of acquiring a lock to process a %s", opStr), + reg, + &errs, + ), + } + messages[op] = messageProcessing } return &metrics{ From e25d4d2b5f27aada24dc03a6af0fc34067c57f5c Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Thu, 6 Apr 2023 11:30:33 -0400 Subject: [PATCH 2/4] Address PR comments to avoid using derivative metrics --- snow/networking/handler/handler.go | 18 +++++------------- snow/networking/handler/metrics.go | 10 +--------- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 6d788e40e91a..1b5f1d36763b 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -445,20 +445,17 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { endTime = h.clock.Time() messageHistograms = h.metrics.messages[op] acquireLockTime = lockAcquiredTime.Sub(startTime) - processingTime = endTime.Sub(lockAcquiredTime) - handlingTime = endTime.Sub(startTime) + processingTime = endTime.Sub(startTime) ) h.resourceTracker.StopProcessing(nodeID, endTime) - messageHistograms.handlingTime.Observe(float64(handlingTime)) messageHistograms.acquireLockTime.Observe(float64(acquireLockTime)) messageHistograms.processingTime.Observe(float64(processingTime)) msg.OnFinishedHandling() h.ctx.Log.Debug("finished handling sync message", zap.Stringer("messageOp", op), ) - if handlingTime > syncProcessingTimeWarnLimit && isNormalOp { + if processingTime > syncProcessingTimeWarnLimit && isNormalOp { h.ctx.Log.Warn("handling sync message took longer than expected", - zap.Duration("handlingTime", handlingTime), zap.Duration("processingTime", processingTime), zap.Duration("acquireLockTime", acquireLockTime), zap.Stringer("nodeID", nodeID), @@ -769,9 +766,7 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error { messageHistograms = h.metrics.messages[op] ) h.resourceTracker.StopProcessing(nodeID, endTime) - // Processing an async message does not grab a lock, so handling/processing - // times are identical and we skip observing [acquireLockTime]. - messageHistograms.handlingTime.Observe(float64(endTime.Sub(startTime))) + // Processing an async message does not grab a lock, so we skip observing [acquireLockTime]. messageHistograms.processingTime.Observe(float64(endTime.Sub(startTime))) msg.OnFinishedHandling() h.ctx.Log.Debug("finished handling async message", @@ -866,19 +861,16 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error { endTime = h.clock.Time() messageHistograms = h.metrics.messages[op] acquireLockTime = lockAcquiredTime.Sub(startTime) - processingTime = endTime.Sub(lockAcquiredTime) - handlingTime = endTime.Sub(startTime) + processingTime = endTime.Sub(startTime) ) - messageHistograms.handlingTime.Observe(float64(handlingTime)) messageHistograms.acquireLockTime.Observe(float64(acquireLockTime)) messageHistograms.processingTime.Observe(float64(processingTime)) msg.OnFinishedHandling() h.ctx.Log.Debug("finished handling chan message", zap.Stringer("messageOp", op), ) - if handlingTime > syncProcessingTimeWarnLimit && isNormalOp { + if processingTime > syncProcessingTimeWarnLimit && isNormalOp { h.ctx.Log.Warn("handling chan message took longer than expected", - zap.Duration("handlingTime", handlingTime), zap.Duration("processingTime", processingTime), zap.Duration("acquireLockTime", acquireLockTime), zap.Stringer("messageOp", op), diff --git a/snow/networking/handler/metrics.go b/snow/networking/handler/metrics.go index a1bb4a2d3a23..4e5091dff61a 100644 --- a/snow/networking/handler/metrics.go +++ b/snow/networking/handler/metrics.go @@ -20,7 +20,6 @@ type metrics struct { } type messageProcessing struct { - handlingTime metric.Averager processingTime metric.Averager acquireLockTime metric.Averager } @@ -47,20 +46,13 @@ func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) { for _, op := range message.ConsensusOps { opStr := op.String() messageProcessing := &messageProcessing{ - handlingTime: metric.NewAveragerWithErrs( + processingTime: metric.NewAveragerWithErrs( namespace, opStr, fmt.Sprintf("time (in ns) of handling a %s", opStr), reg, &errs, ), - processingTime: metric.NewAveragerWithErrs( - namespace, - fmt.Sprintf("%s_processing", opStr), - fmt.Sprintf("time (in ns) of processing a %s", opStr), - reg, - &errs, - ), acquireLockTime: metric.NewAveragerWithErrs( namespace, fmt.Sprintf("%s_lock", opStr), From ef1e5d91a3b6ee60a0c1b00aa712648a7fe463c0 Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Thu, 6 Apr 2023 15:06:31 -0400 Subject: [PATCH 3/4] Address renaming comments --- snow/networking/handler/handler.go | 18 ++++++++++-------- snow/networking/handler/metrics.go | 8 ++++---- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 1b5f1d36763b..623495472302 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -444,11 +444,11 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { var ( endTime = h.clock.Time() messageHistograms = h.metrics.messages[op] - acquireLockTime = lockAcquiredTime.Sub(startTime) + msgHandlingTime = lockAcquiredTime.Sub(startTime) processingTime = endTime.Sub(startTime) ) h.resourceTracker.StopProcessing(nodeID, endTime) - messageHistograms.acquireLockTime.Observe(float64(acquireLockTime)) + messageHistograms.msgHandlingTime.Observe(float64(msgHandlingTime)) messageHistograms.processingTime.Observe(float64(processingTime)) msg.OnFinishedHandling() h.ctx.Log.Debug("finished handling sync message", @@ -457,7 +457,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { if processingTime > syncProcessingTimeWarnLimit && isNormalOp { h.ctx.Log.Warn("handling sync message took longer than expected", zap.Duration("processingTime", processingTime), - zap.Duration("acquireLockTime", acquireLockTime), + zap.Duration("msgHandlingTime", msgHandlingTime), zap.Stringer("nodeID", nodeID), zap.Stringer("messageOp", op), zap.Any("message", body), @@ -764,10 +764,12 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error { var ( endTime = h.clock.Time() messageHistograms = h.metrics.messages[op] + processingTime = endTime.Sub(startTime) ) h.resourceTracker.StopProcessing(nodeID, endTime) - // Processing an async message does not grab a lock, so we skip observing [acquireLockTime]. - messageHistograms.processingTime.Observe(float64(endTime.Sub(startTime))) + // There is no lock grabbed here, so both metrics are identical + messageHistograms.processingTime.Observe(float64(processingTime)) + messageHistograms.msgHandlingTime.Observe(float64(processingTime)) msg.OnFinishedHandling() h.ctx.Log.Debug("finished handling async message", zap.Stringer("messageOp", op), @@ -860,10 +862,10 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error { var ( endTime = h.clock.Time() messageHistograms = h.metrics.messages[op] - acquireLockTime = lockAcquiredTime.Sub(startTime) + msgHandlingTime = lockAcquiredTime.Sub(startTime) processingTime = endTime.Sub(startTime) ) - messageHistograms.acquireLockTime.Observe(float64(acquireLockTime)) + messageHistograms.msgHandlingTime.Observe(float64(msgHandlingTime)) messageHistograms.processingTime.Observe(float64(processingTime)) msg.OnFinishedHandling() h.ctx.Log.Debug("finished handling chan message", @@ -872,7 +874,7 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error { if processingTime > syncProcessingTimeWarnLimit && isNormalOp { h.ctx.Log.Warn("handling chan message took longer than expected", zap.Duration("processingTime", processingTime), - zap.Duration("acquireLockTime", acquireLockTime), + zap.Duration("msgHandlingTime", msgHandlingTime), zap.Stringer("messageOp", op), zap.Any("message", body), ) diff --git a/snow/networking/handler/metrics.go b/snow/networking/handler/metrics.go index 4e5091dff61a..e3a9f0b7d29d 100644 --- a/snow/networking/handler/metrics.go +++ b/snow/networking/handler/metrics.go @@ -21,7 +21,7 @@ type metrics struct { type messageProcessing struct { processingTime metric.Averager - acquireLockTime metric.Averager + msgHandlingTime metric.Averager } func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) { @@ -53,10 +53,10 @@ func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) { reg, &errs, ), - acquireLockTime: metric.NewAveragerWithErrs( + msgHandlingTime: metric.NewAveragerWithErrs( namespace, - fmt.Sprintf("%s_lock", opStr), - fmt.Sprintf("time (in ns) of acquiring a lock to process a %s", opStr), + fmt.Sprintf("%s_msg_handling", opStr), + fmt.Sprintf("time (in ns) handling a %s after grabbing the lock", opStr), reg, &errs, ), From fc6131fe22865976f0fe1c6edc45ecd6e10a17f7 Mon Sep 17 00:00:00 2001 From: Stephen Date: Thu, 6 Apr 2023 15:29:13 -0400 Subject: [PATCH 4/4] ocd nits --- snow/networking/handler/handler.go | 6 ++++-- snow/networking/handler/metrics.go | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 623495472302..9c56a129d78b 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -423,7 +423,8 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { op = msg.Op() body = msg.Message() startTime = h.clock.Time() - // Check if the chain is in normal operation at the start of message execution (may change during execution) + // Check if the chain is in normal operation at the start of message + // execution (may change during execution) isNormalOp = h.ctx.State.Get().State == snow.NormalOp ) h.ctx.Log.Debug("forwarding sync message to consensus", @@ -844,7 +845,8 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error { op = msg.Op() body = msg.Message() startTime = h.clock.Time() - // Check if the chain is in normal operation at the start of message execution (may change during execution) + // Check if the chain is in normal operation at the start of message + // execution (may change during execution) isNormalOp = h.ctx.State.Get().State == snow.NormalOp ) h.ctx.Log.Debug("forwarding chan message to consensus", diff --git a/snow/networking/handler/metrics.go b/snow/networking/handler/metrics.go index e3a9f0b7d29d..a8776b30832e 100644 --- a/snow/networking/handler/metrics.go +++ b/snow/networking/handler/metrics.go @@ -49,14 +49,14 @@ func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) { processingTime: metric.NewAveragerWithErrs( namespace, opStr, - fmt.Sprintf("time (in ns) of handling a %s", opStr), + fmt.Sprintf("time (in ns) spent handling a %s", opStr), reg, &errs, ), msgHandlingTime: metric.NewAveragerWithErrs( namespace, fmt.Sprintf("%s_msg_handling", opStr), - fmt.Sprintf("time (in ns) handling a %s after grabbing the lock", opStr), + fmt.Sprintf("time (in ns) spent handling a %s after grabbing the lock", opStr), reg, &errs, ),