Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mock NTP client #217

Merged
merged 10 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 61 additions & 36 deletions pkg/ntpmonitor/ntpmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,21 @@ var (
ErrDeltaTooSmall = errors.New("delta is too small")
)

// NTPClient is the seam through which NTPMonitor issues NTP queries.
// Its single method has the same signature as ntp.QueryWithOptions, so the
// live implementation is a thin wrapper and tests can substitute a fake.
type NTPClient interface {
	// QueryWithOptions queries the NTP server srv using opts and returns
	// the server's response, or an error if the query failed.
	QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error)
}

// LiveNTPClient is the NTPClient that performs real queries by delegating
// to the ntp package.
type LiveNTPClient struct{}

// Compile-time check that LiveNTPClient satisfies NTPClient.
var _ NTPClient = LiveNTPClient{}

// QueryWithOptions forwards the query unchanged to ntp.QueryWithOptions.
func (LiveNTPClient) QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error) {
	return ntp.QueryWithOptions(srv, opts)
}

// NTPMonitor compares the local time with a set of trusted NTP servers.
type NTPMonitor struct {
cfg *Config
run atomic.Bool
// cfg is the monitor configuration; the monitor reads the server list,
// server count, threshold, timeout and max time delta from it.
cfg *Config
// run is set to true by Start and polled by the monitoring loop to decide
// whether to keep going.
run atomic.Bool
// ntpClient performs the NTP queries; tests assign a fake to this field.
ntpClient NTPClient
}

// New creates an NTPMonitor, reading the configuration from the provided
Expand Down Expand Up @@ -81,6 +92,50 @@ func NewFromConfig(cfg *Config) (*NTPMonitor, error) {
return &NTPMonitor{cfg: cfg}, nil
}

// serverResponses is the verdict of one round of NTP queries, as produced
// by queryServers. tooFewServerResponses reports that fewer servers answered
// than the configured threshold; tooManyInvalidResponses reports that fewer
// answers than the threshold were within the allowed time delta.
type serverResponses struct {
	tooFewServerResponses, tooManyInvalidResponses bool
}

// queryServers runs one monitoring round: it queries a random selection of
// the configured servers and compares each reported clock offset with delta.
//
// delta is the largest absolute clock offset that still counts as a valid
// response.
//
// The result reports whether fewer servers answered than cfg.ServerThreshold
// and whether fewer answers than cfg.ServerThreshold were within delta.
func (n *NTPMonitor) queryServers(delta time.Duration) serverResponses {
	// Get a random set of servers
	servers := RandomChoice(n.cfg.Servers, n.cfg.NumServers)
	validResponses := 0
	noResponse := 0
	for _, srv := range servers {
		resp, err := n.QueryNTPServer(srv)
		if err != nil {
			// Log the underlying error: not every failure is a timeout.
			log.Logger.Errorf("ntp query to %s failed: %v", srv, err)
			noResponse++
			continue
		}

		// ClockOffset is the estimated difference from
		// local time to NTP server's time.
		// The estimate assumes latency is similar for both
		// sending and receiving data.
		// The estimated offset does not depend on the value
		// of the latency.
		if resp.ClockOffset.Abs() > delta {
			log.Logger.Warnf("local time is different from %s: %s (offset %s exceeds max delta %s)",
				srv, resp.Time, resp.ClockOffset, delta)
			continue
		}
		validResponses++
	}

	// Did enough NTP servers respond? Count against the servers that were
	// actually queried, so an empty selection is never reported as healthy.
	responded := len(servers) - noResponse
	return serverResponses{
		tooFewServerResponses:   n.cfg.ServerThreshold > responded,
		tooManyInvalidResponses: n.cfg.ServerThreshold > validResponses,
	}
}

// Start the periodic monitor. Once started, it runs until Stop() is called.
func (n *NTPMonitor) Start() {
n.run.Store(true)
Expand All @@ -93,45 +148,15 @@ func (n *NTPMonitor) Start() {
delta := time.Duration(n.cfg.MaxTimeDelta) * time.Second
log.Logger.Info("ntp monitoring starting")
for n.run.Load() {
// Get a random set of servers
servers := RandomChoice(n.cfg.Servers, n.cfg.NumServers)
validResponses := 0
noResponse := 0
for _, srv := range servers {
// Create a time interval from 'now' with the max
// time delta added/remobed
// Make sure the time from the remote NTP server lies
// within this interval.
resp, err := n.QueryNTPServer(srv)

if err != nil {
log.Logger.Errorf("ntp response timeout from %s",
srv)
noResponse++
continue
}

// ClockOffset is the estimated difference from
// local time to NTP server's time.
// The estimate assumes latency is similar for both
// sending and receiving data.
// The estimated offset does not depend on the value
// of the latency.
if resp.ClockOffset.Abs() > delta {
log.Logger.Warnf("local time is different from %s: %s",
srv, resp.Time)
} else {
validResponses++
}
}
responses := n.queryServers(delta)

// Did enough NTP servers respond?
if n.cfg.ServerThreshold > n.cfg.NumServers-noResponse {
if responses.tooFewServerResponses {
pkgapi.MetricNTPErrorCount.With(map[string]string{
"reason": "err_too_few",
}).Inc()
}
if n.cfg.ServerThreshold > validResponses {
if responses.tooManyInvalidResponses {
pkgapi.MetricNTPErrorCount.With(map[string]string{
"reason": "err_inv_time",
}).Inc()
Expand Down Expand Up @@ -160,7 +185,7 @@ func (n *NTPMonitor) QueryNTPServer(srv string) (*ntp.Response, error) {
opts := ntp.QueryOptions{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should QueryNTPServer be private, because we don't call it elsewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, looks like it isn't called anywhere else.

Timeout: time.Duration(n.cfg.RequestTimeout) * time.Second,
}
resp, err := ntp.QueryWithOptions(srv, opts)
resp, err := n.ntpClient.QueryWithOptions(srv, opts)
pkgapi.MetricNTPLatency.With(map[string]string{
"host": srv,
}).Observe(float64(time.Since(start)))
Expand Down
178 changes: 177 additions & 1 deletion pkg/ntpmonitor/ntpmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,39 @@
package ntpmonitor

import (
"errors"
"testing"
"time"

"github.com/beevik/ntp"
)

// MockNTPClient is a test double for NTPClient that answers without any
// network access.
type MockNTPClient struct {
// ignoredServers is the set of server names for which QueryWithOptions
// returns an error. Only the keys are consulted; the values are unused.
ignoredServers map[string]string
Copy link
Contributor

@haydentherapper haydentherapper Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is ignoredServers? Maybe a comment?

Edit: Ah, used for which servers to fail on. Yea, a comment would be helpful!

}

// QueryWithOptions fails for any server listed in ignoredServers and
// otherwise returns a response stamped with the current time and a clock
// offset of one nanosecond. opts is not used.
func (c MockNTPClient) QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error) {
	_, ignored := c.ignoredServers[srv]
	if ignored {
		return nil, errors.New("failed to query NTP server")
	}

	resp := ntp.Response{
		Time:        time.Now(),
		ClockOffset: 1,
	}
	return &resp, nil
}

// driftedTimeNTPClient is a fake NTPClient whose responses always report
// the configured clock offset, whichever server is queried.
type driftedTimeNTPClient struct {
	driftedOffset time.Duration
}

// QueryWithOptions never fails; it returns a response stamped with the
// current time and carrying driftedOffset as its clock offset. srv and opts
// are not used.
func (c driftedTimeNTPClient) QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error) {
	resp := new(ntp.Response)
	resp.ClockOffset = c.driftedOffset
	resp.Time = time.Now()
	return resp, nil
}

func TestNewFromConfig(t *testing.T) {
var cfg Config
var nm *NTPMonitor
Expand All @@ -33,7 +63,7 @@ func TestNewFromConfig(t *testing.T) {
t.Errorf("expected error %s got %s", ErrTooFewServers, err)
}

// Number of servers are smaller than requsted
// Number of servers are smaller than requested
cfg.Servers = append(cfg.Servers, "foo.bar")
cfg.NumServers = 2
nm, err = NewFromConfig(&cfg)
Expand Down Expand Up @@ -99,3 +129,149 @@ func TestNewFromConfig(t *testing.T) {
t.Errorf("unexpected error %s", err)
}
}

// TestNTPMonitorQueryNTPServer checks QueryNTPServer against mock clients:
// when the client answers, the call must return a non-nil response and a nil
// error; when the client fails for the queried server, it must return an
// error.
func TestNTPMonitorQueryNTPServer(t *testing.T) {
// mockNTP has no ignored servers, so every query succeeds.
mockNTP := MockNTPClient{}
// failNTP fails queries to "s1", the only server configured below.
failNTP := MockNTPClient{
ignoredServers: map[string]string{
"s1": "",
},
}

testCases := []struct {
name string
client MockNTPClient
requestAttempts int
expectTestToPass bool
}{
{
name: "Successfully query NTP server",
client: mockNTP,
requestAttempts: 3,
expectTestToPass: true,
},
{
name: "Fail to query NTP server",
client: failNTP,
requestAttempts: 1,
expectTestToPass: false,
},
}
for _, tc := range testCases {
monitor, err := NewFromConfig(&Config{
Servers: []string{"s1"},
NumServers: 1,
RequestAttempts: tc.requestAttempts,
ServerThreshold: 1,
RequestTimeout: 1,
MaxTimeDelta: 1,
})
if err != nil {
t.Fatalf("unexpectedly failed to create NTP monitor: %v", err)
}
// Swap in the mock so no real NTP traffic is generated.
monitor.ntpClient = tc.client

resp, err := monitor.QueryNTPServer("s1")
if tc.expectTestToPass && err != nil {
t.Errorf("test '%s' unexpectedly failed with non-nil error: %v", tc.name, err)
}
if tc.expectTestToPass && resp == nil {
t.Errorf("test '%s' unexpectedly failed with nil ntp.Response", tc.name)
}
if !tc.expectTestToPass && err == nil {
t.Errorf("test '%s' unexpectedly passed with a nil error", tc.name)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also test metrics output? (I'm not sure!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll look into this.

}
}

// TestNTPMonitorQueryServers drives queryServers with fake NTP clients and
// asserts both result flags in both directions, so a case that expects a
// failure fails the test when no failure is reported.
//
// Every case configures exactly three servers with NumServers set to three,
// so all of them are queried and the outcome does not depend on which
// servers the random selection would otherwise pick. The delta passed to
// queryServers is derived from the case's maxTimeDelta, mirroring Start.
func TestNTPMonitorQueryServers(t *testing.T) {
	// Answers every query with a negligible clock offset.
	mockNTP := MockNTPClient{}
	// Fails every configured server.
	failNTP := MockNTPClient{
		ignoredServers: map[string]string{
			"s1": "",
			"s2": "",
			"s3": "",
		},
	}
	// Fails two of the three configured servers; only s1 answers.
	partialFailNTP := MockNTPClient{
		ignoredServers: map[string]string{
			"s2": "",
			"s3": "",
		},
	}
	// Reports an offset larger than every maxTimeDelta used below, so each
	// of its responses counts as invalid.
	driftedNTP := driftedTimeNTPClient{
		driftedOffset: 10 * time.Second,
	}

	testCases := []struct {
		name                       string
		client                     NTPClient
		serverThreshold            int
		maxTimeDelta               int
		expectEnoughServerResponse bool
		expectValidServerResponse  bool
	}{
		{
			name:                       "Successfully query all NTP servers",
			client:                     mockNTP,
			serverThreshold:            3,
			maxTimeDelta:               3,
			expectEnoughServerResponse: true,
			expectValidServerResponse:  true,
		},
		{
			name:                       "Tolerate failures while the threshold is still met",
			client:                     partialFailNTP,
			serverThreshold:            1,
			maxTimeDelta:               5,
			expectEnoughServerResponse: true,
			expectValidServerResponse:  true,
		},
		{
			// One answer against a threshold of two: too few responses,
			// and therefore also too few valid responses.
			name:                       "Receive too few server responses",
			client:                     partialFailNTP,
			serverThreshold:            2,
			maxTimeDelta:               5,
			expectEnoughServerResponse: false,
			expectValidServerResponse:  false,
		},
		{
			name:                       "Receive too many drifted time responses",
			client:                     driftedNTP,
			serverThreshold:            2,
			maxTimeDelta:               4,
			expectEnoughServerResponse: true,
			expectValidServerResponse:  false,
		},
		{
			name:                       "Fail to receive any responses",
			client:                     failNTP,
			serverThreshold:            1,
			maxTimeDelta:               4,
			expectEnoughServerResponse: false,
			expectValidServerResponse:  false,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			monitor, err := NewFromConfig(&Config{
				Servers:         []string{"s1", "s2", "s3"},
				NumServers:      3,
				Period:          1,
				RequestAttempts: 1,
				ServerThreshold: tc.serverThreshold,
				RequestTimeout:  1,
				MaxTimeDelta:    tc.maxTimeDelta,
			})
			if err != nil {
				t.Fatalf("unexpectedly failed to create NTP monitor: %v", err)
			}
			monitor.ntpClient = tc.client

			delta := time.Duration(tc.maxTimeDelta) * time.Second
			responses := monitor.queryServers(delta)
			if got := !responses.tooFewServerResponses; got != tc.expectEnoughServerResponse {
				t.Errorf("enough server responses: got %t, want %t", got, tc.expectEnoughServerResponse)
			}
			if got := !responses.tooManyInvalidResponses; got != tc.expectValidServerResponse {
				t.Errorf("enough valid responses: got %t, want %t", got, tc.expectValidServerResponse)
			}
		})
	}
}
2 changes: 1 addition & 1 deletion pkg/ntpmonitor/randomchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"math/rand"
)

// RandomChoice returns a random selection of n items from the slic s.
// RandomChoice returns a random selection of n items from the slice s.
// The choice is made using a PSEUDO RANDOM selection.
// If n is greater than len(s), an empty slice is returned.
func RandomChoice[T any](s []T, n int) []T {
Expand Down