Skip to content

Commit

Permalink
add ErrCustomMessagePending error and channel to wait for custom mess…
Browse files Browse the repository at this point in the history
…age to be available to set
  • Loading branch information
andykellr committed Feb 6, 2024
1 parent acc30ba commit 9367a9e
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 24 deletions.
13 changes: 9 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ type OpAMPClient interface {
// SetCustomMessage sets the custom message that will be sent to the Server. May be
// called anytime after Start(), including from OnMessage handler.
//
// If the CustomMessage is nil or it specifies a capability that is not listed in the
// CustomCapabilities provided in the StartSettings for the client, it will return an
// error.
SetCustomMessage(message *protobufs.CustomMessage) error
// If the CustomMessage is nil, ErrCustomMessageMissing will be returned. If the message
// specifies a capability that is not listed in the CustomCapabilities provided in the
// StartSettings for the client, ErrCustomCapabilityNotSupported will be returned.
//
// Only one message can be set at a time. If a message has already been set, it will
// return ErrCustomMessagePending. To ensure that it is safe to set another
// CustomMessage, the caller should wait for the channel to be closed before attempting
// to set another custom message.
SetCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)
}
90 changes: 83 additions & 7 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1604,7 +1604,7 @@ func TestReportCustomCapabilities(t *testing.T) {

// Client --->
// Send a custom message to the server
_ = client.SetCustomMessage(clientEchoRequest)
_, _ = client.SetCustomMessage(clientEchoRequest)

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
Expand Down Expand Up @@ -1671,14 +1671,14 @@ func TestSetCustomMessage(t *testing.T) {
{
name: "nil message is error",
message: nil,
expectedError: internal.ErrCustomMessageMissing,
expectedError: types.ErrCustomMessageMissing,
},
{
name: "unsupported message is error",
message: &protobufs.CustomMessage{
Capability: "io.opentelemetry.not-supported",
},
expectedError: internal.ErrCustomCapabilityNotSupported,
expectedError: types.ErrCustomCapabilityNotSupported,
},
{
name: "supported capability is ok",
Expand All @@ -1691,7 +1691,8 @@ func TestSetCustomMessage(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.ErrorIs(t, client.SetCustomMessage(test.message), test.expectedError)
_, err := client.SetCustomMessage(test.message)
assert.ErrorIs(t, err, test.expectedError)
})
}
})
Expand Down Expand Up @@ -1726,7 +1727,8 @@ func TestCustomMessages(t *testing.T) {
Type: "hello",
Data: []byte("test message 1"),
}
assert.NoError(t, client.SetCustomMessage(customMessage1))
_, err := client.SetCustomMessage(customMessage1)
assert.NoError(t, err)

// Verify message 1 delivered
eventually(
Expand All @@ -1746,7 +1748,8 @@ func TestCustomMessages(t *testing.T) {
Type: "hello",
Data: []byte("test message 2"),
}
assert.NoError(t, client.SetCustomMessage(customMessage2))
_, err = client.SetCustomMessage(customMessage2)
assert.NoError(t, err)

// Verify message 2 delivered
eventually(
Expand All @@ -1764,7 +1767,80 @@ func TestCustomMessages(t *testing.T) {
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
err = client.Stop(context.Background())
assert.NoError(t, err)
})
}

func TestSetCustomMessageConflict(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
// Start a Server.
srv := internal.StartMockServer(t)
var rcvCustomMessage atomic.Value
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg.CustomMessage != nil {
rcvCustomMessage.Store(msg.CustomMessage)
}
return nil
}

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
CustomCapabilities: []string{"local.test.example"},
}
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

customMessage1 := &protobufs.CustomMessage{
Capability: "local.test.example",
Type: "hello",
Data: []byte("test message 1"),
}
customMessage2 := &protobufs.CustomMessage{
Capability: "local.test.example",
Type: "hello",
Data: []byte("test message 2"),
}

_, err := client.SetCustomMessage(customMessage1)
assert.NoError(t, err)

// Sending another message immediately should fail with ErrCustomMessagePending.
sendingChan, err := client.SetCustomMessage(customMessage2)
assert.ErrorIs(t, err, types.ErrCustomMessagePending)
assert.NotNil(t, sendingChan)

// Receive the first custom message
eventually(
t,
func() bool {
msg, ok := rcvCustomMessage.Load().(*protobufs.CustomMessage)
if !ok || msg == nil {
return false
}
return proto.Equal(customMessage1, msg)
},
)

// Wait for the sending channel to be closed.
<-sendingChan

// Now sending the second message should work.
_, err = client.SetCustomMessage(customMessage2)
assert.NoError(t, err)

// Receive the second custom message
eventually(
t,
func() bool {
msg, ok := rcvCustomMessage.Load().(*protobufs.CustomMessage)
if !ok || msg == nil {
return false
}
return proto.Equal(customMessage2, msg)
},
)
})
}
2 changes: 1 addition & 1 deletion client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) err
}

// SetCustomMessage implements OpAMPClient.SetCustomMessage.
func (c *httpClient) SetCustomMessage(message *protobufs.CustomMessage) error {
func (c *httpClient) SetCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
return c.common.SetCustomMessage(message)
}

Expand Down
25 changes: 17 additions & 8 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ var (
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
ErrCustomMessageMissing = errors.New("CustomMessage is nil")
ErrCustomCapabilityNotSupported = errors.New("CustomCapability of CustomMessage is not supported")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
Expand Down Expand Up @@ -394,19 +392,30 @@ func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.Custo
}

// SetCustomMessage sends the specified custom message to the server.
func (c *ClientCommon) SetCustomMessage(message *protobufs.CustomMessage) error {
func (c *ClientCommon) SetCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
if message == nil {
return ErrCustomMessageMissing
return nil, types.ErrCustomMessageMissing
}
if !c.ClientSyncedState.HasCustomCapability(message.Capability) {
return ErrCustomCapabilityNotSupported
return nil, types.ErrCustomCapabilityNotSupported
}

c.sender.NextMessage().Update(
hasCustomMessage := false
sendingChan := c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.CustomMessage = message
if msg.CustomMessage != nil {
hasCustomMessage = true
} else {
msg.CustomMessage = message
}
},
)

if hasCustomMessage {
return sendingChan, types.ErrCustomMessagePending
}

c.sender.ScheduleSend()
return nil

return sendingChan, nil
}
17 changes: 14 additions & 3 deletions client/internal/nextmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,31 @@ import (
type NextMessage struct {
// The next message to send.
nextMessage *protobufs.AgentToServer
// nextMessageSending is a channel that is closed when the message is sent.
nextMessageSending chan struct{}
// Indicates that nextMessage is pending to be sent.
messagePending bool
// Mutex to protect the above 2 fields.
// Mutex to protect the above 3 fields.
messageMutex sync.Mutex
}

// NewNextMessage returns a new empty NextMessage.
func NewNextMessage() NextMessage {
return NextMessage{
nextMessage: &protobufs.AgentToServer{},
nextMessage: &protobufs.AgentToServer{},
nextMessageSending: make(chan struct{}),
}
}

// Update applies the specified modifier function to the next message that
// will be sent and marks the message as pending to be sent.
func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) {
func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) (messageSendingChannel chan struct{}) {
s.messageMutex.Lock()
modifier(s.nextMessage)
s.messagePending = true
sending := s.nextMessageSending
s.messageMutex.Unlock()
return sending
}

// PopPending returns the next message to be sent, if it is pending or nil otherwise.
Expand All @@ -54,7 +59,13 @@ func (s *NextMessage) PopPending() *protobufs.AgentToServer {
Capabilities: s.nextMessage.Capabilities,
}

sending := s.nextMessageSending

s.nextMessage = msg
s.nextMessageSending = make(chan struct{})

// Notify that the message is being sent and a new nextMessage has been created.
close(sending)
}
s.messageMutex.Unlock()
return msgToSend
Expand Down
16 changes: 16 additions & 0 deletions client/types/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package types

import "errors"

var (
// ErrCustomMessageMissing is returned by SetCustomMessage when called with a nil message.
ErrCustomMessageMissing = errors.New("CustomMessage is nil")

// ErrCustomCapabilityNotSupported is returned by SetCustomMessage when called with
// message that has a capability that is not specified as supported by the client.
ErrCustomCapabilityNotSupported = errors.New("CustomCapability of CustomMessage is not supported")

// ErrCustomMessagePending is returned by SetCustomMessage when called before the previous
// message has been sent.
ErrCustomMessagePending = errors.New("custom message already set")
)
2 changes: 1 addition & 1 deletion client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *wsClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error
return c.common.SetPackageStatuses(statuses)
}

func (c *wsClient) SetCustomMessage(message *protobufs.CustomMessage) error {
func (c *wsClient) SetCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
return c.common.SetCustomMessage(message)
}

Expand Down

0 comments on commit 9367a9e

Please sign in to comment.