Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support client heartbeat #296

Merged
merged 9 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,18 @@ func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) {
h.receiveProcessor.ProcessReceivedMessage(ctx, &response)
}

// SetHeartbeatInterval applies the given heartbeat interval to the HTTP sender
// by forwarding it to SetPollingInterval.
//
// It returns an error, and leaves the polling interval untouched, when duration
// is zero or negative.
func (h *HTTPSender) SetHeartbeatInterval(duration time.Duration) error {
	if duration <= 0 {
		return errors.New("heartbeat interval for httpclient must be greater than zero")
	}

	// duration is strictly positive here, so no further zero check is needed.
	h.SetPollingInterval(duration)
	return nil
}

// SetPollingInterval sets the interval between polling. Has effect starting from the
// next polling cycle.
func (h *HTTPSender) SetPollingInterval(duration time.Duration) {
Expand Down
20 changes: 20 additions & 0 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {
srv.Close()
}

// TestHTTPSenderSetHeartbeatInterval verifies that the HTTP sender rejects
// non-positive heartbeat intervals without changing its polling interval, and
// applies positive ones.
func TestHTTPSenderSetHeartbeatInterval(t *testing.T) {
	sender := NewHTTPSender(&sharedinternal.NopLogger{})

	// Default interval should be 30s as per OpAMP Specification
	defaultMs := (30 * time.Second).Milliseconds()
	assert.Equal(t, defaultMs, sender.pollingIntervalMs)

	// zero is invalid for http sender
	assert.Error(t, sender.SetHeartbeatInterval(0))
	assert.Equal(t, defaultMs, sender.pollingIntervalMs)

	// negative interval is invalid for http sender
	assert.Error(t, sender.SetHeartbeatInterval(-1))
	assert.Equal(t, defaultMs, sender.pollingIntervalMs)

	// positive interval is valid for http sender and becomes the polling interval
	expected := 10 * time.Second
	assert.NoError(t, sender.SetHeartbeatInterval(expected))
	assert.Equal(t, expected.Milliseconds(), sender.pollingIntervalMs)
}

func TestAddTLSConfig(t *testing.T) {
sender := NewHTTPSender(&sharedinternal.NopLogger{})

Expand Down
8 changes: 8 additions & 0 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"fmt"
"sync"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand Down Expand Up @@ -216,6 +217,13 @@
return
}

if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat) {
interval := time.Duration(settings.Opamp.HeartbeatIntervalSeconds) * time.Second
if err := r.sender.SetHeartbeatInterval(interval); err != nil {
r.logger.Errorf(ctx, "Failed to set heartbeat interval: %v", err)

Check warning on line 223 in client/internal/receivedprocessor.go

View check run for this annotation

Codecov / codecov/patch

client/internal/receivedprocessor.go#L223

Added line #L223 was not covered by tests
}
}

if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings) {
err := r.callbacks.OnOpampConnectionSettings(ctx, settings.Opamp)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions client/internal/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"errors"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -22,6 +23,9 @@ type Sender interface {

// SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent.
SetInstanceUid(instanceUid types.InstanceUid) error

// SetHeartbeatInterval sets the interval for the agent heartbeats.
SetHeartbeatInterval(duration time.Duration) error
}

// SenderCommon is partial Sender implementation that is common between WebSocket and plain
Expand Down
62 changes: 58 additions & 4 deletions client/internal/wssender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package internal

import (
"context"
"errors"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
Expand All @@ -13,7 +15,8 @@ import (
)

const (
defaultSendCloseMessageTimeout = 5 * time.Second
defaultSendCloseMessageTimeout = 5 * time.Second
defaultHeartbeatIntervalSeconds = 30
)

// WSSender implements the WebSocket client's sending portion of OpAMP protocol.
Expand All @@ -25,15 +28,24 @@ type WSSender struct {
// Indicates that the sender has fully stopped.
stopped chan struct{}
err error

heartbeatIntervalUpdated chan struct{}
heartbeatIntervalSeconds atomic.Int64
heartbeatTimer *time.Timer
}

// NewSender creates a new Sender that uses WebSocket to send
// messages to the server.
func NewSender(logger types.Logger) *WSSender {
return &WSSender{
logger: logger,
SenderCommon: NewSenderCommon(),
s := &WSSender{
logger: logger,
heartbeatIntervalUpdated: make(chan struct{}, 1),
heartbeatTimer: time.NewTimer(0),
SenderCommon: NewSenderCommon(),
}
s.heartbeatIntervalSeconds.Store(defaultHeartbeatIntervalSeconds)

return s
}

// Start the sender and send the first message that was set via NextMessage().Update()
Expand Down Expand Up @@ -62,10 +74,51 @@ func (s *WSSender) StoppingErr() error {
return s.err
}

// SetHeartbeatInterval records a new heartbeat interval and posts a
// notification on heartbeatIntervalUpdated so the heartbeat timer gets reset.
//
// A negative d is rejected with an error and nothing is changed. The interval
// is stored in whole seconds, so any fractional part of d is truncated; a
// stored value of zero disables heartbeats (see shouldSendHeartbeat).
func (s *WSSender) SetHeartbeatInterval(d time.Duration) error {
	if d < 0 {
		return errors.New("heartbeat interval for wsclient must be non-negative")
	}

	seconds := int64(d.Seconds())
	s.heartbeatIntervalSeconds.Store(seconds)

	// Non-blocking send: if the channel buffer is already full, the pending
	// notification is enough, so this one is dropped.
	select {
	case s.heartbeatIntervalUpdated <- struct{}{}:
	default:
	}

	return nil
}

func (s *WSSender) shouldSendHeartbeat() <-chan time.Time {
t := s.heartbeatTimer

// Before Go 1.23, the only safe way to use Reset was to [Stop] and
// explicitly drain the timer first.
// ref: https://pkg.go.dev/time#Timer.Reset
if !t.Stop() {
select {
case <-t.C:
default:
}
}
haoqixu marked this conversation as resolved.
Show resolved Hide resolved

if d := time.Duration(s.heartbeatIntervalSeconds.Load()) * time.Second; d != 0 {
t.Reset(d)
return t.C
}

// Heartbeat interval is set to Zero, disable heartbeat.
return nil
}

func (s *WSSender) run(ctx context.Context) {
out:
for {
select {
case <-s.shouldSendHeartbeat():
s.NextMessage().Update(func(msg *protobufs.AgentToServer) {})
haoqixu marked this conversation as resolved.
Show resolved Hide resolved
s.ScheduleSend()
case <-s.heartbeatIntervalUpdated:
// trigger heartbeat timer reset
case <-s.hasPendingMessage:
s.sendNextMessage(ctx)

Expand All @@ -77,6 +130,7 @@ out:
}
}

s.heartbeatTimer.Stop()
close(s.stopped)
}

Expand Down
28 changes: 28 additions & 0 deletions client/internal/wssender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package internal

import (
"testing"
"time"

sharedinternal "github.com/open-telemetry/opamp-go/internal"
"github.com/stretchr/testify/assert"
)

// TestWSSenderSetHeartbeatInterval verifies that the WebSocket sender rejects
// negative heartbeat intervals, accepts zero, and stores positive intervals in
// seconds.
func TestWSSenderSetHeartbeatInterval(t *testing.T) {
	sender := NewSender(&sharedinternal.NopLogger{})

	// Default interval should be 30s as per OpAMP Specification
	defaultSeconds := int64((30 * time.Second).Seconds())
	assert.Equal(t, defaultSeconds, sender.heartbeatIntervalSeconds.Load())

	// negative interval is invalid for ws sender
	assert.Error(t, sender.SetHeartbeatInterval(-1))
	assert.Equal(t, defaultSeconds, sender.heartbeatIntervalSeconds.Load())

	// zero is valid for ws sender
	assert.NoError(t, sender.SetHeartbeatInterval(0))
	assert.Equal(t, int64(0), sender.heartbeatIntervalSeconds.Load())

	// positive interval is valid for ws sender and is stored in seconds
	var expected int64 = 10
	assert.NoError(t, sender.SetHeartbeatInterval(time.Duration(expected)*time.Second))
	assert.Equal(t, expected, sender.heartbeatIntervalSeconds.Load())
}
71 changes: 71 additions & 0 deletions client/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,77 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

// TestWSSenderReportsHeartbeat checks that the WebSocket client sends
// heartbeat messages only when the client declares the ReportsHeartbeat
// capability and the server offers a non-zero heartbeat interval.
//
// Each case runs as its own subtest against its own mock server, so a failure
// is reported under the case name and the server is released when the case ends.
func TestWSSenderReportsHeartbeat(t *testing.T) {
	tests := []struct {
		name                  string
		clientEnableHeartbeat bool
		serverEnableHeartbeat bool
		expectHeartbeats      bool
	}{
		{"enable heartbeat", true, true, true},
		{"client disable heartbeat", false, true, false},
		{"server disable heartbeat", true, false, false},
	}

	for _, tt := range tests {
		tt := tt // per-iteration copy for the closures below (needed before Go 1.22)
		t.Run(tt.name, func(t *testing.T) {
			srv := internal.StartMockServer(t)
			defer srv.Close()

			var firstMsg atomic.Bool
			var conn atomic.Value
			srv.OnWSConnect = func(c *websocket.Conn) {
				conn.Store(c)
				firstMsg.Store(true)
			}

			// msgCount counts every message after the first one; the first
			// message is answered with the connection settings offer instead.
			var msgCount atomic.Int64
			srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
				if firstMsg.Load() {
					firstMsg.Store(false)
					resp := &protobufs.ServerToAgent{
						InstanceUid: msg.InstanceUid,
						ConnectionSettings: &protobufs.ConnectionSettingsOffers{
							Opamp: &protobufs.OpAMPConnectionSettings{
								HeartbeatIntervalSeconds: 1,
							},
						},
					}
					if !tt.serverEnableHeartbeat {
						resp.ConnectionSettings.Opamp.HeartbeatIntervalSeconds = 0
					}
					return resp
				}
				msgCount.Add(1)
				return nil
			}

			// Start an OpAMP/WebSocket client.
			settings := types.StartSettings{
				OpAMPServerURL: "ws://" + srv.Endpoint,
			}
			if tt.clientEnableHeartbeat {
				settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat
			}
			client := NewWebSocket(nil)
			startClient(t, settings, client)

			// Wait for connection to be established.
			eventually(t, func() bool { return conn.Load() != nil })

			// With a 1s interval, at least two heartbeats should arrive within
			// 3s; when heartbeats are disabled, that count is never reached.
			gotTwoHeartbeats := func() bool { return msgCount.Load() >= 2 }
			if tt.expectHeartbeats {
				assert.Eventually(t, gotTwoHeartbeats, 3*time.Second, 10*time.Millisecond)
			} else {
				assert.Never(t, gotTwoHeartbeats, 3*time.Second, 10*time.Millisecond)
			}

			// Stop the client.
			err := client.Stop(context.Background())
			assert.NoError(t, err)
		})
	}
}

func TestDisconnectWSByServer(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
Expand Down
6 changes: 3 additions & 3 deletions internal/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@

// We have a client certificate with a public and private key.
certificate := &protobufs.TLSCertificate{
PublicKey: publicKeyPEM.Bytes(),
PrivateKey: privateKeyPEM.Bytes(),
CaPublicKey: caCertBytes,
Cert: publicKeyPEM.Bytes(),
PrivateKey: privateKeyPEM.Bytes(),
CaCert: caCertBytes,

Check warning on line 158 in internal/certs.go

View check run for this annotation

Codecov / codecov/patch

internal/certs.go#L156-L158

Added lines #L156 - L158 were not covered by tests
}

return certificate, nil
Expand Down
8 changes: 4 additions & 4 deletions internal/examples/agent/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,15 +536,15 @@ func (agent *Agent) getCertFromSettings(certificate *protobufs.TLSCertificate) (
// Client-initiated CSR flow. This is currently initiated when connecting
// to the Server for the first time (see requestClientCertificate()).
cert, err = tls.X509KeyPair(
certificate.PublicKey, // We received the certificate from the Server.
certificate.Cert, // We received the certificate from the Server.
agent.clientPrivateKeyPEM, // Private key was earlier locally generated.
)
} else {
// Server-initiated flow. This is currently initiated by user clicking a button in
// the Server UI.
// Both certificate and private key are from the Server.
cert, err = tls.X509KeyPair(
certificate.PublicKey,
certificate.Cert,
certificate.PrivateKey,
)
}
Expand All @@ -554,8 +554,8 @@ func (agent *Agent) getCertFromSettings(certificate *protobufs.TLSCertificate) (
return nil, err
}

if len(certificate.CaPublicKey) != 0 {
caCertPB, _ := pem.Decode(certificate.CaPublicKey)
if len(certificate.CaCert) != 0 {
caCertPB, _ := pem.Decode(certificate.CaCert)
caCert, err := x509.ParseCertificate(caCertPB.Bytes)
if err != nil {
agent.logger.Errorf(context.Background(), "Cannot parse CA cert: %v", err)
Expand Down
10 changes: 5 additions & 5 deletions internal/examples/server/certman/certman.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func CreateClientTLSCertFromCSR(csr *x509.CertificateRequest) (*protobufs.TLSCer

// We have a client certificate with a public and private key.
certificate := &protobufs.TLSCertificate{
PublicKey: certPEM.Bytes(),
CaPublicKey: caCertBytes,
Cert: certPEM.Bytes(),
CaCert: caCertBytes,
}

return certificate, nil
Expand Down Expand Up @@ -144,9 +144,9 @@ func CreateClientTLSCert() (*protobufs.TLSCertificate, error) {

// We have a client certificate with a public and private key.
certificate := &protobufs.TLSCertificate{
PublicKey: certPEM.Bytes(),
PrivateKey: privateKeyPEM.Bytes(),
CaPublicKey: caCertBytes,
Cert: certPEM.Bytes(),
PrivateKey: privateKeyPEM.Bytes(),
CaCert: caCertBytes,
}

return certificate, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/opamp-spec
Loading
Loading