Skip to content

Commit

Permalink
fix how monitoring proxy messages is handled (#1276)
Browse files Browse the repository at this point in the history
  • Loading branch information
kian99 authored Jul 22, 2024
1 parent 413fc6e commit ea59fdc
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions internal/rpc/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ func (c *writeLockConn) sendMessage(responseObject any, request *message) {
c.writeJson(msg)
}

// inflightMsgs holds only request messages that are
// still pending a response from a Juju controller.
type inflightMsgs struct {
controllerUUID string

Expand Down Expand Up @@ -198,17 +200,23 @@ func (msgs *inflightMsgs) addMessage(msg *message) {
msgs.messages[msg.RequestID] = msg
}

func (msgs *inflightMsgs) removeMessage(msg *message) {
// monitor how long it took to handle this message
servermon.JujuCallDurationHistogram.WithLabelValues(
msg.Type,
msg.Request,
msgs.controllerUUID,
).Observe(time.Since(msg.start).Seconds())

// removeMessage deletes the request message that corresponds
// to the responses message ID.
func (msgs *inflightMsgs) removeMessage(msgID uint64) {
msgs.mu.Lock()
defer msgs.mu.Unlock()
delete(msgs.messages, msg.RequestID)
req, ok := msgs.messages[msgID]
if ok {
delete(msgs.messages, msgID)
}
msgs.mu.Unlock()

if ok {
servermon.JujuCallDurationHistogram.WithLabelValues(
req.Type,
req.Request,
msgs.controllerUUID,
).Observe(time.Since(req.start).Seconds())
}
}

func (msgs *inflightMsgs) getMessage(key uint64) *message {
Expand Down Expand Up @@ -353,7 +361,7 @@ func (p *clientProxy) start(ctx context.Context) error {
if err := p.dst.writeJson(msg); err != nil {
zapctx.Error(ctx, "clientProxy error writing to dst", zap.Error(err))
p.sendError(p.src, msg, err)
p.msgs.removeMessage(msg)
p.msgs.removeMessage(msg.RequestID)
continue
}
}
Expand Down Expand Up @@ -445,7 +453,7 @@ func (p *controllerProxy) start(ctx context.Context) error {
return err
}
}
p.msgs.removeMessage(msg)
p.msgs.removeMessage(msg.RequestID)
p.auditLogMessage(msg, true)
zapctx.Debug(ctx, "Writing modified message to client", zap.Any("Message", msg))
if err := p.dst.writeJson(msg); err != nil {
Expand All @@ -457,7 +465,7 @@ func (p *controllerProxy) start(ctx context.Context) error {

func (p *controllerProxy) handleError(msg *message, err error) {
p.sendError(p.dst, msg, err)
p.msgs.removeMessage(msg)
p.msgs.removeMessage(msg.RequestID)
}

// checkPermissionsRequired returns a nil map if no permissions are required.
Expand Down

0 comments on commit ea59fdc

Please sign in to comment.