Skip to content

Commit

Permalink
refactor: use instance uid as connection key in opamp server (#1390)
Browse files Browse the repository at this point in the history
  • Loading branch information
blumamir authored Jul 24, 2024
1 parent 793503c commit 81388a5
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
2 changes: 1 addition & 1 deletion odiglet/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ARG DOTNET_OTEL_VERSION=v0.7.0
ADD https://github.com/open-telemetry/opentelemetry-dotnet-instrumentation/releases/download/$DOTNET_OTEL_VERSION/opentelemetry-dotnet-instrumentation-linux-musl.zip .
RUN unzip opentelemetry-dotnet-instrumentation-linux-musl.zip && rm opentelemetry-dotnet-instrumentation-linux-musl.zip

FROM --platform=$BUILDPLATFORM keyval/odiglet-base:v1.4 as builder
FROM --platform=$BUILDPLATFORM keyval/odiglet-base:v1.4 AS builder
WORKDIR /go/src/github.com/odigos-io/odigos
# Copy local modules required by the build
COPY api/ api/
Expand Down
20 changes: 10 additions & 10 deletions opampserver/pkg/connection/conncache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ func NewConnectionsCache() *ConnectionsCache {
}
}

// GetConnection returns the connection information for the given device id.
// GetConnection returns the connection information for the given OpAMP instanceUid.
// the returned object is a by-value copy of the connection information, so it can be safely used.
// To change something in the connection information, use the functions below which are synced and safe.
func (c *ConnectionsCache) GetConnection(deviceId string) (*ConnectionInfo, bool) {
func (c *ConnectionsCache) GetConnection(instanceUid string) (*ConnectionInfo, bool) {
c.mux.Lock()
defer c.mux.Unlock()
conn, ok := c.liveConnections[deviceId]
conn, ok := c.liveConnections[instanceUid]
if !ok || conn == nil {
return nil, false
} else {
Expand All @@ -47,30 +47,30 @@ func (c *ConnectionsCache) GetConnection(deviceId string) (*ConnectionInfo, bool
}
}

func (c *ConnectionsCache) AddConnection(deviceId string, conn *ConnectionInfo) {
func (c *ConnectionsCache) AddConnection(instanceUid string, conn *ConnectionInfo) {
// copy the conn object to avoid it being accessed concurrently
connCopy := *conn
c.mux.Lock()
defer c.mux.Unlock()
c.liveConnections[deviceId] = &connCopy
c.liveConnections[instanceUid] = &connCopy
}

func (c *ConnectionsCache) RemoveConnection(deviceId string) {
func (c *ConnectionsCache) RemoveConnection(instanceUid string) {
c.mux.Lock()
defer c.mux.Unlock()
delete(c.liveConnections, deviceId)
delete(c.liveConnections, instanceUid)
}

func (c *ConnectionsCache) RecordMessageTime(deviceId string) {
func (c *ConnectionsCache) RecordMessageTime(instanceUid string) {
c.mux.Lock()
defer c.mux.Unlock()

conn, ok := c.liveConnections[deviceId]
conn, ok := c.liveConnections[instanceUid]
if !ok {
return
}
conn.lastMessageTime = time.Now()
c.liveConnections[deviceId] = conn
c.liveConnections[instanceUid] = conn
}

func (c *ConnectionsCache) CleanupStaleConnections() []ConnectionInfo {
Expand Down
15 changes: 11 additions & 4 deletions opampserver/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
return
}

instanceUid := string(agentToServer.InstanceUid)
if instanceUid == "" {
logger.Error(err, "InstanceUid is missing")
w.WriteHeader(http.StatusBadRequest)
return
}

deviceId := req.Header.Get("X-Odigos-DeviceId")
if deviceId == "" {
logger.Error(err, "X-Odigos-DeviceId header is missing")
Expand All @@ -73,7 +80,7 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
}

var serverToAgent *protobufs.ServerToAgent
connectionInfo, exists := connectionCache.GetConnection(deviceId)
connectionInfo, exists := connectionCache.GetConnection(instanceUid)
if !exists {
connectionInfo, serverToAgent, err = handlers.OnNewConnection(ctx, deviceId, &agentToServer)
if err != nil {
Expand All @@ -82,13 +89,13 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
return
}
if connectionInfo != nil {
connectionCache.AddConnection(deviceId, connectionInfo)
connectionCache.AddConnection(instanceUid, connectionInfo)
}
} else {

if agentToServer.AgentDisconnect != nil {
handlers.OnConnectionClosed(ctx, connectionInfo)
connectionCache.RemoveConnection(deviceId)
connectionCache.RemoveConnection(instanceUid)
}

serverToAgent, err = handlers.OnAgentToServerMessage(ctx, &agentToServer, connectionInfo)
Expand All @@ -113,7 +120,7 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
}

// keep record in memory of last message time, to detect stale connections
connectionCache.RecordMessageTime(deviceId)
connectionCache.RecordMessageTime(instanceUid)

serverToAgent.InstanceUid = agentToServer.InstanceUid

Expand Down

0 comments on commit 81388a5

Please sign in to comment.