Skip to content

Commit

Permalink
Mock NTP client (#217)
Browse files Browse the repository at this point in the history
* typo

Signed-off-by: Meredith Lancaster <malancas@github.com>

* add initial ntp client interface and structs

Signed-off-by: Meredith Lancaster <malancas@github.com>

* update mock client

Signed-off-by: Meredith Lancaster <malancas@github.com>

* fix QueryNTPServer unit tests

Signed-off-by: Meredith Lancaster <malancas@github.com>

* create queryServers method for testing

Signed-off-by: Meredith Lancaster <malancas@github.com>

* cleanup test functions

Signed-off-by: Meredith Lancaster <malancas@github.com>

* add servers arg to helper function

Signed-off-by: Meredith Lancaster <malancas@github.com>

* pr feedback

Signed-off-by: Meredith Lancaster <malancas@github.com>

* update dependencies

Signed-off-by: Meredith Lancaster <malancas@github.com>

* add some basic testing aroudn expected metric values

Signed-off-by: Meredith Lancaster <malancas@github.com>

Signed-off-by: Meredith Lancaster <malancas@github.com>
  • Loading branch information
malancas authored Jan 18, 2023
1 parent e8d9302 commit d1012d2
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 39 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ require (
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/fatih/color v1.13.0 // indirect
Expand Down
105 changes: 68 additions & 37 deletions pkg/ntpmonitor/ntpmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,26 @@ var (
ErrDeltaTooSmall = errors.New("delta is too small")
)

type serverResponses struct {
tooFewServerResponses bool
tooManyInvalidResponses bool
}

type NTPClient interface {
QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error)
}

type LiveNTPClient struct{}

func (c LiveNTPClient) QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error) {
return ntp.QueryWithOptions(srv, opts)
}

// NTPMonitor compares the local time with a set of trusted NTP servers.
type NTPMonitor struct {
cfg *Config
run atomic.Bool
cfg *Config
run atomic.Bool
ntpClient NTPClient
}

// New creates a NTPMonitor, reading the configuration from the provided
Expand All @@ -62,6 +78,12 @@ func New(configFile string) (*NTPMonitor, error) {

// NewFromConfig creates a NTPMonitor from an instantiated configuration.
func NewFromConfig(cfg *Config) (*NTPMonitor, error) {
// default to using a live NTP client
liveNTPClient := LiveNTPClient{}
return NewFromConfigWithClient(cfg, liveNTPClient)
}

func NewFromConfigWithClient(cfg *Config, client NTPClient) (*NTPMonitor, error) {
if len(cfg.Servers) == 0 || len(cfg.Servers) < cfg.NumServers {
return nil, ErrTooFewServers
}
Expand All @@ -78,7 +100,44 @@ func NewFromConfig(cfg *Config) (*NTPMonitor, error) {
return nil, ErrDeltaTooSmall
}

return &NTPMonitor{cfg: cfg}, nil
return &NTPMonitor{cfg: cfg, ntpClient: client}, nil
}

func (n *NTPMonitor) queryServers(delta time.Duration, servers []string) serverResponses {
validResponses := 0
noResponse := 0
for _, srv := range servers {
// Create a time interval from 'now' with the max
// time delta added/removed
// Make sure the time from the remote NTP server lies
// within this interval.
resp, err := n.queryNTPServer(srv)
if err != nil {
log.Logger.Errorf("ntp response timeout from %s",
srv)
noResponse++
continue
}

// ClockOffset is the estimated difference from
// local time to NTP server's time.
// The estimate assumes latency is similar for both
// sending and receiving data.
// The estimated offset does not depend on the value
// of the latency.
if resp.ClockOffset.Abs() > delta {
log.Logger.Warnf("local time is different from %s: %s",
srv, resp.Time)
} else {
validResponses++
}
}

// Did enough NTP servers respond?
return serverResponses{
tooFewServerResponses: n.cfg.ServerThreshold > n.cfg.NumServers-noResponse,
tooManyInvalidResponses: n.cfg.ServerThreshold > validResponses,
}
}

// Start the periodic monitor. Once started, it runs until Stop() is called,
Expand All @@ -95,43 +154,15 @@ func (n *NTPMonitor) Start() {
for n.run.Load() {
// Get a random set of servers
servers := RandomChoice(n.cfg.Servers, n.cfg.NumServers)
validResponses := 0
noResponse := 0
for _, srv := range servers {
// Create a time interval from 'now' with the max
// time delta added/remobed
// Make sure the time from the remote NTP server lies
// within this interval.
resp, err := n.QueryNTPServer(srv)

if err != nil {
log.Logger.Errorf("ntp response timeout from %s",
srv)
noResponse++
continue
}

// ClockOffset is the estimated difference from
// local time to NTP server's time.
// The estimate assumes latency is similar for both
// sending and receiving data.
// The estimated offset does not depend on the value
// of the latency.
if resp.ClockOffset.Abs() > delta {
log.Logger.Warnf("local time is different from %s: %s",
srv, resp.Time)
} else {
validResponses++
}
}
responses := n.queryServers(delta, servers)

// Did enough NTP servers respond?
if n.cfg.ServerThreshold > n.cfg.NumServers-noResponse {
if responses.tooFewServerResponses {
pkgapi.MetricNTPErrorCount.With(map[string]string{
"reason": "err_too_few",
}).Inc()
}
if n.cfg.ServerThreshold > validResponses {
if responses.tooManyInvalidResponses {
pkgapi.MetricNTPErrorCount.With(map[string]string{
"reason": "err_inv_time",
}).Inc()
Expand All @@ -149,9 +180,9 @@ func (n *NTPMonitor) Stop() {
n.run.Store(false)
}

// QueryNTPServer queries a provided ntp server, trying up to a configured
// queryNTPServer queries a provided ntp server, trying up to a configured
// amount of times. There is one second sleep between each attempt.
func (n *NTPMonitor) QueryNTPServer(srv string) (*ntp.Response, error) {
func (n *NTPMonitor) queryNTPServer(srv string) (*ntp.Response, error) {
var i = 1
for {
log.Logger.Debugf("querying ntp server %s", srv)
Expand All @@ -160,7 +191,7 @@ func (n *NTPMonitor) QueryNTPServer(srv string) (*ntp.Response, error) {
opts := ntp.QueryOptions{
Timeout: time.Duration(n.cfg.RequestTimeout) * time.Second,
}
resp, err := ntp.QueryWithOptions(srv, opts)
resp, err := n.ntpClient.QueryWithOptions(srv, opts)
pkgapi.MetricNTPLatency.With(map[string]string{
"host": srv,
}).Observe(float64(time.Since(start)))
Expand Down
187 changes: 186 additions & 1 deletion pkg/ntpmonitor/ntpmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,43 @@
package ntpmonitor

import (
"errors"
"testing"
"time"

"github.com/beevik/ntp"
"github.com/prometheus/client_golang/prometheus/testutil"
pkgapi "github.com/sigstore/timestamp-authority/pkg/api"
)

type MockNTPClient struct {
// add the names of servers that MockNTPClient#QueryWithOptions should
// always return an error response for
ignoredServers map[string]string
}

func (c MockNTPClient) QueryWithOptions(srv string, _ ntp.QueryOptions) (*ntp.Response, error) {
if _, ok := c.ignoredServers[srv]; ok {
return nil, errors.New("failed to query NTP server")
}

return &ntp.Response{
ClockOffset: 1,
Time: time.Now(),
}, nil
}

type driftedTimeNTPClient struct {
driftedOffset time.Duration
}

func (c driftedTimeNTPClient) QueryWithOptions(_ string, _ ntp.QueryOptions) (*ntp.Response, error) {
return &ntp.Response{
ClockOffset: c.driftedOffset,
Time: time.Now(),
}, nil
}

func TestNewFromConfig(t *testing.T) {
var cfg Config
var nm *NTPMonitor
Expand All @@ -33,7 +67,7 @@ func TestNewFromConfig(t *testing.T) {
t.Errorf("expected error %s got %s", ErrTooFewServers, err)
}

// Number of servers are smaller than requsted
// Number of servers are smaller than requested
cfg.Servers = append(cfg.Servers, "foo.bar")
cfg.NumServers = 2
nm, err = NewFromConfig(&cfg)
Expand Down Expand Up @@ -99,3 +133,154 @@ func TestNewFromConfig(t *testing.T) {
t.Errorf("unexpected error %s", err)
}
}

func TestNTPMonitorQueryNTPServer(t *testing.T) {
mockNTP := MockNTPClient{}
failNTP := MockNTPClient{
ignoredServers: map[string]string{
"s1": "",
},
}

testCases := []struct {
name string
client MockNTPClient
expectTestToPass bool
}{
{
name: "Successfully query NTP server",
client: mockNTP,
expectTestToPass: true,
},
{
name: "Fail to query NTP server",
client: failNTP,
expectTestToPass: false,
},
}

// There does not seem to be a way to reset the metric counter for testing
// purposes. To test that the metric counter value is incrementing by one
// as expected, we can set this variable to zero before the test for loop
// and increment it the call to monitor.queryNTPServer. We then check that
// the metric value has only increased by one as expected
expectedMetricValue := 0
for _, tc := range testCases {
monitor, err := NewFromConfigWithClient(&Config{
Servers: []string{"s1"},
NumServers: 1,
RequestAttempts: 1,
ServerThreshold: 1,
RequestTimeout: 1,
MaxTimeDelta: 1,
}, tc.client)
if err != nil {
t.Fatalf("unexpectedly failed to create NTP monitor: %v", err)
}

resp, err := monitor.queryNTPServer("s1")
// increment the expected metric value
expectedMetricValue++

if tc.expectTestToPass && err != nil {
t.Errorf("test '%s' unexpectedly failed with non-nil error: %v", tc.name, err)
}
if tc.expectTestToPass && resp == nil {
t.Errorf("test '%s' unexpectedly failed with nil ntp.Response", tc.name)
}
if !tc.expectTestToPass && err == nil {
t.Errorf("test '%s' unexpectedly passed with a nil error", tc.name)
}
// check that the actual metric value was incremented by one as expected
actualMetricCount := testutil.CollectAndCount(pkgapi.MetricNTPSyncCount)
if expectedMetricValue != actualMetricCount {
t.Errorf("test '%s' unexpectedly failed with wrong metric value %d, expected %d", tc.name, actualMetricCount, expectedMetricValue)
}
}
}

func TestNTPMonitorQueryServers(t *testing.T) {
mockNTP := MockNTPClient{}
failNTP := MockNTPClient{
ignoredServers: map[string]string{"s1": "", "s2": "", "s3": ""},
}
partialFailNTP := MockNTPClient{
ignoredServers: map[string]string{"s2": "", "s3": ""},
}

offsetDuration, err := time.ParseDuration("5s")
if err != nil {
t.Fatalf("unexpected failed to parse duration: %v", err)
}

driftedNTP := driftedTimeNTPClient{
driftedOffset: offsetDuration,
}

testCases := []struct {
name string
client NTPClient
serverThreshold int
maxTimeDelta int
expectEnoughServerResponse bool
expectValidServerResponse bool
}{
{
name: "Successfully query all NTP servers",
client: mockNTP,
serverThreshold: 3,
maxTimeDelta: 3,
expectEnoughServerResponse: true,
expectValidServerResponse: true,
},
{
name: "Receive too few server responses",
client: partialFailNTP,
serverThreshold: 2,
maxTimeDelta: 5,
expectEnoughServerResponse: false,
expectValidServerResponse: false,
},
{
name: "Receive too many drifted time responses",
client: driftedNTP,
serverThreshold: 2,
maxTimeDelta: 2,
expectEnoughServerResponse: true,
expectValidServerResponse: false,
},
{
name: "Fail to receive any responses",
client: failNTP,
serverThreshold: 1,
maxTimeDelta: 4,
expectEnoughServerResponse: false,
expectValidServerResponse: false,
},
}
for _, tc := range testCases {
monitor, err := NewFromConfigWithClient(&Config{
Servers: []string{"s1", "s2", "s3", "s4", "s5", "s6"},
NumServers: 3,
Period: 1,
RequestAttempts: 1,
RequestTimeout: 1,
ServerThreshold: tc.serverThreshold,
MaxTimeDelta: tc.maxTimeDelta,
}, tc.client)
if err != nil {
t.Fatalf("unexpectedly failed to create NTP monitor: %v", err)
}

delta := time.Duration(tc.maxTimeDelta) * time.Second
testedServers := []string{"s1", "s2", "s3"}

responses := monitor.queryServers(delta, testedServers)
if tc.expectEnoughServerResponse && responses.tooFewServerResponses {
t.Errorf("test '%s' unexpectedly failed with too few server responses", tc.name)
}
if tc.expectValidServerResponse && responses.tooManyInvalidResponses {
t.Errorf("test '%s' unexpectedly failed with too many invalid responses", tc.name)
}
}
}
2 changes: 1 addition & 1 deletion pkg/ntpmonitor/randomchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"math/rand"
)

// RandomChoice returns a random selection of n items from the slic s.
// RandomChoice returns a random selection of n items from the slice s.
// The choice is made using a PSEUDO RANDOM selection.
// If n is greater than len(s), an empty slice is returned.
func RandomChoice[T any](s []T, n int) []T {
Expand Down

0 comments on commit d1012d2

Please sign in to comment.