Skip to content

Commit

Permalink
#62 Client now restarts after sync failures
Browse files Browse the repository at this point in the history
  • Loading branch information
glothriel committed Jun 5, 2024
1 parent 8750570 commit f797a49
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 21 deletions.
4 changes: 2 additions & 2 deletions pkg/hello/appstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type AppStateChangeGenerator struct {
lock sync.Mutex
}

// OnSync is called when a sync message is received
func (s *AppStateChangeGenerator) OnSync(peer string, apps []peers.App) {
// SetState is called when a sync message is received
func (s *AppStateChangeGenerator) SetState(peer string, apps []peers.App) {
logrus.Debugf("Received sync from %s with %d apps", peer, len(apps))
s.lock.Lock()
defer s.lock.Unlock()
Expand Down
6 changes: 4 additions & 2 deletions pkg/hello/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,11 @@ func (t *httpClientSyncingTransport) Sync(req []byte) ([]byte, error) {
}

// NewHTTPClientSyncingTransport creates a new SyncClientTransport instance
func NewHTTPClientSyncingTransport(serverURL string) SyncClientTransport {
func NewHTTPClientSyncingTransport(serverURL string, timeout time.Duration) SyncClientTransport {
return &httpClientSyncingTransport{
serverURL: serverURL,
client: &http.Client{},
client: &http.Client{
Timeout: timeout,
},
}
}
44 changes: 27 additions & 17 deletions pkg/hello/syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hello

import (
"errors"
"fmt"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -52,7 +53,7 @@ func (s *SyncingServer) Start() {
incomingSync.Err <- peerErr
continue
}
s.stateGenerator.OnSync(
s.stateGenerator.SetState(
peer.Name,
msg.Apps,
)
Expand Down Expand Up @@ -98,16 +99,18 @@ func NewSyncingServer(
// SyncingClient is a struct that orchestrates all the operations that are performed client-side
// when executing app list synchronizations
type SyncingClient struct {
myName string
nginxAdapter *AppStateChangeGenerator
encoder SyncingEncoder
interval time.Duration
apps AppSource
transport SyncClientTransport
myName string
stateChangeGenerator *AppStateChangeGenerator
encoder SyncingEncoder
interval time.Duration
apps AppSource
transport SyncClientTransport
failureThreshold int
}

// Start starts the syncing client
func (c *SyncingClient) Start() error {
failures := 0
for {
time.Sleep(c.interval)
apps, listErr := c.apps.List()
Expand All @@ -125,15 +128,21 @@ func (c *SyncingClient) Start() error {
}
incomingApps, err := c.transport.Sync(encodedApps)
if err != nil {
logrus.Errorf("failed to sync apps: %v", err)
continue
if failures >= c.failureThreshold {
return fmt.Errorf("Fatal: failed to sync %d times in a row: %v", failures, err)
} else {

Check warning on line 133 in pkg/hello/syncing.go

View workflow job for this annotation

GitHub Actions / lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
failures++
logrus.Errorf("failed to sync apps: %v", err)
continue
}
}
failures = 0
decodedMsg, decodeErr := c.encoder.Decode(incomingApps)
if decodeErr != nil {
logrus.Errorf("failed to decode incoming apps: %v", decodeErr)
continue
}
c.nginxAdapter.OnSync(
c.stateChangeGenerator.SetState(
decodedMsg.Peer,
decodedMsg.Apps,
)
Expand All @@ -150,12 +159,13 @@ func NewSyncingClient(
transport SyncClientTransport,
) *SyncingClient {
return &SyncingClient{
myName: myName,
nginxAdapter: nginxAdapter,
encoder: encoder,
interval: interval,
apps: apps,
transport: transport,
myName: myName,
stateChangeGenerator: nginxAdapter,
encoder: encoder,
interval: interval,
apps: apps,
transport: transport,
failureThreshold: 3,
}
}

Expand All @@ -173,7 +183,7 @@ func NewHTTPSyncingClient(
if !ok {
return nil, errors.New("sync_server_address not found in pairing response metadata")
}
transport := NewHTTPClientSyncingTransport(syncServerAddress)
transport := NewHTTPClientSyncingTransport(syncServerAddress, 3*time.Second)
return NewSyncingClient(
myName,
nginxAdapter,
Expand Down
40 changes: 40 additions & 0 deletions pkg/hello/syncing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package hello

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

type mockSyncClientTransport struct {
lastCalledWith []byte
returnValue []byte
returnError error
}

func (m *mockSyncClientTransport) Sync(data []byte) ([]byte, error) {
m.lastCalledWith = data
return m.returnValue, m.returnError
}

func TestClientStartFailsAfterXSyncFailures(t *testing.T) {
// given
client := NewSyncingClient(
"client",
NewAppStateChangeGenerator(),
NewJSONSyncingEncoder(),
1,
NewInMemoryAppStorage(),
&mockSyncClientTransport{
returnError: fmt.Errorf("sync failed"),
},
)

// when
startErr := client.Start()

// then
assert.Error(t, startErr)
assert.Contains(t, startErr.Error(), "failed to sync 3 times in a row")
}

0 comments on commit f797a49

Please sign in to comment.