From 5a3993a8b29c5ce7647ce9209e37e8f8a0a5f5f8 Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 20 Feb 2023 11:48:09 +0100 Subject: [PATCH 01/25] Enable peer login expiration by default when adding a peer --- management/server/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/management/server/peer.go b/management/server/peer.go index 5e3f5e69bf2..8746c447e85 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -529,7 +529,7 @@ func (am *DefaultAccountManager) AddPeer(setupKey, userID string, peer *Peer) (* SSHEnabled: false, SSHKey: peer.SSHKey, LastLogin: time.Now(), - LoginExpirationEnabled: false, + LoginExpirationEnabled: true, } // add peer to 'All' group From 7176f735beefa5b27716ed44b39cb9f2b2f9db71 Mon Sep 17 00:00:00 2001 From: braginini Date: Wed, 22 Feb 2023 16:10:33 +0100 Subject: [PATCH 02/25] Force peer expiration --- management/server/account.go | 99 +++++++++++++++++++++++++++++- management/server/grpcserver.go | 7 ++- management/server/peer.go | 24 +++++--- management/server/scheduler.go | 72 ++++++++++++++++++++++ management/server/updatechannel.go | 21 +++++-- 5 files changed, 205 insertions(+), 18 deletions(-) create mode 100644 management/server/scheduler.go diff --git a/management/server/account.go b/management/server/account.go index e2293e5081d..00f97014daf 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -119,7 +119,8 @@ type DefaultAccountManager struct { // singleAccountModeDomain is a domain to use in singleAccountMode setup singleAccountModeDomain string // dnsDomain is used for peer resolution. This is appended to the peer's name - dnsDomain string + dnsDomain string + peerLoginExpiry *Scheduler } // Settings represents Account settings structure that can be modified via API and Dashboard @@ -157,7 +158,8 @@ type Account struct { NameServerGroups map[string]*nbdns.NameServerGroup DNSSettings *DNSSettings // Settings is a dictionary of Account settings - Settings *Settings + Settings *Settings + loginExpiration *Scheduler } type UserInfo struct { @@ -307,6 +309,52 @@ func (a *Account) GetGroup(groupID string) *Group { return a.Groups[groupID] } +// GetExpiredPeers returns peers tha have been expired +func (a *Account) GetExpiredPeers() []*Peer { + var peers []*Peer + for _, peer := range a.GetPeersWithExpiration() { + expired, _ := peer.LoginExpired(a.Settings.PeerLoginExpiration) + if expired { + peers = append(peers, peer) + } + } + + return peers +} + +// GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire and true if it was found. +// If there is no peer that expires this function returns false and a zero time.Duration +func (a *Account) GetNextPeerExpiration() (time.Duration, bool) { + nextExpiry := time.Duration(1<<63 - 1) // max duration + peersWithExpiry := a.GetPeersWithExpiration() + if len(peersWithExpiry) == 0 { + return nextExpiry, false + } + for _, peer := range peersWithExpiry { + // consider only connected peers because others will require login on connecting to the management server + if peer.Status.LoginExpired { + continue + } + _, duration := peer.LoginExpired(a.Settings.PeerLoginExpiration) + if duration < nextExpiry { + nextExpiry = duration + } + } + + return nextExpiry, nextExpiry != time.Duration(1<<63-1) +} + +// GetPeersWithExpiration returns a list of peers that have Peer.LoginExpirationEnabled set to true +func (a *Account) GetPeersWithExpiration() []*Peer { + var peers []*Peer + for _, peer := range a.Peers { + if peer.LoginExpirationEnabled { + peers = append(peers, peer) + } + } + return peers +} + // GetPeers returns a list of all Account peers func (a *Account) GetPeers() []*Peer { var peers []*Peer @@ -550,6 +598,7 @@ func BuildManager(store Store, peersUpdateManager *PeersUpdateManager, idpManage cacheLoading: map[string]chan struct{}{}, dnsDomain: dnsDomain, eventStore: eventStore, + peerLoginExpiry: NewScheduler(), } allAccounts := store.GetAllAccounts() // enable single account mode only if configured by user and number of existing accounts is not grater than 1 @@ -639,12 +688,17 @@ func (am *DefaultAccountManager) UpdateAccountSettings(accountID, userID string, if oldSettings.PeerLoginExpirationEnabled != newSettings.PeerLoginExpirationEnabled { event := activity.AccountPeerLoginExpirationEnabled if !newSettings.PeerLoginExpirationEnabled { + // todo cancel all the login expiration jobs per event = activity.AccountPeerLoginExpirationDisabled + am.peerLoginExpiry.Cancel([]string{accountID}) + } else { + am.checkAndSchedulePeerLoginExpiration(account) } am.storeEvent(userID, accountID, accountID, event, nil) } if oldSettings.PeerLoginExpiration != newSettings.PeerLoginExpiration { + // todo reschedule expiration am.storeEvent(userID, accountID, accountID, activity.AccountPeerLoginExpirationDurationUpdated, nil) } @@ -658,6 +712,47 @@ func (am *DefaultAccountManager) UpdateAccountSettings(accountID, userID string, return updatedAccount, nil } +func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() (bool, time.Duration) { + return func() (bool, time.Duration) { + unlock := am.Store.AcquireAccountLock(accountID) + defer unlock() + + account, err := am.Store.GetAccount(accountID) + if err != nil { + log.Errorf("failed getting account %s expiring peers", account.Id) + return false, time.Duration(0) + } + + var peerIDs []string + for _, peer := range account.GetExpiredPeers() { + peerIDs = append(peerIDs, peer.ID) + peer.MarkLoginExpired(true) + account.UpdatePeer(peer) + err = am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status) + if err != nil { + log.Errorf("failed saving peer status while expiring peer %s", peer.ID) + return false, time.Duration(0) + } + } + + log.Debugf("discovered %d peers to expire for account %s", len(peerIDs), account.Id) + + if len(peerIDs) != 0 { + // this will trigger peer disconnect from the management service + am.peersUpdateManager.CloseChannels(peerIDs) + } + + nextExpiration, shouldRun := account.GetNextPeerExpiration() + return shouldRun, nextExpiration + } +} + +func (am *DefaultAccountManager) checkAndSchedulePeerLoginExpiration(account *Account) { + if expiryNextRunIn, ok := account.GetNextPeerExpiration(); ok { + go am.peerLoginExpiry.Schedule(expiryNextRunIn, account.Id, am.peerLoginExpirationJob(account.Id)) + } +} + // newAccount creates a new Account with a generated ID and generated default setup keys. // If ID is already in use (due to collision) we try one more time before returning error func (am *DefaultAccountManager) newAccount(userID, domain string) (*Account, error) { diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 7d4d60207ad..0ee9e0715b3 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -136,7 +136,8 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi if err != nil { return status.Error(codes.Internal, "internal server error") } - expired, left := peer.LoginExpired(account.Settings) + expired, left := peer.LoginExpired(account.Settings.PeerLoginExpiration) + expired = account.Settings.PeerLoginExpirationEnabled && expired if peer.UserID != "" && (expired || peer.Status.LoginExpired) { err = s.accountManager.MarkPeerLoginExpired(peerKey.String(), true) if err != nil { @@ -380,7 +381,9 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p if err != nil { return nil, status.Error(codes.Internal, "internal server error") } - expired, left := peer.LoginExpired(account.Settings) + + expired, left := peer.LoginExpired(account.Settings.PeerLoginExpiration) + expired = account.Settings.PeerLoginExpirationEnabled && expired if peer.UserID != "" && (expired || peer.Status.LoginExpired) { // it might be that peer expired but user has logged in already, check token then if loginReq.GetJwtToken() == "" { diff --git a/management/server/peer.go b/management/server/peer.go index 8746c447e85..886eb20a6c0 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -93,17 +93,24 @@ func (p *Peer) Copy() *Peer { } } +// MarkLoginExpired marks peer's status expired or not +func (p *Peer) MarkLoginExpired(expired bool) { + newStatus := p.Status.Copy() + newStatus.LastSeen = time.Now() + newStatus.LoginExpired = expired + p.Status = newStatus +} + // LoginExpired indicates whether the peer's login has expired or not. // If Peer.LastLogin plus the expiresIn duration has happened already; then login has expired. // Return true if a login has expired, false otherwise, and time left to expiration (negative when expired). // Login expiration can be disabled/enabled on a Peer level via Peer.LoginExpirationEnabled property. -// Login expiration can also be disabled/enabled globally on the Account level via Settings.PeerLoginExpirationEnabled -// and if disabled on the Account level, then Peer.LoginExpirationEnabled is ineffective. -func (p *Peer) LoginExpired(accountSettings *Settings) (bool, time.Duration) { - expiresAt := p.LastLogin.Add(accountSettings.PeerLoginExpiration) +// Login expiration can also be disabled/enabled globally on the Account level via Settings.PeerLoginExpirationEnabled. +func (p *Peer) LoginExpired(expiresIn time.Duration) (bool, time.Duration) { + expiresAt := p.LastLogin.Add(expiresIn) now := time.Now() timeLeft := expiresAt.Sub(now) - return accountSettings.PeerLoginExpirationEnabled && p.LoginExpirationEnabled && (timeLeft <= 0), timeLeft + return p.LoginExpirationEnabled && (timeLeft <= 0), timeLeft } // FQDN returns peers FQDN combined of the peer's DNS label and the system's DNS domain @@ -202,13 +209,10 @@ func (am *DefaultAccountManager) MarkPeerLoginExpired(peerPubKey string, loginEx return err } - newStatus := peer.Status.Copy() - newStatus.LastSeen = time.Now() - newStatus.LoginExpired = loginExpired - peer.Status = newStatus + peer.MarkLoginExpired(loginExpired) account.UpdatePeer(peer) - err = am.Store.SavePeerStatus(account.Id, peer.ID, *newStatus) + err = am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status) if err != nil { return err } diff --git a/management/server/scheduler.go b/management/server/scheduler.go new file mode 100644 index 00000000000..556447cfd6c --- /dev/null +++ b/management/server/scheduler.go @@ -0,0 +1,72 @@ +package server + +import ( + log "github.com/sirupsen/logrus" + "sync" + "time" +) + +// Scheduler is a generic structure that allows to schedule jobs (functions) to run in the future and cancel them. +type Scheduler struct { + // jobs map holds cancellation channels indexed by the job ID + jobs map[string]chan struct{} + mu *sync.Mutex +} + +func NewScheduler() *Scheduler { + return &Scheduler{ + jobs: make(map[string]chan struct{}), + mu: &sync.Mutex{}, + } +} + +func (wm *Scheduler) cancel(ID string) bool { + cancel, ok := wm.jobs[ID] + if ok { + delete(wm.jobs, ID) + cancel <- struct{}{} + log.Debugf("cancelled scheduled job %s", ID) + } + return ok +} + +// Cancel cancels the scheduled job by ID if present. +// If job wasn't found the function returns false. +func (wm *Scheduler) Cancel(IDs []string) { + wm.mu.Lock() + defer wm.mu.Unlock() + + for _, id := range IDs { + wm.cancel(id) + } +} + +// Schedule a job to run in some time in the future. If job returns true then it will be scheduled one more time. +func (wm *Scheduler) Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { + wm.mu.Lock() + defer wm.mu.Unlock() + cancel := make(chan struct{}) + if _, ok := wm.jobs[ID]; !ok { + wm.jobs[ID] = cancel + log.Debugf("scheduled peer login expiration job for account %s to run in %s", ID, in.String()) + go func() { + select { + case <-time.After(in): + log.Debugf("time to do a scheduled job %s", ID) + wm.mu.Lock() + defer wm.mu.Unlock() + reschedule, runIn := job() + delete(wm.jobs, ID) + if reschedule { + go wm.Schedule(runIn, ID, job) + } + case <-cancel: + log.Debugf("stopped scheduled job %s ", ID) + wm.mu.Lock() + defer wm.mu.Unlock() + delete(wm.jobs, ID) + return + } + }() + } +} diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go index 4b4d6e3d198..6cc10ad246c 100644 --- a/management/server/updatechannel.go +++ b/management/server/updatechannel.go @@ -60,10 +60,7 @@ func (p *PeersUpdateManager) CreateChannel(peerID string) chan *UpdateMessage { return channel } -// CloseChannel closes updates channel of a given peer -func (p *PeersUpdateManager) CloseChannel(peerID string) { - p.channelsMux.Lock() - defer p.channelsMux.Unlock() +func (p *PeersUpdateManager) closeChannel(peerID string) { if channel, ok := p.peerChannels[peerID]; ok { delete(p.peerChannels, peerID) close(channel) @@ -72,6 +69,22 @@ func (p *PeersUpdateManager) CloseChannel(peerID string) { log.Debugf("closed updates channel of a peer %s", peerID) } +// CloseChannels closes updates channel for each given peer +func (p *PeersUpdateManager) CloseChannels(peerIDs []string) { + p.channelsMux.Lock() + defer p.channelsMux.Unlock() + for _, id := range peerIDs { + p.closeChannel(id) + } +} + +// CloseChannel closes updates channel of a given peer +func (p *PeersUpdateManager) CloseChannel(peerID string) { + p.channelsMux.Lock() + defer p.channelsMux.Unlock() + p.closeChannel(peerID) +} + // GetAllConnectedPeers returns a copy of the connected peers map func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} { p.channelsMux.Lock() From 153452f1cb40359a6ed67b522d747176f2b3939d Mon Sep 17 00:00:00 2001 From: braginini Date: Wed, 22 Feb 2023 17:23:00 +0100 Subject: [PATCH 03/25] Check expiration on peer update and connect --- management/server/account.go | 18 +++++++++++++----- management/server/peer.go | 13 +++++++++++++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index 00f97014daf..05bcdf74b6c 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -324,6 +324,7 @@ func (a *Account) GetExpiredPeers() []*Peer { // GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire and true if it was found. // If there is no peer that expires this function returns false and a zero time.Duration +// This function only considers peers that haven't been expired yet and connected. func (a *Account) GetNextPeerExpiration() (time.Duration, bool) { nextExpiry := time.Duration(1<<63 - 1) // max duration peersWithExpiry := a.GetPeersWithExpiration() @@ -332,7 +333,7 @@ func (a *Account) GetNextPeerExpiration() (time.Duration, bool) { } for _, peer := range peersWithExpiry { // consider only connected peers because others will require login on connecting to the management server - if peer.Status.LoginExpired { + if peer.Status.LoginExpired || !peer.Status.Connected { continue } _, duration := peer.LoginExpired(a.Settings.PeerLoginExpiration) @@ -688,7 +689,6 @@ func (am *DefaultAccountManager) UpdateAccountSettings(accountID, userID string, if oldSettings.PeerLoginExpirationEnabled != newSettings.PeerLoginExpirationEnabled { event := activity.AccountPeerLoginExpirationEnabled if !newSettings.PeerLoginExpirationEnabled { - // todo cancel all the login expiration jobs per event = activity.AccountPeerLoginExpirationDisabled am.peerLoginExpiry.Cancel([]string{accountID}) } else { @@ -698,8 +698,8 @@ func (am *DefaultAccountManager) UpdateAccountSettings(accountID, userID string, } if oldSettings.PeerLoginExpiration != newSettings.PeerLoginExpiration { - // todo reschedule expiration am.storeEvent(userID, accountID, accountID, activity.AccountPeerLoginExpirationDurationUpdated, nil) + am.checkAndSchedulePeerLoginExpiration(account) } updatedAccount := account.UpdateSettings(newSettings) @@ -720,7 +720,8 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() account, err := am.Store.GetAccount(accountID) if err != nil { log.Errorf("failed getting account %s expiring peers", account.Id) - return false, time.Duration(0) + // todo return retry? + return false, 0 } var peerIDs []string @@ -731,7 +732,8 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() err = am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status) if err != nil { log.Errorf("failed saving peer status while expiring peer %s", peer.ID) - return false, time.Duration(0) + // todo return retry? + return false, 0 } } @@ -740,6 +742,11 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() if len(peerIDs) != 0 { // this will trigger peer disconnect from the management service am.peersUpdateManager.CloseChannels(peerIDs) + err := am.updateAccountPeers(account) + if err != nil { + log.Errorf("failed updating account peers while expiring peers for account %s", accountID) + return false, 0 + } } nextExpiration, shouldRun := account.GetNextPeerExpiration() @@ -748,6 +755,7 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() } func (am *DefaultAccountManager) checkAndSchedulePeerLoginExpiration(account *Account) { + am.peerLoginExpiry.Cancel([]string{account.Id}) if expiryNextRunIn, ok := account.GetNextPeerExpiration(); ok { go am.peerLoginExpiry.Schedule(expiryNextRunIn, account.Id, am.peerLoginExpirationJob(account.Id)) } diff --git a/management/server/peer.go b/management/server/peer.go index 886eb20a6c0..21973e2720a 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -255,6 +255,11 @@ func (am *DefaultAccountManager) MarkPeerConnected(peerPubKey string, connected if err != nil { return err } + + if peer.AddedWithSSOLogin() && peer.LoginExpirationEnabled && account.Settings.PeerLoginExpirationEnabled { + am.checkAndSchedulePeerLoginExpiration(account) + } + return nil } @@ -311,6 +316,10 @@ func (am *DefaultAccountManager) UpdatePeer(accountID, userID string, update *Pe event = activity.PeerLoginExpirationDisabled } am.storeEvent(userID, peer.IP.String(), accountID, event, peer.EventMeta(am.GetDNSDomain())) + + if peer.AddedWithSSOLogin() && peer.LoginExpirationEnabled && account.Settings.PeerLoginExpirationEnabled { + am.checkAndSchedulePeerLoginExpiration(account) + } } account.UpdatePeer(peer) @@ -779,6 +788,10 @@ func (a *Account) getPeersByACL(peerID string) []*Peer { ) continue } + expired, _ := peer.LoginExpired(a.Settings.PeerLoginExpiration) + if expired { + continue + } // exclude original peer if _, ok := peersSet[peer.ID]; peer.ID != peerID && !ok { peersSet[peer.ID] = struct{}{} From 36dc28e559b3646c938238b81ab02c567c943685 Mon Sep 17 00:00:00 2001 From: braginini Date: Wed, 22 Feb 2023 17:24:12 +0100 Subject: [PATCH 04/25] Fix test --- management/server/peer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/management/server/peer_test.go b/management/server/peer_test.go index eb503d2184b..5ebbad4ecca 100644 --- a/management/server/peer_test.go +++ b/management/server/peer_test.go @@ -57,7 +57,7 @@ func TestPeer_LoginExpired(t *testing.T) { LastLogin: c.lastLogin, } - expired, _ := peer.LoginExpired(c.accountSettings) + expired, _ := peer.LoginExpired(c.accountSettings.PeerLoginExpiration) assert.Equal(t, expired, c.expected) }) } From c5f9ce5bc809d063d8a280331d0441c744cffbc9 Mon Sep 17 00:00:00 2001 From: braginini Date: Wed, 22 Feb 2023 17:26:46 +0100 Subject: [PATCH 05/25] Fix lint and codacy issues --- management/server/account.go | 5 ++--- management/server/scheduler.go | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index 05bcdf74b6c..e4c687d07a8 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -158,8 +158,7 @@ type Account struct { NameServerGroups map[string]*nbdns.NameServerGroup DNSSettings *DNSSettings // Settings is a dictionary of Account settings - Settings *Settings - loginExpiration *Scheduler + Settings *Settings } type UserInfo struct { @@ -606,7 +605,7 @@ func BuildManager(store Store, peersUpdateManager *PeersUpdateManager, idpManage am.singleAccountMode = singleAccountModeDomain != "" && len(allAccounts) <= 1 if am.singleAccountMode { if !isDomainValid(singleAccountModeDomain) { - return nil, status.Errorf(status.InvalidArgument, "invalid domain \"%s\" provided for single accound mode. Please review your input for --single-account-mode-domain", singleAccountModeDomain) + return nil, status.Errorf(status.InvalidArgument, "invalid domain \"%s\" provided for a single account mode. Please review your input for --single-account-mode-domain", singleAccountModeDomain) } am.singleAccountModeDomain = singleAccountModeDomain log.Infof("single account mode enabled, accounts number %d", len(allAccounts)) diff --git a/management/server/scheduler.go b/management/server/scheduler.go index 556447cfd6c..6d5efe3a5ee 100644 --- a/management/server/scheduler.go +++ b/management/server/scheduler.go @@ -13,6 +13,7 @@ type Scheduler struct { mu *sync.Mutex } +// NewScheduler creates an instance of a Scheduler func NewScheduler() *Scheduler { return &Scheduler{ jobs: make(map[string]chan struct{}), From 6078c2bb03d0ef97744bf75cc987e931c472b93a Mon Sep 17 00:00:00 2001 From: braginini Date: Wed, 22 Feb 2023 18:35:24 +0100 Subject: [PATCH 06/25] Optimize scheduler routines --- management/server/scheduler.go | 57 ++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/management/server/scheduler.go b/management/server/scheduler.go index 6d5efe3a5ee..138b0183c27 100644 --- a/management/server/scheduler.go +++ b/management/server/scheduler.go @@ -25,8 +25,13 @@ func (wm *Scheduler) cancel(ID string) bool { cancel, ok := wm.jobs[ID] if ok { delete(wm.jobs, ID) - cancel <- struct{}{} - log.Debugf("cancelled scheduled job %s", ID) + select { + case cancel <- struct{}{}: + log.Debugf("cancelled scheduled job %s", ID) + default: + log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID) + } + } return ok } @@ -47,27 +52,31 @@ func (wm *Scheduler) Schedule(in time.Duration, ID string, job func() (reschedul wm.mu.Lock() defer wm.mu.Unlock() cancel := make(chan struct{}) - if _, ok := wm.jobs[ID]; !ok { - wm.jobs[ID] = cancel - log.Debugf("scheduled peer login expiration job for account %s to run in %s", ID, in.String()) - go func() { - select { - case <-time.After(in): - log.Debugf("time to do a scheduled job %s", ID) - wm.mu.Lock() - defer wm.mu.Unlock() - reschedule, runIn := job() - delete(wm.jobs, ID) - if reschedule { - go wm.Schedule(runIn, ID, job) - } - case <-cancel: - log.Debugf("stopped scheduled job %s ", ID) - wm.mu.Lock() - defer wm.mu.Unlock() - delete(wm.jobs, ID) - return - } - }() + if _, ok := wm.jobs[ID]; ok { + log.Debugf("couldn't schedule a job %s because it already exists. There are %d total jobs scheduled.", + ID, len(wm.jobs)) + return } + + wm.jobs[ID] = cancel + log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs)) + go func() { + select { + case <-time.After(in): + log.Debugf("time to do a scheduled job %s", ID) + reschedule, runIn := job() + wm.mu.Lock() + defer wm.mu.Unlock() + delete(wm.jobs, ID) + if reschedule { + go wm.Schedule(runIn, ID, job) + } + case <-cancel: + log.Debugf("stopped scheduled job %s ", ID) + wm.mu.Lock() + defer wm.mu.Unlock() + delete(wm.jobs, ID) + return + } + }() } From 44b3a857109e43ca608e24065fb8f34c7dd7f86b Mon Sep 17 00:00:00 2001 From: braginini Date: Wed, 22 Feb 2023 18:52:39 +0100 Subject: [PATCH 07/25] Add scheduler test --- management/server/scheduler_test.go | 52 ++++++++++++++++++++++++++++ management/server/turncredentials.go | 1 + 2 files changed, 53 insertions(+) create mode 100644 management/server/scheduler_test.go diff --git a/management/server/scheduler_test.go b/management/server/scheduler_test.go new file mode 100644 index 00000000000..ea07fe1ad8e --- /dev/null +++ b/management/server/scheduler_test.go @@ -0,0 +1,52 @@ +package server + +import ( + "github.com/stretchr/testify/assert" + "sync" + "testing" + "time" +) + +func TestScheduler_Cancel(t *testing.T) { + jobID1 := "test-scheduler-job-1" + jobID2 := "test-scheduler-job-2" + scheduler := NewScheduler() + scheduler.Schedule(2*time.Second, jobID1, func() (reschedule bool, nextRunIn time.Duration) { + return false, 0 + }) + scheduler.Schedule(2*time.Second, jobID2, func() (reschedule bool, nextRunIn time.Duration) { + return false, 0 + }) + + assert.Len(t, scheduler.jobs, 2) + scheduler.Cancel([]string{jobID1}) + assert.Len(t, scheduler.jobs, 1) + assert.NotNil(t, scheduler.jobs[jobID2]) +} + +func TestScheduler_Schedule(t *testing.T) { + jobID := "test-scheduler-job-1" + scheduler := NewScheduler() + wg := sync.WaitGroup{} + wg.Add(1) + // job without reschedule should be triggered once + job := func() (reschedule bool, nextRunIn time.Duration) { + wg.Done() + return false, 0 + } + scheduler.Schedule(300*time.Millisecond, jobID, job) + wg.Wait() + + // job with reschedule should be triggered at least twice + wg = sync.WaitGroup{} + wg.Add(2) + job = func() (reschedule bool, nextRunIn time.Duration) { + wg.Done() + return true, 300 * time.Millisecond + } + + scheduler.Schedule(300*time.Millisecond, jobID, job) + wg.Wait() + scheduler.cancel(jobID) + +} diff --git a/management/server/turncredentials.go b/management/server/turncredentials.go index dcfab57dd6d..752376767b4 100644 --- a/management/server/turncredentials.go +++ b/management/server/turncredentials.go @@ -115,6 +115,7 @@ func (m *TimeBasedAuthSecretsManager) SetupRefresh(peerID string) { Turns: turns, }, } + log.Debugf("sending new TURN credentials to peer %s", peerID) err := m.updateManager.SendUpdate(peerID, &UpdateMessage{Update: update}) if err != nil { log.Errorf("error while sending TURN update to peer %s %v", peerID, err) From 51a756a4396421404af3d3c9895721c6fbaa813a Mon Sep 17 00:00:00 2001 From: braginini Date: Wed, 22 Feb 2023 19:04:03 +0100 Subject: [PATCH 08/25] Add scheduler performance test --- management/server/scheduler_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/management/server/scheduler_test.go b/management/server/scheduler_test.go index ea07fe1ad8e..39eb32810bf 100644 --- a/management/server/scheduler_test.go +++ b/management/server/scheduler_test.go @@ -1,12 +1,33 @@ package server import ( + "fmt" "github.com/stretchr/testify/assert" + "math/rand" "sync" "testing" "time" ) +func TestScheduler_Performance(t *testing.T) { + scheduler := NewScheduler() + n := 1000 + wg := sync.WaitGroup{} + wg.Add(n) + for i := 0; i < n; i++ { + millis := time.Duration(rand.Intn(500-50)+50) * time.Millisecond + go scheduler.Schedule(millis, fmt.Sprintf("test-scheduler-job-%d", i), func() (reschedule bool, nextRunIn time.Duration) { + time.Sleep(millis) + wg.Done() + return false, 0 + }) + } + + assert.True(t, len(scheduler.jobs) > 0) + wg.Wait() + assert.Len(t, scheduler.jobs, 0) +} + func TestScheduler_Cancel(t *testing.T) { jobID1 := "test-scheduler-job-1" jobID2 := "test-scheduler-job-2" From 07a26cfdfa9e71b0fcfedd84ae1eab933aae02ff Mon Sep 17 00:00:00 2001 From: braginini Date: Thu, 23 Feb 2023 12:40:42 +0100 Subject: [PATCH 09/25] Refactor to use nil duration in GetNextPeerExpiration --- management/server/account.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index e4c687d07a8..b37c15524cd 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -324,12 +324,13 @@ func (a *Account) GetExpiredPeers() []*Peer { // GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire and true if it was found. // If there is no peer that expires this function returns false and a zero time.Duration // This function only considers peers that haven't been expired yet and connected. -func (a *Account) GetNextPeerExpiration() (time.Duration, bool) { - nextExpiry := time.Duration(1<<63 - 1) // max duration +func (a *Account) GetNextPeerExpiration() *time.Duration { + peersWithExpiry := a.GetPeersWithExpiration() if len(peersWithExpiry) == 0 { - return nextExpiry, false + return nil } + nextExpiry := time.Duration(1<<63 - 1) // max duration for _, peer := range peersWithExpiry { // consider only connected peers because others will require login on connecting to the management server if peer.Status.LoginExpired || !peer.Status.Connected { @@ -341,7 +342,11 @@ func (a *Account) GetNextPeerExpiration() (time.Duration, bool) { } } - return nextExpiry, nextExpiry != time.Duration(1<<63-1) + if nextExpiry == time.Duration(1<<63-1) { + return nil + } + + return &nextExpiry } // GetPeersWithExpiration returns a list of peers that have Peer.LoginExpirationEnabled set to true @@ -748,15 +753,19 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() } } - nextExpiration, shouldRun := account.GetNextPeerExpiration() - return shouldRun, nextExpiration + nextExpiration := account.GetNextPeerExpiration() + if nextExpiration == nil { + return false, time.Duration(0) + } + return true, *nextExpiration } } func (am *DefaultAccountManager) checkAndSchedulePeerLoginExpiration(account *Account) { am.peerLoginExpiry.Cancel([]string{account.Id}) - if expiryNextRunIn, ok := account.GetNextPeerExpiration(); ok { - go am.peerLoginExpiry.Schedule(expiryNextRunIn, account.Id, am.peerLoginExpirationJob(account.Id)) + nextRun := account.GetNextPeerExpiration() + if nextRun != nil { + go am.peerLoginExpiry.Schedule(*nextRun, account.Id, am.peerLoginExpirationJob(account.Id)) } } From 52cdccff7b57dd9e306949b88b2541daa4a8c565 Mon Sep 17 00:00:00 2001 From: braginini Date: Thu, 23 Feb 2023 12:47:38 +0100 Subject: [PATCH 10/25] Skip already expired peers when cleaning up. --- management/server/account.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/management/server/account.go b/management/server/account.go index b37c15524cd..34d951998d2 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -730,6 +730,9 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() var peerIDs []string for _, peer := range account.GetExpiredPeers() { + if peer.Status.LoginExpired { + continue + } peerIDs = append(peerIDs, peer.ID) peer.MarkLoginExpired(true) account.UpdatePeer(peer) From 615ce0c3d122480ed1bd42180aac42c00a780f27 Mon Sep 17 00:00:00 2001 From: braginini Date: Thu, 23 Feb 2023 13:13:01 +0100 Subject: [PATCH 11/25] Fix typos and refactor GetNextPeerExpiration --- management/server/account.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index 34d951998d2..0316dc433bf 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -308,7 +308,7 @@ func (a *Account) GetGroup(groupID string) *Group { return a.Groups[groupID] } -// GetExpiredPeers returns peers tha have been expired +// GetExpiredPeers returns peers that have been expired func (a *Account) GetExpiredPeers() []*Peer { var peers []*Peer for _, peer := range a.GetPeersWithExpiration() { @@ -321,32 +321,28 @@ func (a *Account) GetExpiredPeers() []*Peer { return peers } -// GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire and true if it was found. -// If there is no peer that expires this function returns false and a zero time.Duration -// This function only considers peers that haven't been expired yet and connected. +// GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire it it was found. +// If there is no peer that expires this function returns nil. +// This function only considers peers that haven't been expired yet and that are connected. func (a *Account) GetNextPeerExpiration() *time.Duration { peersWithExpiry := a.GetPeersWithExpiration() if len(peersWithExpiry) == 0 { return nil } - nextExpiry := time.Duration(1<<63 - 1) // max duration + var nextExpiry *time.Duration for _, peer := range peersWithExpiry { // consider only connected peers because others will require login on connecting to the management server if peer.Status.LoginExpired || !peer.Status.Connected { continue } _, duration := peer.LoginExpired(a.Settings.PeerLoginExpiration) - if duration < nextExpiry { - nextExpiry = duration + if nextExpiry == nil || duration < *nextExpiry { + nextExpiry = &duration } } - if nextExpiry == time.Duration(1<<63-1) { - return nil - } - - return &nextExpiry + return nextExpiry } // GetPeersWithExpiration returns a list of peers that have Peer.LoginExpirationEnabled set to true From 57775b0cccdaaafc309500325342151a7764b2ec Mon Sep 17 00:00:00 2001 From: braginini Date: Thu, 23 Feb 2023 14:45:00 +0100 Subject: [PATCH 12/25] Minor scheduler corrections --- management/server/scheduler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/management/server/scheduler.go b/management/server/scheduler.go index 138b0183c27..75a6a01d106 100644 --- a/management/server/scheduler.go +++ b/management/server/scheduler.go @@ -30,6 +30,7 @@ func (wm *Scheduler) cancel(ID string) bool { log.Debugf("cancelled scheduled job %s", ID) default: log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID) + return false } } @@ -48,6 +49,7 @@ func (wm *Scheduler) Cancel(IDs []string) { } // Schedule a job to run in some time in the future. If job returns true then it will be scheduled one more time. +// If job with the provided ID already exists, a new one won't be scheduled. func (wm *Scheduler) Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { wm.mu.Lock() defer wm.mu.Unlock() From 68f8ab34e8ae533880d298e6aaa71bc79983ca4d Mon Sep 17 00:00:00 2001 From: braginini Date: Thu, 23 Feb 2023 15:31:19 +0100 Subject: [PATCH 13/25] Add account expiration tests --- management/server/account.go | 4 +- management/server/account_test.go | 194 ++++++++++++++++++++++++++++++ 2 files changed, 196 insertions(+), 2 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index 0316dc433bf..feca97160c3 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -321,7 +321,7 @@ func (a *Account) GetExpiredPeers() []*Peer { return peers } -// GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire it it was found. +// GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire if it was found. // If there is no peer that expires this function returns nil. // This function only considers peers that haven't been expired yet and that are connected. func (a *Account) GetNextPeerExpiration() *time.Duration { @@ -347,7 +347,7 @@ func (a *Account) GetNextPeerExpiration() *time.Duration { // GetPeersWithExpiration returns a list of peers that have Peer.LoginExpirationEnabled set to true func (a *Account) GetPeersWithExpiration() []*Peer { - var peers []*Peer + peers := make([]*Peer, 0) for _, peer := range a.Peers { if peer.LoginExpirationEnabled { peers = append(peers, peer) diff --git a/management/server/account_test.go b/management/server/account_test.go index 979c41c86dd..58261e637f7 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -1326,6 +1326,200 @@ func TestDefaultAccountManager_UpdateAccountSettings(t *testing.T) { require.Error(t, err, "expecting to fail when providing PeerLoginExpiration more than 180 days") } +func TestAccount_GetExpiredPeers(t *testing.T) { + +} + +func TestAccount_GetPeersWithExpiration(t *testing.T) { + type test struct { + name string + peers map[string]*Peer + expectedPeers map[string]struct{} + } + + testCases := []test{ + { + name: "No account peers, no peers with expiration", + peers: map[string]*Peer{}, + expectedPeers: map[string]struct{}{}, + }, + { + name: "Peers with login expiration disabled, no peers with expiration", + peers: map[string]*Peer{ + "peer-1": { + LoginExpirationEnabled: false, + }, + "peer-2": { + LoginExpirationEnabled: false, + }, + }, + expectedPeers: map[string]struct{}{}, + }, + { + name: "Peers with login expiration enabled, return peers with expiration", + peers: map[string]*Peer{ + "peer-1": { + ID: "peer-1", + LoginExpirationEnabled: true, + }, + "peer-2": { + LoginExpirationEnabled: false, + }, + }, + expectedPeers: map[string]struct{}{ + "peer-1": {}, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + account := &Account{ + Peers: testCase.peers, + } + + actual := account.GetPeersWithExpiration() + assert.Len(t, actual, len(testCase.expectedPeers)) + if len(testCase.expectedPeers) > 0 { + for k, _ := range testCase.expectedPeers { + contains := false + for _, peer := range actual { + if k == peer.ID { + contains = true + } + } + assert.True(t, contains) + } + } + }) + } + +} + +func TestAccount_GetNextPeerExpiration(t *testing.T) { + + type test struct { + name string + peers map[string]*Peer + expiration time.Duration + expirationEnabled bool + expectedNextExpiration *time.Duration + } + + expectedNextExpiration := time.Minute + testCases := []test{ + { + name: "No peers, no expiration", + peers: map[string]*Peer{}, + expiration: time.Second, + expirationEnabled: false, + expectedNextExpiration: nil, + }, + { + name: "No connected peers, no expiration", + peers: map[string]*Peer{ + "peer-1": { + Status: &PeerStatus{ + Connected: false, + }, + LoginExpirationEnabled: true, + }, + "peer-2": { + Status: &PeerStatus{ + Connected: true, + }, + LoginExpirationEnabled: false, + }, + }, + expiration: time.Second, + expirationEnabled: false, + expectedNextExpiration: nil, + }, + { + name: "Connected peers with disabled expiration, no expiration", + peers: map[string]*Peer{ + "peer-1": { + Status: &PeerStatus{ + Connected: true, + }, + LoginExpirationEnabled: false, + }, + "peer-2": { + Status: &PeerStatus{ + Connected: true, + }, + LoginExpirationEnabled: false, + }, + }, + expiration: time.Second, + expirationEnabled: false, + expectedNextExpiration: nil, + }, + { + name: "Expired peers, no expiration", + peers: map[string]*Peer{ + "peer-1": { + Status: &PeerStatus{ + Connected: true, + LoginExpired: true, + }, + LoginExpirationEnabled: true, + }, + "peer-2": { + Status: &PeerStatus{ + Connected: true, + LoginExpired: true, + }, + LoginExpirationEnabled: true, + }, + }, + expiration: time.Second, + expirationEnabled: false, + expectedNextExpiration: nil, + }, + { + name: "To be expired peer, return expiration", + peers: map[string]*Peer{ + "peer-1": { + Status: &PeerStatus{ + Connected: true, + LoginExpired: false, + }, + LoginExpirationEnabled: true, + LastLogin: time.Now(), + }, + "peer-2": { + Status: &PeerStatus{ + Connected: true, + LoginExpired: true, + }, + LoginExpirationEnabled: true, + }, + }, + expiration: time.Minute, + expirationEnabled: false, + expectedNextExpiration: &expectedNextExpiration, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + account := &Account{ + Peers: testCase.peers, + Settings: &Settings{PeerLoginExpiration: testCase.expiration, PeerLoginExpirationEnabled: testCase.expirationEnabled}, + } + + expiration := account.GetNextPeerExpiration() + if testCase.expectedNextExpiration != nil { + assert.True(t, *expiration >= 0 && *expiration <= *testCase.expectedNextExpiration) + } else { + assert.Nil(t, expiration) + } + + }) + } + +} + func createManager(t *testing.T) (*DefaultAccountManager, error) { store, err := createStore(t) if err != nil { From e1dbf51c78aab094eb979c88eec86973bffd2061 Mon Sep 17 00:00:00 2001 From: braginini Date: Sun, 26 Feb 2023 11:22:11 +0100 Subject: [PATCH 14/25] Notify account peers when expired peer authenticates --- management/server/peer.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/management/server/peer.go b/management/server/peer.go index 21973e2720a..49732421f09 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -241,7 +241,8 @@ func (am *DefaultAccountManager) MarkPeerConnected(peerPubKey string, connected return err } - newStatus := peer.Status.Copy() + oldStatus := peer.Status.Copy() + newStatus := oldStatus newStatus.LastSeen = time.Now() newStatus.Connected = connected // whenever peer got connected that means that it logged in successfully @@ -260,6 +261,15 @@ func (am *DefaultAccountManager) MarkPeerConnected(peerPubKey string, connected am.checkAndSchedulePeerLoginExpiration(account) } + if oldStatus.LoginExpired { + // we need to update other peers because when peer login expires all other peers are notified to disconnect from + //the expired one. Here we notify them that connection is now allowed again. + err = am.updateAccountPeers(account) + if err != nil { + return err + } + } + return nil } From da65f1642796aee9961ce46453d15bac25cfbf6e Mon Sep 17 00:00:00 2001 From: braginini Date: Sun, 26 Feb 2023 11:35:49 +0100 Subject: [PATCH 15/25] Simplify code of expiration --- management/server/account.go | 32 ++++++++++++++----------------- management/server/account_test.go | 27 ++++++++++++++++---------- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index feca97160c3..cdf891e522f 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -322,13 +322,13 @@ func (a *Account) GetExpiredPeers() []*Peer { } // GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire if it was found. -// If there is no peer that expires this function returns nil. +// If there is no peer that expires this function returns false and a duration of 0. // This function only considers peers that haven't been expired yet and that are connected. -func (a *Account) GetNextPeerExpiration() *time.Duration { +func (a *Account) GetNextPeerExpiration() (bool, time.Duration) { peersWithExpiry := a.GetPeersWithExpiration() if len(peersWithExpiry) == 0 { - return nil + return false, time.Duration(0) } var nextExpiry *time.Duration for _, peer := range peersWithExpiry { @@ -342,7 +342,11 @@ func (a *Account) GetNextPeerExpiration() *time.Duration { } } - return nextExpiry + if nextExpiry == nil { + return false, time.Duration(0) + } + + return true, *nextExpiry } // GetPeersWithExpiration returns a list of peers that have Peer.LoginExpirationEnabled set to true @@ -720,8 +724,7 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() account, err := am.Store.GetAccount(accountID) if err != nil { log.Errorf("failed getting account %s expiring peers", account.Id) - // todo return retry? - return false, 0 + return account.GetNextPeerExpiration() } var peerIDs []string @@ -735,8 +738,7 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() err = am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status) if err != nil { log.Errorf("failed saving peer status while expiring peer %s", peer.ID) - // todo return retry? - return false, 0 + return account.GetNextPeerExpiration() } } @@ -748,23 +750,17 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() err := am.updateAccountPeers(account) if err != nil { log.Errorf("failed updating account peers while expiring peers for account %s", accountID) - return false, 0 + return account.GetNextPeerExpiration() } } - - nextExpiration := account.GetNextPeerExpiration() - if nextExpiration == nil { - return false, time.Duration(0) - } - return true, *nextExpiration + return account.GetNextPeerExpiration() } } func (am *DefaultAccountManager) checkAndSchedulePeerLoginExpiration(account *Account) { am.peerLoginExpiry.Cancel([]string{account.Id}) - nextRun := account.GetNextPeerExpiration() - if nextRun != nil { - go am.peerLoginExpiry.Schedule(*nextRun, account.Id, am.peerLoginExpirationJob(account.Id)) + if ok, nextRun := account.GetNextPeerExpiration(); ok { + go am.peerLoginExpiry.Schedule(nextRun, account.Id, am.peerLoginExpirationJob(account.Id)) } } diff --git a/management/server/account_test.go b/management/server/account_test.go index 58261e637f7..53231b96133 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -1403,7 +1403,8 @@ func TestAccount_GetNextPeerExpiration(t *testing.T) { peers map[string]*Peer expiration time.Duration expirationEnabled bool - expectedNextExpiration *time.Duration + expectedNextRun bool + expectedNextExpiration time.Duration } expectedNextExpiration := time.Minute @@ -1413,7 +1414,8 @@ func TestAccount_GetNextPeerExpiration(t *testing.T) { peers: map[string]*Peer{}, expiration: time.Second, expirationEnabled: false, - expectedNextExpiration: nil, + expectedNextRun: false, + expectedNextExpiration: time.Duration(0), }, { name: "No connected peers, no expiration", @@ -1433,7 +1435,8 @@ func TestAccount_GetNextPeerExpiration(t *testing.T) { }, expiration: time.Second, expirationEnabled: false, - expectedNextExpiration: nil, + expectedNextRun: false, + expectedNextExpiration: time.Duration(0), }, { name: "Connected peers with disabled expiration, no expiration", @@ -1453,7 +1456,8 @@ func TestAccount_GetNextPeerExpiration(t *testing.T) { }, expiration: time.Second, expirationEnabled: false, - expectedNextExpiration: nil, + expectedNextRun: false, + expectedNextExpiration: time.Duration(0), }, { name: "Expired peers, no expiration", @@ -1475,7 +1479,8 @@ func TestAccount_GetNextPeerExpiration(t *testing.T) { }, expiration: time.Second, expirationEnabled: false, - expectedNextExpiration: nil, + expectedNextRun: false, + expectedNextExpiration: time.Duration(0), }, { name: "To be expired peer, return expiration", @@ -1498,7 +1503,8 @@ func TestAccount_GetNextPeerExpiration(t *testing.T) { }, expiration: time.Minute, expirationEnabled: false, - expectedNextExpiration: &expectedNextExpiration, + expectedNextRun: true, + expectedNextExpiration: expectedNextExpiration, }, } for _, testCase := range testCases { @@ -1508,11 +1514,12 @@ func TestAccount_GetNextPeerExpiration(t *testing.T) { Settings: &Settings{PeerLoginExpiration: testCase.expiration, PeerLoginExpirationEnabled: testCase.expirationEnabled}, } - expiration := account.GetNextPeerExpiration() - if testCase.expectedNextExpiration != nil { - assert.True(t, *expiration >= 0 && *expiration <= *testCase.expectedNextExpiration) + ok, expiration := account.GetNextPeerExpiration() + assert.Equal(t, ok, testCase.expectedNextRun) + if testCase.expectedNextRun { + assert.True(t, expiration >= 0 && expiration <= testCase.expectedNextExpiration) } else { - assert.Nil(t, expiration) + assert.Equal(t, expiration, testCase.expectedNextExpiration) } }) From 4f672c7a6594c9518ab12709d1672552d89c2dcc Mon Sep 17 00:00:00 2001 From: braginini Date: Sun, 26 Feb 2023 19:31:45 +0100 Subject: [PATCH 16/25] Fix lint issues --- management/server/account_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/management/server/account_test.go b/management/server/account_test.go index 53231b96133..b8dd33ea945 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -1381,7 +1381,7 @@ func TestAccount_GetPeersWithExpiration(t *testing.T) { actual := account.GetPeersWithExpiration() assert.Len(t, actual, len(testCase.expectedPeers)) if len(testCase.expectedPeers) > 0 { - for k, _ := range testCase.expectedPeers { + for k := range testCase.expectedPeers { contains := false for _, peer := range actual { if k == peer.ID { From c42d051fa97406bd7d9dce2595b79b451a0119ec Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 27 Feb 2023 11:05:51 +0100 Subject: [PATCH 17/25] Add peer expiration tests --- management/server/account.go | 4 +- management/server/account_test.go | 155 ++++++++++++++++++++++++++++ management/server/scheduler.go | 45 ++++++-- management/server/scheduler_test.go | 6 +- 4 files changed, 197 insertions(+), 13 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index cdf891e522f..2289f2f6358 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -120,7 +120,7 @@ type DefaultAccountManager struct { singleAccountModeDomain string // dnsDomain is used for peer resolution. This is appended to the peer's name dnsDomain string - peerLoginExpiry *Scheduler + peerLoginExpiry Scheduler } // Settings represents Account settings structure that can be modified via API and Dashboard @@ -603,7 +603,7 @@ func BuildManager(store Store, peersUpdateManager *PeersUpdateManager, idpManage cacheLoading: map[string]chan struct{}{}, dnsDomain: dnsDomain, eventStore: eventStore, - peerLoginExpiry: NewScheduler(), + peerLoginExpiry: NewDefaultScheduler(), } allAccounts := store.GetAllAccounts() // enable single account mode only if configured by user and number of existing accounts is not grater than 1 diff --git a/management/server/account_test.go b/management/server/account_test.go index b8dd33ea945..24606dfe3a0 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -1294,6 +1294,147 @@ func TestDefaultAccountManager_DefaultAccountSettings(t *testing.T) { assert.Equal(t, account.Settings.PeerLoginExpirationEnabled, true) assert.Equal(t, account.Settings.PeerLoginExpiration, 24*time.Hour) } +func TestDefaultAccountManager_UpdatePeer_PeerLoginExpiration(t *testing.T) { + manager, err := createManager(t) + require.NoError(t, err, "unable to create account manager") + account, err := manager.GetAccountByUserOrAccountID(userID, "", "") + require.NoError(t, err, "unable to create an account") + + key, err := wgtypes.GenerateKey() + require.NoError(t, err, "unable to generate WireGuard key") + peer, err := manager.AddPeer("", userID, &Peer{ + Key: key.PublicKey().String(), + Meta: PeerSystemMeta{}, + Name: "test-peer", + LoginExpirationEnabled: true, + }) + require.NoError(t, err, "unable to add peer") + err = manager.MarkPeerConnected(key.PublicKey().String(), true) + require.NoError(t, err, "unable to mark peer connected") + account, err = manager.UpdateAccountSettings(account.Id, userID, &Settings{ + PeerLoginExpiration: time.Hour, + PeerLoginExpirationEnabled: true}) + require.NoError(t, err, "expecting to update account settings successfully but got error") + + wg := &sync.WaitGroup{} + wg.Add(2) + manager.peerLoginExpiry = &MockScheduler{ + CancelFunc: func(IDs []string) { + wg.Done() + }, + ScheduleFunc: func(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { + wg.Done() + }, + } + + // disable expiration first + update := peer.Copy() + update.LoginExpirationEnabled = false + _, err = manager.UpdatePeer(account.Id, userID, update) + require.NoError(t, err, "unable to update peer") + // enabling expiration should trigger the routine + update.LoginExpirationEnabled = true + _, err = manager.UpdatePeer(account.Id, userID, update) + require.NoError(t, err, "unable to update peer") + + failed := waitTimeout(wg, time.Second) + if failed { + t.Fatal("timeout while waiting for test to finish") + } +} + +func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing.T) { + manager, err := createManager(t) + require.NoError(t, err, "unable to create account manager") + account, err := manager.GetAccountByUserOrAccountID(userID, "", "") + require.NoError(t, err, "unable to create an account") + + key, err := wgtypes.GenerateKey() + require.NoError(t, err, "unable to generate WireGuard key") + _, err = manager.AddPeer("", userID, &Peer{ + Key: key.PublicKey().String(), + Meta: PeerSystemMeta{}, + Name: "test-peer", + LoginExpirationEnabled: true, + }) + require.NoError(t, err, "unable to add peer") + account, err = manager.UpdateAccountSettings(account.Id, userID, &Settings{ + PeerLoginExpiration: time.Hour, + PeerLoginExpirationEnabled: true}) + require.NoError(t, err, "expecting to update account settings successfully but got error") + + wg := &sync.WaitGroup{} + wg.Add(2) + manager.peerLoginExpiry = &MockScheduler{ + CancelFunc: func(IDs []string) { + wg.Done() + }, + ScheduleFunc: func(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { + wg.Done() + }, + } + + // when we mark peer as connected, the peer login expiration routine should trigger + err = manager.MarkPeerConnected(key.PublicKey().String(), true) + require.NoError(t, err, "unable to mark peer connected") + + failed := waitTimeout(wg, time.Second) + if failed { + t.Fatal("timeout while waiting for test to finish") + } + +} + +func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *testing.T) { + manager, err := createManager(t) + require.NoError(t, err, "unable to create account manager") + account, err := manager.GetAccountByUserOrAccountID(userID, "", "") + require.NoError(t, err, "unable to create an account") + + key, err := wgtypes.GenerateKey() + require.NoError(t, err, "unable to generate WireGuard key") + _, err = manager.AddPeer("", userID, &Peer{ + Key: key.PublicKey().String(), + Meta: PeerSystemMeta{}, + Name: "test-peer", + LoginExpirationEnabled: true, + }) + require.NoError(t, err, "unable to add peer") + err = manager.MarkPeerConnected(key.PublicKey().String(), true) + require.NoError(t, err, "unable to mark peer connected") + + wg := &sync.WaitGroup{} + wg.Add(2) + manager.peerLoginExpiry = &MockScheduler{ + CancelFunc: func(IDs []string) { + wg.Done() + }, + ScheduleFunc: func(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { + wg.Done() + }, + } + // enabling PeerLoginExpirationEnabled should trigger the expiration job + account, err = manager.UpdateAccountSettings(account.Id, userID, &Settings{ + PeerLoginExpiration: time.Hour, + PeerLoginExpirationEnabled: true}) + require.NoError(t, err, "expecting to update account settings successfully but got error") + + failed := waitTimeout(wg, time.Second) + if failed { + t.Fatal("timeout while waiting for test to finish") + } + wg.Add(1) + + // disabling PeerLoginExpirationEnabled should trigger cancel + account, err = manager.UpdateAccountSettings(account.Id, userID, &Settings{ + PeerLoginExpiration: time.Hour, + PeerLoginExpirationEnabled: false}) + require.NoError(t, err, "expecting to update account settings successfully but got error") + failed = waitTimeout(wg, time.Second) + if failed { + t.Fatal("timeout while waiting for test to finish") + } +} func TestDefaultAccountManager_UpdateAccountSettings(t *testing.T) { manager, err := createManager(t) @@ -1545,3 +1686,17 @@ func createStore(t *testing.T) (Store, error) { return store, nil } + +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return false + case <-time.After(timeout): + return true + } +} diff --git a/management/server/scheduler.go b/management/server/scheduler.go index 75a6a01d106..c06589755b6 100644 --- a/management/server/scheduler.go +++ b/management/server/scheduler.go @@ -6,22 +6,51 @@ import ( "time" ) -// Scheduler is a generic structure that allows to schedule jobs (functions) to run in the future and cancel them. -type Scheduler struct { +// Scheduler is an interface which implementations can schedule and cancel jobs +type Scheduler interface { + Cancel(IDs []string) + Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) +} + +type MockScheduler struct { + CancelFunc func(IDs []string) + ScheduleFunc func(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) +} + +// Cancel mocks the Cancel function of the Scheduler interface +func (mock *MockScheduler) Cancel(IDs []string) { + if mock.CancelFunc != nil { + mock.CancelFunc(IDs) + return + } + log.Errorf("MockScheduler doesn't have Cancel function defined ") +} + +// Schedule mocks the Schedule function of the Scheduler interface +func (mock *MockScheduler) Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { + if mock.ScheduleFunc != nil { + mock.ScheduleFunc(in, ID, job) + return + } + log.Errorf("MockScheduler doesn't have Schedule function defined") +} + +// DefaultScheduler is a generic structure that allows to schedule jobs (functions) to run in the future and cancel them. +type DefaultScheduler struct { // jobs map holds cancellation channels indexed by the job ID jobs map[string]chan struct{} mu *sync.Mutex } -// NewScheduler creates an instance of a Scheduler -func NewScheduler() *Scheduler { - return &Scheduler{ +// NewDefaultScheduler creates an instance of a DefaultScheduler +func NewDefaultScheduler() *DefaultScheduler { + return &DefaultScheduler{ jobs: make(map[string]chan struct{}), mu: &sync.Mutex{}, } } -func (wm *Scheduler) cancel(ID string) bool { +func (wm *DefaultScheduler) cancel(ID string) bool { cancel, ok := wm.jobs[ID] if ok { delete(wm.jobs, ID) @@ -39,7 +68,7 @@ func (wm *Scheduler) cancel(ID string) bool { // Cancel cancels the scheduled job by ID if present. // If job wasn't found the function returns false. -func (wm *Scheduler) Cancel(IDs []string) { +func (wm *DefaultScheduler) Cancel(IDs []string) { wm.mu.Lock() defer wm.mu.Unlock() @@ -50,7 +79,7 @@ func (wm *Scheduler) Cancel(IDs []string) { // Schedule a job to run in some time in the future. If job returns true then it will be scheduled one more time. // If job with the provided ID already exists, a new one won't be scheduled. -func (wm *Scheduler) Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { +func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { wm.mu.Lock() defer wm.mu.Unlock() cancel := make(chan struct{}) diff --git a/management/server/scheduler_test.go b/management/server/scheduler_test.go index 39eb32810bf..89300f9ea47 100644 --- a/management/server/scheduler_test.go +++ b/management/server/scheduler_test.go @@ -10,7 +10,7 @@ import ( ) func TestScheduler_Performance(t *testing.T) { - scheduler := NewScheduler() + scheduler := NewDefaultScheduler() n := 1000 wg := sync.WaitGroup{} wg.Add(n) @@ -31,7 +31,7 @@ func TestScheduler_Performance(t *testing.T) { func TestScheduler_Cancel(t *testing.T) { jobID1 := "test-scheduler-job-1" jobID2 := "test-scheduler-job-2" - scheduler := NewScheduler() + scheduler := NewDefaultScheduler() scheduler.Schedule(2*time.Second, jobID1, func() (reschedule bool, nextRunIn time.Duration) { return false, 0 }) @@ -47,7 +47,7 @@ func TestScheduler_Cancel(t *testing.T) { func TestScheduler_Schedule(t *testing.T) { jobID := "test-scheduler-job-1" - scheduler := NewScheduler() + scheduler := NewDefaultScheduler() wg := sync.WaitGroup{} wg.Add(1) // job without reschedule should be triggered once From 694d7cd0b6798b936712a8d0ebec212a03613039 Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 27 Feb 2023 11:17:11 +0100 Subject: [PATCH 18/25] Fix lint issues --- management/server/account_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/management/server/account_test.go b/management/server/account_test.go index 24606dfe3a0..f2e54f98658 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -1358,7 +1358,7 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing. LoginExpirationEnabled: true, }) require.NoError(t, err, "unable to add peer") - account, err = manager.UpdateAccountSettings(account.Id, userID, &Settings{ + _, err = manager.UpdateAccountSettings(account.Id, userID, &Settings{ PeerLoginExpiration: time.Hour, PeerLoginExpirationEnabled: true}) require.NoError(t, err, "expecting to update account settings successfully but got error") @@ -1426,7 +1426,7 @@ func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *test wg.Add(1) // disabling PeerLoginExpirationEnabled should trigger cancel - account, err = manager.UpdateAccountSettings(account.Id, userID, &Settings{ + _, err = manager.UpdateAccountSettings(account.Id, userID, &Settings{ PeerLoginExpiration: time.Hour, PeerLoginExpirationEnabled: false}) require.NoError(t, err, "expecting to update account settings successfully but got error") From 0a95e069ad23178f83da671a9c8e6b718569ba32 Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 27 Feb 2023 12:21:43 +0100 Subject: [PATCH 19/25] Fix Codacy --- management/server/scheduler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/management/server/scheduler.go b/management/server/scheduler.go index c06589755b6..513310793fb 100644 --- a/management/server/scheduler.go +++ b/management/server/scheduler.go @@ -12,6 +12,7 @@ type Scheduler interface { Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) } +// MockScheduler is a mock implementation of Scheduler type MockScheduler struct { CancelFunc func(IDs []string) ScheduleFunc func(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) From f83b96e8883692d946497fa293fd7ef830eb642d Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 27 Feb 2023 12:54:59 +0100 Subject: [PATCH 20/25] Refactor to use conventional (value, bool) return --- management/server/account.go | 14 ++++----- management/server/account_test.go | 8 +++--- management/server/scheduler.go | 10 +++---- management/server/scheduler_test.go | 44 ++++++++++++++++++----------- 4 files changed, 44 insertions(+), 32 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index 2289f2f6358..ac00462fab1 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -324,11 +324,11 @@ func (a *Account) GetExpiredPeers() []*Peer { // GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire if it was found. // If there is no peer that expires this function returns false and a duration of 0. // This function only considers peers that haven't been expired yet and that are connected. -func (a *Account) GetNextPeerExpiration() (bool, time.Duration) { +func (a *Account) GetNextPeerExpiration() (time.Duration, bool) { peersWithExpiry := a.GetPeersWithExpiration() if len(peersWithExpiry) == 0 { - return false, time.Duration(0) + return 0, false } var nextExpiry *time.Duration for _, peer := range peersWithExpiry { @@ -343,10 +343,10 @@ func (a *Account) GetNextPeerExpiration() (bool, time.Duration) { } if nextExpiry == nil { - return false, time.Duration(0) + return 0, false } - return true, *nextExpiry + return *nextExpiry, true } // GetPeersWithExpiration returns a list of peers that have Peer.LoginExpirationEnabled set to true @@ -716,8 +716,8 @@ func (am *DefaultAccountManager) UpdateAccountSettings(accountID, userID string, return updatedAccount, nil } -func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() (bool, time.Duration) { - return func() (bool, time.Duration) { +func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() (time.Duration, bool) { + return func() (time.Duration, bool) { unlock := am.Store.AcquireAccountLock(accountID) defer unlock() @@ -759,7 +759,7 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() func (am *DefaultAccountManager) checkAndSchedulePeerLoginExpiration(account *Account) { am.peerLoginExpiry.Cancel([]string{account.Id}) - if ok, nextRun := account.GetNextPeerExpiration(); ok { + if nextRun, ok := account.GetNextPeerExpiration(); ok { go am.peerLoginExpiry.Schedule(nextRun, account.Id, am.peerLoginExpirationJob(account.Id)) } } diff --git a/management/server/account_test.go b/management/server/account_test.go index f2e54f98658..baa76185dc6 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -1322,7 +1322,7 @@ func TestDefaultAccountManager_UpdatePeer_PeerLoginExpiration(t *testing.T) { CancelFunc: func(IDs []string) { wg.Done() }, - ScheduleFunc: func(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { + ScheduleFunc: func(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) { wg.Done() }, } @@ -1369,7 +1369,7 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing. CancelFunc: func(IDs []string) { wg.Done() }, - ScheduleFunc: func(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { + ScheduleFunc: func(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) { wg.Done() }, } @@ -1409,7 +1409,7 @@ func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *test CancelFunc: func(IDs []string) { wg.Done() }, - ScheduleFunc: func(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { + ScheduleFunc: func(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) { wg.Done() }, } @@ -1655,7 +1655,7 @@ func TestAccount_GetNextPeerExpiration(t *testing.T) { Settings: &Settings{PeerLoginExpiration: testCase.expiration, PeerLoginExpirationEnabled: testCase.expirationEnabled}, } - ok, expiration := account.GetNextPeerExpiration() + expiration, ok := account.GetNextPeerExpiration() assert.Equal(t, ok, testCase.expectedNextRun) if testCase.expectedNextRun { assert.True(t, expiration >= 0 && expiration <= testCase.expectedNextExpiration) diff --git a/management/server/scheduler.go b/management/server/scheduler.go index 513310793fb..a35bdc30ce1 100644 --- a/management/server/scheduler.go +++ b/management/server/scheduler.go @@ -9,13 +9,13 @@ import ( // Scheduler is an interface which implementations can schedule and cancel jobs type Scheduler interface { Cancel(IDs []string) - Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) + Schedule(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) } // MockScheduler is a mock implementation of Scheduler type MockScheduler struct { CancelFunc func(IDs []string) - ScheduleFunc func(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) + ScheduleFunc func(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) } // Cancel mocks the Cancel function of the Scheduler interface @@ -28,7 +28,7 @@ func (mock *MockScheduler) Cancel(IDs []string) { } // Schedule mocks the Schedule function of the Scheduler interface -func (mock *MockScheduler) Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { +func (mock *MockScheduler) Schedule(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) { if mock.ScheduleFunc != nil { mock.ScheduleFunc(in, ID, job) return @@ -80,7 +80,7 @@ func (wm *DefaultScheduler) Cancel(IDs []string) { // Schedule a job to run in some time in the future. If job returns true then it will be scheduled one more time. // If job with the provided ID already exists, a new one won't be scheduled. -func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { +func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) { wm.mu.Lock() defer wm.mu.Unlock() cancel := make(chan struct{}) @@ -96,7 +96,7 @@ func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (re select { case <-time.After(in): log.Debugf("time to do a scheduled job %s", ID) - reschedule, runIn := job() + runIn, reschedule := job() wm.mu.Lock() defer wm.mu.Unlock() delete(wm.jobs, ID) diff --git a/management/server/scheduler_test.go b/management/server/scheduler_test.go index 89300f9ea47..f4b03cf696e 100644 --- a/management/server/scheduler_test.go +++ b/management/server/scheduler_test.go @@ -12,19 +12,23 @@ import ( func TestScheduler_Performance(t *testing.T) { scheduler := NewDefaultScheduler() n := 1000 - wg := sync.WaitGroup{} + wg := &sync.WaitGroup{} wg.Add(n) for i := 0; i < n; i++ { millis := time.Duration(rand.Intn(500-50)+50) * time.Millisecond - go scheduler.Schedule(millis, fmt.Sprintf("test-scheduler-job-%d", i), func() (reschedule bool, nextRunIn time.Duration) { + go scheduler.Schedule(millis, fmt.Sprintf("test-scheduler-job-%d", i), func() (nextRunIn time.Duration, reschedule bool) { time.Sleep(millis) wg.Done() - return false, 0 + return 0, false }) } assert.True(t, len(scheduler.jobs) > 0) - wg.Wait() + failed := waitTimeout(wg, time.Second) + if failed { + t.Fatal("timed out while waiting for test to finish") + return + } assert.Len(t, scheduler.jobs, 0) } @@ -32,11 +36,11 @@ func TestScheduler_Cancel(t *testing.T) { jobID1 := "test-scheduler-job-1" jobID2 := "test-scheduler-job-2" scheduler := NewDefaultScheduler() - scheduler.Schedule(2*time.Second, jobID1, func() (reschedule bool, nextRunIn time.Duration) { - return false, 0 + scheduler.Schedule(2*time.Second, jobID1, func() (nextRunIn time.Duration, reschedule bool) { + return 0, false }) - scheduler.Schedule(2*time.Second, jobID2, func() (reschedule bool, nextRunIn time.Duration) { - return false, 0 + scheduler.Schedule(2*time.Second, jobID2, func() (nextRunIn time.Duration, reschedule bool) { + return 0, false }) assert.Len(t, scheduler.jobs, 2) @@ -48,26 +52,34 @@ func TestScheduler_Cancel(t *testing.T) { func TestScheduler_Schedule(t *testing.T) { jobID := "test-scheduler-job-1" scheduler := NewDefaultScheduler() - wg := sync.WaitGroup{} + wg := &sync.WaitGroup{} wg.Add(1) // job without reschedule should be triggered once - job := func() (reschedule bool, nextRunIn time.Duration) { + job := func() (nextRunIn time.Duration, reschedule bool) { wg.Done() - return false, 0 + return 0, false } scheduler.Schedule(300*time.Millisecond, jobID, job) - wg.Wait() + failed := waitTimeout(wg, time.Second) + if failed { + t.Fatal("timed out while waiting for test to finish") + return + } // job with reschedule should be triggered at least twice - wg = sync.WaitGroup{} + wg = &sync.WaitGroup{} wg.Add(2) - job = func() (reschedule bool, nextRunIn time.Duration) { + job = func() (nextRunIn time.Duration, reschedule bool) { wg.Done() - return true, 300 * time.Millisecond + return 300 * time.Millisecond, true } scheduler.Schedule(300*time.Millisecond, jobID, job) - wg.Wait() + failed = waitTimeout(wg, time.Second) + if failed { + t.Fatal("timed out while waiting for test to finish") + return + } scheduler.cancel(jobID) } From 586e6fce7720f8c0245bfe7656c75e8fa1e22abb Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 27 Feb 2023 14:52:06 +0100 Subject: [PATCH 21/25] test GetExpiredPeers --- management/server/account_test.go | 79 +++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/management/server/account_test.go b/management/server/account_test.go index baa76185dc6..1d672e1b780 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -1468,6 +1468,85 @@ func TestDefaultAccountManager_UpdateAccountSettings(t *testing.T) { } func TestAccount_GetExpiredPeers(t *testing.T) { + type test struct { + name string + peers map[string]*Peer + expectedPeers map[string]struct{} + } + testCases := []test{ + { + name: "Peers with login expiration disabled, no expired peers", + peers: map[string]*Peer{ + "peer-1": { + LoginExpirationEnabled: false, + }, + "peer-2": { + LoginExpirationEnabled: false, + }, + }, + expectedPeers: map[string]struct{}{}, + }, + { + name: "Two peers expired", + peers: map[string]*Peer{ + "peer-1": { + ID: "peer-1", + LoginExpirationEnabled: true, + Status: &PeerStatus{ + LastSeen: time.Now(), + Connected: true, + LoginExpired: false, + }, + LastLogin: time.Now().Add(-30 * time.Minute), + }, + "peer-2": { + ID: "peer-2", + LoginExpirationEnabled: true, + Status: &PeerStatus{ + LastSeen: time.Now(), + Connected: true, + LoginExpired: false, + }, + LastLogin: time.Now().Add(-2 * time.Hour), + }, + + "peer-3": { + ID: "peer-3", + LoginExpirationEnabled: true, + Status: &PeerStatus{ + LastSeen: time.Now(), + Connected: true, + LoginExpired: false, + }, + LastLogin: time.Now().Add(-1 * time.Hour), + }, + }, + expectedPeers: map[string]struct{}{ + "peer-2": {}, + "peer-3": {}, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + account := &Account{ + Peers: testCase.peers, + Settings: &Settings{ + PeerLoginExpirationEnabled: true, + PeerLoginExpiration: time.Hour, + }, + } + + expiredPeers := account.GetExpiredPeers() + assert.Len(t, expiredPeers, len(testCase.expectedPeers)) + for _, peer := range expiredPeers { + if _, ok := testCase.expectedPeers[peer.ID]; !ok { + t.Fatalf("expected to have peer %s expired", peer.ID) + } + } + }) + } } From 894f778b2904f45456876b169376f53c0f6b1a8f Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 27 Feb 2023 14:52:22 +0100 Subject: [PATCH 22/25] Make scheduler test more clear --- management/server/scheduler_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/management/server/scheduler_test.go b/management/server/scheduler_test.go index f4b03cf696e..3312ffe66f9 100644 --- a/management/server/scheduler_test.go +++ b/management/server/scheduler_test.go @@ -14,8 +14,10 @@ func TestScheduler_Performance(t *testing.T) { n := 1000 wg := &sync.WaitGroup{} wg.Add(n) + maxMs := 500 + minMs := 50 for i := 0; i < n; i++ { - millis := time.Duration(rand.Intn(500-50)+50) * time.Millisecond + millis := time.Duration(rand.Intn(maxMs-minMs)+minMs) * time.Millisecond go scheduler.Schedule(millis, fmt.Sprintf("test-scheduler-job-%d", i), func() (nextRunIn time.Duration, reschedule bool) { time.Sleep(millis) wg.Done() From 963caffe934bdd79a8a64937d719ca359d1cc0ba Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 27 Feb 2023 15:53:05 +0100 Subject: [PATCH 23/25] Simplify load test of the Scheduler --- management/server/scheduler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/management/server/scheduler_test.go b/management/server/scheduler_test.go index 3312ffe66f9..398c983ff71 100644 --- a/management/server/scheduler_test.go +++ b/management/server/scheduler_test.go @@ -11,7 +11,7 @@ import ( func TestScheduler_Performance(t *testing.T) { scheduler := NewDefaultScheduler() - n := 1000 + n := 500 wg := &sync.WaitGroup{} wg.Add(n) maxMs := 500 @@ -26,7 +26,7 @@ func TestScheduler_Performance(t *testing.T) { } assert.True(t, len(scheduler.jobs) > 0) - failed := waitTimeout(wg, time.Second) + failed := waitTimeout(wg, 3*time.Second) if failed { t.Fatal("timed out while waiting for test to finish") return From 0867bbe88843cb52d3c13a3e2e7ef1b967bdb4f1 Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 27 Feb 2023 16:19:03 +0100 Subject: [PATCH 24/25] Fix scheduler test --- management/server/scheduler_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/management/server/scheduler_test.go b/management/server/scheduler_test.go index 398c983ff71..7e0ab0b8e53 100644 --- a/management/server/scheduler_test.go +++ b/management/server/scheduler_test.go @@ -70,10 +70,19 @@ func TestScheduler_Schedule(t *testing.T) { // job with reschedule should be triggered at least twice wg = &sync.WaitGroup{} + mx := &sync.Mutex{} + scheduledTimes := 0 wg.Add(2) job = func() (nextRunIn time.Duration, reschedule bool) { - wg.Done() - return 300 * time.Millisecond, true + mx.Lock() + defer mx.Unlock() + // ensure we repeat only twice + if scheduledTimes < 2 { + wg.Done() + scheduledTimes++ + return 300 * time.Millisecond, true + } + return 0, false } scheduler.Schedule(300*time.Millisecond, jobID, job) From 83099920c82fd7d54da90d9b8065fbee9365f24a Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 27 Feb 2023 16:28:06 +0100 Subject: [PATCH 25/25] Fix scheduler test --- management/server/scheduler_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/management/server/scheduler_test.go b/management/server/scheduler_test.go index 7e0ab0b8e53..0c0cef99b35 100644 --- a/management/server/scheduler_test.go +++ b/management/server/scheduler_test.go @@ -24,8 +24,6 @@ func TestScheduler_Performance(t *testing.T) { return 0, false }) } - - assert.True(t, len(scheduler.jobs) > 0) failed := waitTimeout(wg, 3*time.Second) if failed { t.Fatal("timed out while waiting for test to finish")