Skip to content

Commit

Permalink
Enable clients to set flags (#286)
Browse files Browse the repository at this point in the history
Updates #198 

This PR adds a new method to the OpAMPClient that allows clients to set their own flags (such as `RequestInstanceUid`).
  • Loading branch information
tpaschalis authored Jun 26, 2024
1 parent 8f7a652 commit ed38d5f
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 18 deletions.
10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ type OpAMPClient interface {
// for more details.
SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error

// SetFlags modifies the set of flags supported by the client.
// May be called before or after Start(), including from OnMessage handler.
// The zero value of protobufs.AgentToServerFlags corresponds to FlagsUnspecified
// and is safe to use.
//
// See
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agenttoserverflags
// for more details.
SetFlags(flags protobufs.AgentToServerFlags)

// SendCustomMessage sends the custom message to the Server. May be called anytime after
// Start(), including from OnMessage handler.
//
Expand Down
126 changes: 112 additions & 14 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func eventually(t *testing.T, f func() bool) {
assert.Eventually(t, f, 5*time.Second, 10*time.Millisecond)
}

func newInstanceUid(t *testing.T) types.InstanceUid {
func genNewInstanceUid(t *testing.T) types.InstanceUid {
uid, err := uuid.NewV7()
require.NoError(t, err)
b, err := uid.MarshalBinary()
Expand All @@ -103,7 +103,7 @@ func newInstanceUid(t *testing.T) types.InstanceUid {

func prepareSettings(t *testing.T, settings *types.StartSettings, c OpAMPClient) {
// Autogenerate instance id.
settings.InstanceUid = newInstanceUid(t)
settings.InstanceUid = genNewInstanceUid(t)

// Make sure correct URL scheme is used, based on the type of the OpAMP client.
u, err := url.Parse(settings.OpAMPServerURL)
Expand Down Expand Up @@ -630,27 +630,24 @@ func TestAgentIdentification(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
// Start a server.
srv := internal.StartMockServer(t)
newInstanceUid := newInstanceUid(t)
newInstanceUid := genNewInstanceUid(t)
var rcvAgentInstanceUid atomic.Value
var sentInvalidId atomic.Bool
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
rcvAgentInstanceUid.Store(msg.InstanceUid)
if sentInvalidId.Load() {
if msg.Flags&uint64(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) == 1 {
newInstanceUid = genNewInstanceUid(t)
rcvAgentInstanceUid.Store(newInstanceUid[:])
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
AgentIdentification: &protobufs.AgentIdentification{
// If we sent the invalid one first, send a valid one now
// If the RequestInstanceUid flag was set, populate this field.
NewInstanceUid: newInstanceUid[:],
},
}
}
sentInvalidId.Store(true)
rcvAgentInstanceUid.Store(msg.InstanceUid)
// Start by sending just the old instance ID.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
AgentIdentification: &protobufs.AgentIdentification{
// Start by sending an invalid id forcing an error.
NewInstanceUid: nil,
},
}
}

Expand Down Expand Up @@ -689,8 +686,8 @@ func TestAgentIdentification(t *testing.T) {
},
)

// Send a dummy message again to get the _new_ id
_ = client.SetAgentDescription(createAgentDescr())
// Set the flags to request a new ID.
client.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)

// When it was sent, the new instance uid should have been used, which should
// have been observed by the Server
Expand Down Expand Up @@ -2122,3 +2119,104 @@ func TestSetCustomCapabilities(t *testing.T) {
assert.NoError(t, err)
})
}

// TestSetFlags tests the ability for the client to change the set of flags it sends.
func TestSetFlags(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
var rcvCustomFlags atomic.Value
var flags protobufs.AgentToServerFlags

srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg.Flags != 0 {
rcvCustomFlags.Store(msg.Flags)
}
return nil
}

settings := types.StartSettings{}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// The zero value of AgentToServerFlags is ready to use
client.SetFlags(flags)

// Update flags to send
flags |= protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid
client.SetFlags(flags)

// Verify new flags were delivered to the server
eventually(
t,
func() bool {
msg, ok := rcvCustomFlags.Load().(uint64)
if !ok || msg == 0 {
return false
}
return uint64(flags) == msg
},
)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}

// TestSetFlags tests the ability for the client to set its flags before starting up.
func TestSetFlagsBeforeStart(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
// Start a Server.
flags := protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid
srv := internal.StartMockServer(t)
var rcvCustomFlags atomic.Value
var isFirstMessage atomic.Bool
isFirstMessage.Store(true)

// Make sure we only record flags from the very first message.
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if isFirstMessage.Load() {
rcvCustomFlags.Store(msg.Flags)
}
isFirstMessage.Store(false)
return nil
}

settings := types.StartSettings{}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

// Set up the flags _before_ calling Start to verify that they're
// handled correctly in PrepareFirstMessage.
client.SetFlags(flags)

// Start the client.
assert.NoError(t, client.Start(context.Background(), settings))

// Verify the flags were delivered to the server during the first message.
eventually(
t,
func() bool {
msg, ok := rcvCustomFlags.Load().(uint64)
if !ok || msg == 0 {
return false
}
return uint64(flags) == msg
},
)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
7 changes: 6 additions & 1 deletion client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,16 @@ func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) err
return c.common.SetPackageStatuses(statuses)
}

// SendCustomMessage implements OpAMPClient.SetCustomCapabilities.
// SendCustomCapabilities implements OpAMPClient.SetCustomCapabilities.
func (c *httpClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error {
return c.common.SetCustomCapabilities(customCapabilities)
}

// SetFlags implements OpAMPClient.SetFlags.
func (c *httpClient) SetFlags(flags protobufs.AgentToServerFlags) {
c.common.SetFlags(flags)
}

// SendCustomMessage implements OpAMPClient.SendCustomMessage.
func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
return c.common.SendCustomMessage(message)
Expand Down
14 changes: 14 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
msg.PackageStatuses = c.ClientSyncedState.PackageStatuses()
msg.Capabilities = uint64(c.Capabilities)
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
msg.Flags = c.ClientSyncedState.Flags()
},
)
return nil
Expand Down Expand Up @@ -385,6 +386,19 @@ func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.Custo
return nil
}

func (c *ClientCommon) SetFlags(flags protobufs.AgentToServerFlags) {
// store the flags to send
c.ClientSyncedState.SetFlags(flags)

// send the new flags to the Server
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.Flags = uint64(flags)
},
)
c.sender.ScheduleSend()
}

// SendCustomMessage sends the specified custom message to the server.
func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
if message == nil {
Expand Down
19 changes: 17 additions & 2 deletions client/internal/clientstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ var (
)

// ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to
// have access to synchronize to the Server. 5 messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses and CustomCapabilities.
// have access to synchronize to the Server. Six messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags.
//
// See OpAMP spec for more details on how status reporting works:
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting
Expand All @@ -39,6 +39,7 @@ type ClientSyncedState struct {
remoteConfigStatus *protobufs.RemoteConfigStatus
packageStatuses *protobufs.PackageStatuses
customCapabilities *protobufs.CustomCapabilities
flags protobufs.AgentToServerFlags
}

func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription {
Expand Down Expand Up @@ -71,6 +72,12 @@ func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities {
return s.customCapabilities
}

func (s *ClientSyncedState) Flags() uint64 {
defer s.mutex.Unlock()
s.mutex.Lock()
return uint64(s.flags)
}

// SetAgentDescription sets the AgentDescription in the state.
func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error {
if descr == nil {
Expand Down Expand Up @@ -168,3 +175,11 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool {

return false
}

// SetFlags sets the flags in the state.
func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) {
defer s.mutex.Unlock()
s.mutex.Lock()

s.flags = flags
}
36 changes: 36 additions & 0 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,39 @@ func TestHTTPSenderRetryForFailedRequests(t *testing.T) {
cancel()
srv.Close()
}

func TestRequestInstanceUidFlagReset(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

sender := NewHTTPSender(&sharedinternal.NopLogger{})
sender.callbacks = types.CallbacksStruct{}

// Set the RequestInstanceUid flag on the tracked state to request the server for a new ID to use.
clientSyncedState := &ClientSyncedState{}
clientSyncedState.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
capabilities := protobufs.AgentCapabilities_AgentCapabilities_Unspecified
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities)

// If we process a message with a nil AgentIdentification, or an incorrect NewInstanceUid.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
AgentIdentification: nil,
})
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
AgentIdentification: &protobufs.AgentIdentification{NewInstanceUid: []byte("foo")},
})

// Then the RequestInstanceUid flag stays intact.
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)

// If we process a message that contains a non-nil AgentIdentification that contains a NewInstanceUid.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
AgentIdentification: &protobufs.AgentIdentification{NewInstanceUid: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}},
})

// Then the flag is reset so we don't request a new instance uid yet again.
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified)
cancel()
}
6 changes: 5 additions & 1 deletion client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ func (r *receivedProcessor) rcvFlags(
msg.RemoteConfigStatus = r.clientSyncedState.RemoteConfigStatus()
msg.PackageStatuses = r.clientSyncedState.PackageStatuses()
msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities()
msg.Flags = r.clientSyncedState.Flags()

// The logic for EffectiveConfig is similar to the previous 5 sub-messages however
// The logic for EffectiveConfig is similar to the previous 6 sub-messages however
// the EffectiveConfig is fetched using GetEffectiveConfig instead of
// from clientSyncedState. We do this to avoid keeping EffectiveConfig in-memory.
msg.EffectiveConfig = cfg
Expand Down Expand Up @@ -237,6 +238,9 @@ func (r *receivedProcessor) rcvAgentIdentification(ctx context.Context, agentId
return err
}

// If we set up a new instance ID, reset the RequestInstanceUid flag.
r.clientSyncedState.flags &^= protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (c *wsClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCap
return c.common.SetCustomCapabilities(customCapabilities)
}

func (c *wsClient) SetFlags(flags protobufs.AgentToServerFlags) {
c.common.SetFlags(flags)
}

func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
return c.common.SendCustomMessage(message)
}
Expand Down

0 comments on commit ed38d5f

Please sign in to comment.