From dbb8887795f14b9e1a31a0e4c92330dcfc5a7302 Mon Sep 17 00:00:00 2001 From: Ales Stimec Date: Wed, 12 Jun 2024 10:58:23 +0200 Subject: [PATCH] Juju call monitoring. --- internal/jujuapi/websocket.go | 17 +++++++----- internal/jujuclient/dial.go | 13 +++++++-- internal/rpc/client_test.go | 21 ++++++++++----- internal/rpc/proxy.go | 46 ++++++++++++++++++++++++++------ internal/rpc/proxy_test.go | 10 +++++-- internal/rpc/rpc.go | 6 ++++- internal/servermon/monitoring.go | 13 +++++++++ 7 files changed, 101 insertions(+), 25 deletions(-) diff --git a/internal/jujuapi/websocket.go b/internal/jujuapi/websocket.go index 9d4bbae18..6e28a241e 100644 --- a/internal/jujuapi/websocket.go +++ b/internal/jujuapi/websocket.go @@ -159,15 +159,15 @@ func (s modelProxyServer) ServeWS(ctx context.Context, clientConn *websocket.Con // controllerConnectionFunc returns a function that will be used to // connect to a controller when a client makes a request. -func controllerConnectionFunc(s modelProxyServer, jwtGenerator *jimm.JWTGenerator) func(context.Context) (jimmRPC.WebsocketConnection, string, error) { - return func(ctx context.Context) (jimmRPC.WebsocketConnection, string, error) { +func controllerConnectionFunc(s modelProxyServer, jwtGenerator *jimm.JWTGenerator) func(context.Context) (jimmRPC.WebsocketConnectionWithMetadata, error) { + return func(ctx context.Context) (jimmRPC.WebsocketConnectionWithMetadata, error) { const op = errors.Op("proxy.controllerConnectionFunc") path := jimmhttp.PathElementFromContext(ctx, "path") zapctx.Debug(ctx, "grabbing model info from path", zap.String("path", path)) uuid, finalPath, err := modelInfoFromPath(path) if err != nil { zapctx.Error(ctx, "error parsing path", zap.Error(err)) - return nil, "", errors.E(op, err) + return jimmRPC.WebsocketConnectionWithMetadata{}, errors.E(op, err) } m := dbmodel.Model{ UUID: sql.NullString{ @@ -177,7 +177,7 @@ func controllerConnectionFunc(s modelProxyServer, jwtGenerator *jimm.JWTGenerato } if err := s.jimm.Database.GetModel(context.Background(), &m); err != nil { zapctx.Error(ctx, "failed to find model", zap.String("uuid", uuid), zap.Error(err)) - return nil, "", errors.E(err, errors.CodeNotFound) + return jimmRPC.WebsocketConnectionWithMetadata{}, errors.E(err, errors.CodeNotFound) } jwtGenerator.SetTags(m.ResourceTag(), m.Controller.ResourceTag()) mt := m.ResourceTag() @@ -185,10 +185,15 @@ func controllerConnectionFunc(s modelProxyServer, jwtGenerator *jimm.JWTGenerato controllerConn, err := jimmRPC.Dial(ctx, &m.Controller, mt, finalPath) if err != nil { zapctx.Error(ctx, "cannot dial controller", zap.String("controller", m.Controller.Name), zap.Error(err)) - return nil, "", err + return jimmRPC.WebsocketConnectionWithMetadata{}, err } fullModelName := m.Controller.Name + "/" + m.Name - return controllerConn, fullModelName, nil + return jimmRPC.WebsocketConnectionWithMetadata{ + Conn: controllerConn, + ControllerUUID: m.Controller.UUID, + ModelUUID: m.UUID.String, + ModelName: fullModelName, + }, nil } } diff --git a/internal/jujuclient/dial.go b/internal/jujuclient/dial.go index 887302e28..683c7a067 100644 --- a/internal/jujuclient/dial.go +++ b/internal/jujuclient/dial.go @@ -31,6 +31,7 @@ import ( "github.com/canonical/jimm/internal/jimm" "github.com/canonical/jimm/internal/jimmjwx" "github.com/canonical/jimm/internal/rpc" + "github.com/canonical/jimm/internal/servermon" ) const ( @@ -234,8 +235,16 @@ func (c *Connection) redial(ctx context.Context, requiredPermissions map[string] // Call makes an RPC call to the server. Call sends the request message to // the server and waits for the response to be returned or the context to // be canceled. -func (c *Connection) Call(ctx context.Context, facade string, version int, id, method string, args, resp interface{}) error { - err := c.client.Call(ctx, facade, version, id, method, args, resp) +func (c *Connection) Call(ctx context.Context, facade string, version int, id, method string, args, resp interface{}) (err error) { + labels := []string{facade, method, "", c.mt.Id()} + if c.ctl != nil { + labels = []string{facade, method, c.ctl.UUID, c.mt.Id()} + } + durationObserver := servermon.DurationObserver(servermon.JujuCallDurationHistogram, labels...) + defer durationObserver() + defer servermon.ErrorCounter(servermon.JujuCallErrorCount, &err, labels...) + + err = c.client.Call(ctx, facade, version, id, method, args, resp) if err != nil { if rpcErr, ok := err.(*rpc.Error); ok { // if we get a permission check required error, we redial the controller diff --git a/internal/rpc/client_test.go b/internal/rpc/client_test.go index e284853c0..e93c56d83 100644 --- a/internal/rpc/client_test.go +++ b/internal/rpc/client_test.go @@ -250,10 +250,13 @@ func TestProxySockets(t *testing.T) { errChan := make(chan error) srvJIMM := newServer(func(connClient *websocket.Conn) error { testTokenGen := testTokenGenerator{} - f := func(context.Context) (rpc.WebsocketConnection, string, error) { + f := func(context.Context) (rpc.WebsocketConnectionWithMetadata, error) { connController, err := srvController.dialer.DialWebsocket(ctx, srvController.URL) c.Assert(err, qt.IsNil) - return connController, "TestName", nil + return rpc.WebsocketConnectionWithMetadata{ + Conn: connController, + ModelName: "TestName", + }, nil } auditLogger := func(ale *dbmodel.AuditLogEntry) {} proxyHelpers := rpc.ProxyHelpers{ @@ -297,10 +300,13 @@ func TestCancelProxySockets(t *testing.T) { errChan := make(chan error) srvJIMM := newServer(func(connClient *websocket.Conn) error { testTokenGen := testTokenGenerator{} - f := func(context.Context) (rpc.WebsocketConnection, string, error) { + f := func(context.Context) (rpc.WebsocketConnectionWithMetadata, error) { connController, err := srvController.dialer.DialWebsocket(ctx, srvController.URL) c.Assert(err, qt.IsNil) - return connController, "TestName", nil + return rpc.WebsocketConnectionWithMetadata{ + Conn: connController, + ModelName: "TestName", + }, nil } auditLogger := func(ale *dbmodel.AuditLogEntry) {} proxyHelpers := rpc.ProxyHelpers{ @@ -337,10 +343,13 @@ func TestProxySocketsAuditLogs(t *testing.T) { errChan := make(chan error) srvJIMM := newServer(func(connClient *websocket.Conn) error { testTokenGen := testTokenGenerator{} - f := func(context.Context) (rpc.WebsocketConnection, string, error) { + f := func(context.Context) (rpc.WebsocketConnectionWithMetadata, error) { connController, err := srvController.dialer.DialWebsocket(ctx, srvController.URL) c.Assert(err, qt.IsNil) - return connController, "TestModelName", nil + return rpc.WebsocketConnectionWithMetadata{ + Conn: connController, + ModelName: "TestModelName", + }, nil } auditLogger := func(ale *dbmodel.AuditLogEntry) { auditLogs = append(auditLogs, ale) } proxyHelpers := rpc.ProxyHelpers{ diff --git a/internal/rpc/proxy.go b/internal/rpc/proxy.go index 90b257caf..3cfb6bae4 100644 --- a/internal/rpc/proxy.go +++ b/internal/rpc/proxy.go @@ -19,6 +19,7 @@ import ( "github.com/canonical/jimm/internal/jimm" "github.com/canonical/jimm/internal/jimm/credentials" "github.com/canonical/jimm/internal/openfga" + "github.com/canonical/jimm/internal/servermon" "github.com/canonical/jimm/internal/utils" jimmnames "github.com/canonical/jimm/pkg/names" ) @@ -50,6 +51,15 @@ type WebsocketConnection interface { Close() error } +// WebsocketConnectionWithMetadata holds the websocket connection and metadata about the +// established connection. +type WebsocketConnectionWithMetadata struct { + Conn WebsocketConnection + ControllerUUID string + ModelUUID string + ModelName string +} + // JIMM represents the JIMM interface used by the proxy. type JIMM interface { GetOpenFGAUserAndAuthorise(ctx context.Context, email string) (*openfga.User, error) @@ -62,7 +72,7 @@ type JIMM interface { type ProxyHelpers struct { ConnClient WebsocketConnection TokenGen TokenGenerator - ConnectController func(context.Context) (WebsocketConnection, string, error) + ConnectController func(context.Context) (WebsocketConnectionWithMetadata, error) AuditLog func(*dbmodel.AuditLogEntry) JIMM JIMM AuthenticatedIdentityID string @@ -160,6 +170,9 @@ func (c *writeLockConn) sendMessage(responseObject any, request *message) { } type inflightMsgs struct { + controlerUUID string + modelUUID string + mu sync.Mutex loginMessage *message messages map[uint64]*message @@ -183,10 +196,19 @@ func (msgs *inflightMsgs) addMessage(msg *message) { msgs.mu.Lock() defer msgs.mu.Unlock() + msg.start = time.Now() msgs.messages[msg.RequestID] = msg } func (msgs *inflightMsgs) removeMessage(msg *message) { + // monitor how long it took to handle this message + servermon.JujuCallDurationHistogram.WithLabelValues( + msg.Type, + msg.Request, + msgs.controlerUUID, + msgs.modelUUID, + ).Observe(time.Since(msg.start).Seconds()) + msgs.mu.Lock() defer msgs.mu.Unlock() delete(msgs.messages, msg.RequestID) @@ -226,6 +248,7 @@ func (p *modelProxy) sendError(socket *writeLockConn, req *message, err error) { socket.writeJson(msg) } // An error message is a response back to the client. + servermon.JujuCallErrorCount.WithLabelValues(req.Type, req.Request, p.msgs.controlerUUID, p.msgs.modelUUID) p.auditLogMessage(msg, true) } @@ -278,7 +301,7 @@ type clientProxy struct { modelProxy wg sync.WaitGroup errChan chan error - createControllerConn func(context.Context) (WebsocketConnection, string, error) + createControllerConn func(context.Context) (WebsocketConnectionWithMetadata, error) // mu synchronises changes to closed and modelproxy.dst, dst is is only created // at some unspecified point in the future after a client request. mu sync.Mutex @@ -293,6 +316,7 @@ func (p *clientProxy) start(ctx context.Context) error { p.dst.conn.Close() } }() + for { zapctx.Debug(ctx, "Reading on client connection") msg := new(message) @@ -300,6 +324,7 @@ func (p *clientProxy) start(ctx context.Context) error { // Error reading on the socket implies it is closed, simply return. return err } + zapctx.Debug(ctx, "Read message from client", zap.Any("message", msg)) err := p.makeControllerConnection(ctx) if err != nil { @@ -315,7 +340,7 @@ func (p *clientProxy) start(ctx context.Context) error { toClient, toController, err := p.handleAdminFacade(ctx, msg) if err != nil { p.sendError(p.src, msg, err) - continue + return nil } if toClient != nil { p.src.sendMessage(nil, toClient) @@ -328,15 +353,16 @@ func (p *clientProxy) start(ctx context.Context) error { zapctx.Error(ctx, "Invalid request ID 0") err := errors.E(op, "Invalid request ID 0") p.sendError(p.src, msg, err) - continue + return nil } p.msgs.addMessage(msg) zapctx.Debug(ctx, "Writing to controller") + if err := p.dst.writeJson(msg); err != nil { zapctx.Error(ctx, "clientProxy error writing to dst", zap.Error(err)) p.sendError(p.src, msg, err) p.msgs.removeMessage(msg) - continue + return nil } } } @@ -355,12 +381,16 @@ func (p *clientProxy) makeControllerConnection(ctx context.Context) error { err := errors.E(op, "Client connection closed while starting controller connection") return err } - conn, modelName, err := p.createControllerConn(ctx) + connWithMetadata, err := p.createControllerConn(ctx) if err != nil { return err } - p.modelName = modelName - p.dst = &writeLockConn{conn: conn} + + p.msgs.controlerUUID = connWithMetadata.ControllerUUID + p.msgs.modelUUID = connWithMetadata.ModelUUID + + p.modelName = connWithMetadata.ModelName + p.dst = &writeLockConn{conn: connWithMetadata.Conn} controllerToClient := controllerProxy{ modelProxy: modelProxy{ src: p.dst, diff --git a/internal/rpc/proxy_test.go b/internal/rpc/proxy_test.go index a62f9d697..8c464cb65 100644 --- a/internal/rpc/proxy_test.go +++ b/internal/rpc/proxy_test.go @@ -11,6 +11,7 @@ import ( "github.com/coreos/go-oidc/v3/oidc" qt "github.com/frankban/quicktest" + "github.com/google/uuid" "github.com/juju/juju/rpc/params" "github.com/juju/names/v5" "github.com/lestrrat-go/jwx/v2/jwt" @@ -249,8 +250,13 @@ func TestProxySocketsAdminFacade(t *testing.T) { helpers := rpc.ProxyHelpers{ ConnClient: clientWebsocket, TokenGen: &mockTokenGenerator{}, - ConnectController: func(ctx context.Context) (rpc.WebsocketConnection, string, error) { - return controllerWebsocket, "test model", nil + ConnectController: func(ctx context.Context) (rpc.WebsocketConnectionWithMetadata, error) { + return rpc.WebsocketConnectionWithMetadata{ + Conn: controllerWebsocket, + ModelName: "test model", + ModelUUID: uuid.NewString(), + ControllerUUID: uuid.NewString(), + }, nil }, AuditLog: func(*dbmodel.AuditLogEntry) {}, JIMM: &mockJIMM{ diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index 8d372e83c..f86733d08 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -7,12 +7,16 @@ // flexible than the juju implementation. package rpc -import "encoding/json" +import ( + "encoding/json" + "time" +) // A message encodes a single message sent, or received, over an RPC // connection. It contains the union of fields in a request or response // message. type message struct { + start time.Time RequestID uint64 `json:"request-id,omitempty"` Type string `json:"type,omitempty"` Version int `json:"version,omitempty"` diff --git a/internal/servermon/monitoring.go b/internal/servermon/monitoring.go index f57438381..f8aec1e91 100644 --- a/internal/servermon/monitoring.go +++ b/internal/servermon/monitoring.go @@ -63,6 +63,19 @@ var ( Name: "error_total", Help: "The number of vault call errors.", }, []string{"method"}) + JujuCallDurationHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "jimm", + Subsystem: "juju", + Name: "call_duration_seconds", + Help: "Histogram of juju call time in seconds", + Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, + }, []string{"facade", "method", "controller", "model"}) + JujuCallErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "jimm", + Subsystem: "juju", + Name: "error_total", + Help: "The number of juju call errors.", + }, []string{"facade", "method", "controller", "model"}) ConcurrentWebsocketConnections = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "jimm", Subsystem: "websocket",