Skip to content

Commit

Permalink
Juju call monitoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
alesstimec committed Jun 12, 2024
1 parent 15d374c commit dbb8887
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 25 deletions.
17 changes: 11 additions & 6 deletions internal/jujuapi/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,15 @@ func (s modelProxyServer) ServeWS(ctx context.Context, clientConn *websocket.Con

// controllerConnectionFunc returns a function that will be used to
// connect to a controller when a client makes a request.
func controllerConnectionFunc(s modelProxyServer, jwtGenerator *jimm.JWTGenerator) func(context.Context) (jimmRPC.WebsocketConnection, string, error) {
return func(ctx context.Context) (jimmRPC.WebsocketConnection, string, error) {
func controllerConnectionFunc(s modelProxyServer, jwtGenerator *jimm.JWTGenerator) func(context.Context) (jimmRPC.WebsocketConnectionWithMetadata, error) {
return func(ctx context.Context) (jimmRPC.WebsocketConnectionWithMetadata, error) {
const op = errors.Op("proxy.controllerConnectionFunc")
path := jimmhttp.PathElementFromContext(ctx, "path")
zapctx.Debug(ctx, "grabbing model info from path", zap.String("path", path))
uuid, finalPath, err := modelInfoFromPath(path)
if err != nil {
zapctx.Error(ctx, "error parsing path", zap.Error(err))
return nil, "", errors.E(op, err)
return jimmRPC.WebsocketConnectionWithMetadata{}, errors.E(op, err)
}
m := dbmodel.Model{
UUID: sql.NullString{
Expand All @@ -177,18 +177,23 @@ func controllerConnectionFunc(s modelProxyServer, jwtGenerator *jimm.JWTGenerato
}
if err := s.jimm.Database.GetModel(context.Background(), &m); err != nil {
zapctx.Error(ctx, "failed to find model", zap.String("uuid", uuid), zap.Error(err))
return nil, "", errors.E(err, errors.CodeNotFound)
return jimmRPC.WebsocketConnectionWithMetadata{}, errors.E(err, errors.CodeNotFound)
}
jwtGenerator.SetTags(m.ResourceTag(), m.Controller.ResourceTag())
mt := m.ResourceTag()
zapctx.Debug(ctx, "Dialing Controller", zap.String("path", path))
controllerConn, err := jimmRPC.Dial(ctx, &m.Controller, mt, finalPath)
if err != nil {
zapctx.Error(ctx, "cannot dial controller", zap.String("controller", m.Controller.Name), zap.Error(err))
return nil, "", err
return jimmRPC.WebsocketConnectionWithMetadata{}, err
}
fullModelName := m.Controller.Name + "/" + m.Name
return controllerConn, fullModelName, nil
return jimmRPC.WebsocketConnectionWithMetadata{
Conn: controllerConn,
ControllerUUID: m.Controller.UUID,
ModelUUID: m.UUID.String,
ModelName: fullModelName,
}, nil
}
}

Expand Down
13 changes: 11 additions & 2 deletions internal/jujuclient/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/canonical/jimm/internal/jimm"
"github.com/canonical/jimm/internal/jimmjwx"
"github.com/canonical/jimm/internal/rpc"
"github.com/canonical/jimm/internal/servermon"
)

const (
Expand Down Expand Up @@ -234,8 +235,16 @@ func (c *Connection) redial(ctx context.Context, requiredPermissions map[string]
// Call makes an RPC call to the server. Call sends the request message to
// the server and waits for the response to be returned or the context to
// be canceled.
func (c *Connection) Call(ctx context.Context, facade string, version int, id, method string, args, resp interface{}) error {
err := c.client.Call(ctx, facade, version, id, method, args, resp)
func (c *Connection) Call(ctx context.Context, facade string, version int, id, method string, args, resp interface{}) (err error) {
labels := []string{facade, method, "", c.mt.Id()}
if c.ctl != nil {
labels = []string{facade, method, c.ctl.UUID, c.mt.Id()}
}
durationObserver := servermon.DurationObserver(servermon.JujuCallDurationHistogram, labels...)
defer durationObserver()
defer servermon.ErrorCounter(servermon.JujuCallErrorCount, &err, labels...)

err = c.client.Call(ctx, facade, version, id, method, args, resp)
if err != nil {
if rpcErr, ok := err.(*rpc.Error); ok {
// if we get a permission check required error, we redial the controller
Expand Down
21 changes: 15 additions & 6 deletions internal/rpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,13 @@ func TestProxySockets(t *testing.T) {
errChan := make(chan error)
srvJIMM := newServer(func(connClient *websocket.Conn) error {
testTokenGen := testTokenGenerator{}
f := func(context.Context) (rpc.WebsocketConnection, string, error) {
f := func(context.Context) (rpc.WebsocketConnectionWithMetadata, error) {
connController, err := srvController.dialer.DialWebsocket(ctx, srvController.URL)
c.Assert(err, qt.IsNil)
return connController, "TestName", nil
return rpc.WebsocketConnectionWithMetadata{
Conn: connController,
ModelName: "TestName",
}, nil
}
auditLogger := func(ale *dbmodel.AuditLogEntry) {}
proxyHelpers := rpc.ProxyHelpers{
Expand Down Expand Up @@ -297,10 +300,13 @@ func TestCancelProxySockets(t *testing.T) {
errChan := make(chan error)
srvJIMM := newServer(func(connClient *websocket.Conn) error {
testTokenGen := testTokenGenerator{}
f := func(context.Context) (rpc.WebsocketConnection, string, error) {
f := func(context.Context) (rpc.WebsocketConnectionWithMetadata, error) {
connController, err := srvController.dialer.DialWebsocket(ctx, srvController.URL)
c.Assert(err, qt.IsNil)
return connController, "TestName", nil
return rpc.WebsocketConnectionWithMetadata{
Conn: connController,
ModelName: "TestName",
}, nil
}
auditLogger := func(ale *dbmodel.AuditLogEntry) {}
proxyHelpers := rpc.ProxyHelpers{
Expand Down Expand Up @@ -337,10 +343,13 @@ func TestProxySocketsAuditLogs(t *testing.T) {
errChan := make(chan error)
srvJIMM := newServer(func(connClient *websocket.Conn) error {
testTokenGen := testTokenGenerator{}
f := func(context.Context) (rpc.WebsocketConnection, string, error) {
f := func(context.Context) (rpc.WebsocketConnectionWithMetadata, error) {
connController, err := srvController.dialer.DialWebsocket(ctx, srvController.URL)
c.Assert(err, qt.IsNil)
return connController, "TestModelName", nil
return rpc.WebsocketConnectionWithMetadata{
Conn: connController,
ModelName: "TestModelName",
}, nil
}
auditLogger := func(ale *dbmodel.AuditLogEntry) { auditLogs = append(auditLogs, ale) }
proxyHelpers := rpc.ProxyHelpers{
Expand Down
46 changes: 38 additions & 8 deletions internal/rpc/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/canonical/jimm/internal/jimm"
"github.com/canonical/jimm/internal/jimm/credentials"
"github.com/canonical/jimm/internal/openfga"
"github.com/canonical/jimm/internal/servermon"
"github.com/canonical/jimm/internal/utils"
jimmnames "github.com/canonical/jimm/pkg/names"
)
Expand Down Expand Up @@ -50,6 +51,15 @@ type WebsocketConnection interface {
Close() error
}

// WebsocketConnectionWithMetadata holds the websocket connection and metadata about the
// established connection.
type WebsocketConnectionWithMetadata struct {
Conn WebsocketConnection
ControllerUUID string
ModelUUID string
ModelName string
}

// JIMM represents the JIMM interface used by the proxy.
type JIMM interface {
GetOpenFGAUserAndAuthorise(ctx context.Context, email string) (*openfga.User, error)
Expand All @@ -62,7 +72,7 @@ type JIMM interface {
type ProxyHelpers struct {
ConnClient WebsocketConnection
TokenGen TokenGenerator
ConnectController func(context.Context) (WebsocketConnection, string, error)
ConnectController func(context.Context) (WebsocketConnectionWithMetadata, error)
AuditLog func(*dbmodel.AuditLogEntry)
JIMM JIMM
AuthenticatedIdentityID string
Expand Down Expand Up @@ -160,6 +170,9 @@ func (c *writeLockConn) sendMessage(responseObject any, request *message) {
}

type inflightMsgs struct {
controlerUUID string
modelUUID string

mu sync.Mutex
loginMessage *message
messages map[uint64]*message
Expand All @@ -183,10 +196,19 @@ func (msgs *inflightMsgs) addMessage(msg *message) {
msgs.mu.Lock()
defer msgs.mu.Unlock()

msg.start = time.Now()
msgs.messages[msg.RequestID] = msg
}

func (msgs *inflightMsgs) removeMessage(msg *message) {
// monitor how long it took to handle this message
servermon.JujuCallDurationHistogram.WithLabelValues(
msg.Type,
msg.Request,
msgs.controlerUUID,
msgs.modelUUID,
).Observe(time.Since(msg.start).Seconds())

msgs.mu.Lock()
defer msgs.mu.Unlock()
delete(msgs.messages, msg.RequestID)
Expand Down Expand Up @@ -226,6 +248,7 @@ func (p *modelProxy) sendError(socket *writeLockConn, req *message, err error) {
socket.writeJson(msg)
}
// An error message is a response back to the client.
servermon.JujuCallErrorCount.WithLabelValues(req.Type, req.Request, p.msgs.controlerUUID, p.msgs.modelUUID)
p.auditLogMessage(msg, true)
}

Expand Down Expand Up @@ -278,7 +301,7 @@ type clientProxy struct {
modelProxy
wg sync.WaitGroup
errChan chan error
createControllerConn func(context.Context) (WebsocketConnection, string, error)
createControllerConn func(context.Context) (WebsocketConnectionWithMetadata, error)
// mu synchronises changes to closed and modelproxy.dst, dst is is only created
// at some unspecified point in the future after a client request.
mu sync.Mutex
Expand All @@ -293,13 +316,15 @@ func (p *clientProxy) start(ctx context.Context) error {
p.dst.conn.Close()
}
}()

for {
zapctx.Debug(ctx, "Reading on client connection")
msg := new(message)
if err := p.src.readJson(&msg); err != nil {
// Error reading on the socket implies it is closed, simply return.
return err
}

zapctx.Debug(ctx, "Read message from client", zap.Any("message", msg))
err := p.makeControllerConnection(ctx)
if err != nil {
Expand All @@ -315,7 +340,7 @@ func (p *clientProxy) start(ctx context.Context) error {
toClient, toController, err := p.handleAdminFacade(ctx, msg)
if err != nil {
p.sendError(p.src, msg, err)
continue
return nil
}
if toClient != nil {
p.src.sendMessage(nil, toClient)
Expand All @@ -328,15 +353,16 @@ func (p *clientProxy) start(ctx context.Context) error {
zapctx.Error(ctx, "Invalid request ID 0")
err := errors.E(op, "Invalid request ID 0")
p.sendError(p.src, msg, err)
continue
return nil
}
p.msgs.addMessage(msg)
zapctx.Debug(ctx, "Writing to controller")

if err := p.dst.writeJson(msg); err != nil {
zapctx.Error(ctx, "clientProxy error writing to dst", zap.Error(err))
p.sendError(p.src, msg, err)
p.msgs.removeMessage(msg)
continue
return nil
}
}
}
Expand All @@ -355,12 +381,16 @@ func (p *clientProxy) makeControllerConnection(ctx context.Context) error {
err := errors.E(op, "Client connection closed while starting controller connection")
return err
}
conn, modelName, err := p.createControllerConn(ctx)
connWithMetadata, err := p.createControllerConn(ctx)
if err != nil {
return err
}
p.modelName = modelName
p.dst = &writeLockConn{conn: conn}

p.msgs.controlerUUID = connWithMetadata.ControllerUUID
p.msgs.modelUUID = connWithMetadata.ModelUUID

p.modelName = connWithMetadata.ModelName
p.dst = &writeLockConn{conn: connWithMetadata.Conn}
controllerToClient := controllerProxy{
modelProxy: modelProxy{
src: p.dst,
Expand Down
10 changes: 8 additions & 2 deletions internal/rpc/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/coreos/go-oidc/v3/oidc"
qt "github.com/frankban/quicktest"
"github.com/google/uuid"
"github.com/juju/juju/rpc/params"
"github.com/juju/names/v5"
"github.com/lestrrat-go/jwx/v2/jwt"
Expand Down Expand Up @@ -249,8 +250,13 @@ func TestProxySocketsAdminFacade(t *testing.T) {
helpers := rpc.ProxyHelpers{
ConnClient: clientWebsocket,
TokenGen: &mockTokenGenerator{},
ConnectController: func(ctx context.Context) (rpc.WebsocketConnection, string, error) {
return controllerWebsocket, "test model", nil
ConnectController: func(ctx context.Context) (rpc.WebsocketConnectionWithMetadata, error) {
return rpc.WebsocketConnectionWithMetadata{
Conn: controllerWebsocket,
ModelName: "test model",
ModelUUID: uuid.NewString(),
ControllerUUID: uuid.NewString(),
}, nil
},
AuditLog: func(*dbmodel.AuditLogEntry) {},
JIMM: &mockJIMM{
Expand Down
6 changes: 5 additions & 1 deletion internal/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
// flexible than the juju implementation.
package rpc

import "encoding/json"
import (
"encoding/json"
"time"
)

// A message encodes a single message sent, or received, over an RPC
// connection. It contains the union of fields in a request or response
// message.
type message struct {
start time.Time
RequestID uint64 `json:"request-id,omitempty"`
Type string `json:"type,omitempty"`
Version int `json:"version,omitempty"`
Expand Down
13 changes: 13 additions & 0 deletions internal/servermon/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,19 @@ var (
Name: "error_total",
Help: "The number of vault call errors.",
}, []string{"method"})
JujuCallDurationHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "jimm",
Subsystem: "juju",
Name: "call_duration_seconds",
Help: "Histogram of juju call time in seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
}, []string{"facade", "method", "controller", "model"})
JujuCallErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "jimm",
Subsystem: "juju",
Name: "error_total",
Help: "The number of juju call errors.",
}, []string{"facade", "method", "controller", "model"})
ConcurrentWebsocketConnections = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "jimm",
Subsystem: "websocket",
Expand Down

0 comments on commit dbb8887

Please sign in to comment.