Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proactively expire peers' login per account #698

Merged
merged 25 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5a3993a
Enable peer login expiration by default when adding a peer
braginini Feb 20, 2023
7176f73
Force peer expiration
braginini Feb 22, 2023
153452f
Check expiration on peer update and connect
braginini Feb 22, 2023
36dc28e
Fix test
braginini Feb 22, 2023
c5f9ce5
Fix lint and codacy issues
braginini Feb 22, 2023
6078c2b
Optimize scheduler routines
braginini Feb 22, 2023
44b3a85
Add scheduler test
braginini Feb 22, 2023
51a756a
Add scheduler performance test
braginini Feb 22, 2023
07a26cf
Refactor to use nil duration in GetNextPeerExpiration
braginini Feb 23, 2023
52cdccf
Skip already expired peers when cleaning up.
braginini Feb 23, 2023
615ce0c
Fix typos and refactor GetNextPeerExpiration
braginini Feb 23, 2023
57775b0
Minor scheduler corrections
braginini Feb 23, 2023
68f8ab3
Add account expiration tests
braginini Feb 23, 2023
e1dbf51
Notify account peers when expired peer authenticates
braginini Feb 26, 2023
da65f16
Simplify code of expiration
braginini Feb 26, 2023
4f672c7
Fix lint issues
braginini Feb 26, 2023
c42d051
Add peer expiration tests
braginini Feb 27, 2023
694d7cd
Fix lint issues
braginini Feb 27, 2023
0a95e06
Fix Codacy
braginini Feb 27, 2023
f83b96e
Refactor to use conventional (value, bool) return
braginini Feb 27, 2023
586e6fc
test GetExpiredPeers
braginini Feb 27, 2023
894f778
Make scheduler test more clear
braginini Feb 27, 2023
963caff
Simplify load test of the Scheduler
braginini Feb 27, 2023
0867bbe
Fix scheduler test
braginini Feb 27, 2023
8309992
Fix scheduler test
braginini Feb 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 104 additions & 2 deletions management/server/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ type DefaultAccountManager struct {
// singleAccountModeDomain is a domain to use in singleAccountMode setup
singleAccountModeDomain string
// dnsDomain is used for peer resolution. This is appended to the peer's name
dnsDomain string
dnsDomain string
peerLoginExpiry *Scheduler
}

// Settings represents Account settings structure that can be modified via API and Dashboard
Expand Down Expand Up @@ -307,6 +308,53 @@ func (a *Account) GetGroup(groupID string) *Group {
return a.Groups[groupID]
}

// GetExpiredPeers returns peers tha have been expired
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing a 't' in that

func (a *Account) GetExpiredPeers() []*Peer {
var peers []*Peer
for _, peer := range a.GetPeersWithExpiration() {
expired, _ := peer.LoginExpired(a.Settings.PeerLoginExpiration)
if expired {
peers = append(peers, peer)
}
}

return peers
}

// GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire and true if it was found.
// If there is no peer that expires this function returns false and a zero time.Duration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we return max value not 0 if no peer is found

If there is no peer that is not yet expired, this function returns false and the max time.Duration value as default

// This function only considers peers that haven't been expired yet and connected.
func (a *Account) GetNextPeerExpiration() (time.Duration, bool) {
nextExpiry := time.Duration(1<<63 - 1) // max duration
peersWithExpiry := a.GetPeersWithExpiration()
if len(peersWithExpiry) == 0 {
return nextExpiry, false
}
for _, peer := range peersWithExpiry {
// consider only connected peers because others will require login on connecting to the management server
if peer.Status.LoginExpired || !peer.Status.Connected {
continue
}
_, duration := peer.LoginExpired(a.Settings.PeerLoginExpiration)
if duration < nextExpiry {
nextExpiry = duration
}
}

return nextExpiry, nextExpiry != time.Duration(1<<63-1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe go with nil instead of max value for nextExpiry if no more expiring peers are available as it is more clean from a coding point of view to not return a value if there is no value available.

}

// GetPeersWithExpiration returns a list of peers that have Peer.LoginExpirationEnabled set to true
func (a *Account) GetPeersWithExpiration() []*Peer {
var peers []*Peer
for _, peer := range a.Peers {
if peer.LoginExpirationEnabled {
peers = append(peers, peer)
}
}
return peers
}

// GetPeers returns a list of all Account peers
func (a *Account) GetPeers() []*Peer {
var peers []*Peer
Expand Down Expand Up @@ -550,13 +598,14 @@ func BuildManager(store Store, peersUpdateManager *PeersUpdateManager, idpManage
cacheLoading: map[string]chan struct{}{},
dnsDomain: dnsDomain,
eventStore: eventStore,
peerLoginExpiry: NewScheduler(),
}
allAccounts := store.GetAllAccounts()
// enable single account mode only if configured by user and number of existing accounts is not grater than 1
am.singleAccountMode = singleAccountModeDomain != "" && len(allAccounts) <= 1
if am.singleAccountMode {
if !isDomainValid(singleAccountModeDomain) {
return nil, status.Errorf(status.InvalidArgument, "invalid domain \"%s\" provided for single accound mode. Please review your input for --single-account-mode-domain", singleAccountModeDomain)
return nil, status.Errorf(status.InvalidArgument, "invalid domain \"%s\" provided for a single account mode. Please review your input for --single-account-mode-domain", singleAccountModeDomain)
}
am.singleAccountModeDomain = singleAccountModeDomain
log.Infof("single account mode enabled, accounts number %d", len(allAccounts))
Expand Down Expand Up @@ -640,12 +689,16 @@ func (am *DefaultAccountManager) UpdateAccountSettings(accountID, userID string,
event := activity.AccountPeerLoginExpirationEnabled
if !newSettings.PeerLoginExpirationEnabled {
event = activity.AccountPeerLoginExpirationDisabled
am.peerLoginExpiry.Cancel([]string{accountID})
} else {
am.checkAndSchedulePeerLoginExpiration(account)
}
am.storeEvent(userID, accountID, accountID, event, nil)
}

if oldSettings.PeerLoginExpiration != newSettings.PeerLoginExpiration {
am.storeEvent(userID, accountID, accountID, activity.AccountPeerLoginExpirationDurationUpdated, nil)
am.checkAndSchedulePeerLoginExpiration(account)
}

updatedAccount := account.UpdateSettings(newSettings)
Expand All @@ -658,6 +711,55 @@ func (am *DefaultAccountManager) UpdateAccountSettings(accountID, userID string,
return updatedAccount, nil
}

func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() (bool, time.Duration) {
return func() (bool, time.Duration) {
unlock := am.Store.AcquireAccountLock(accountID)
defer unlock()

account, err := am.Store.GetAccount(accountID)
if err != nil {
log.Errorf("failed getting account %s expiring peers", account.Id)
// todo return retry?
return false, 0
}

var peerIDs []string
for _, peer := range account.GetExpiredPeers() {
peerIDs = append(peerIDs, peer.ID)
peer.MarkLoginExpired(true)
account.UpdatePeer(peer)
err = am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status)
if err != nil {
log.Errorf("failed saving peer status while expiring peer %s", peer.ID)
// todo return retry?
return false, 0
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure on this one but I think we do way to much computing overhead here.

account.GetExpiredPeers() will return the complete list of expired peers. Even peers that have been expired for months. So in case we we have lots of zombies in the account that are not used anymore but are doing database overrides and peer updates for all of them on any run with the same (unchanged) data.

I would add additional check for status changes at the top of the loop:

if peer.Status.LoginExpired {
     continue
}


log.Debugf("discovered %d peers to expire for account %s", len(peerIDs), account.Id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log message should also show wrong number of peers. Not newly expired but total expired in account


if len(peerIDs) != 0 {
// this will trigger peer disconnect from the management service
am.peersUpdateManager.CloseChannels(peerIDs)
err := am.updateAccountPeers(account)
if err != nil {
log.Errorf("failed updating account peers while expiring peers for account %s", accountID)
return false, 0
}
}

nextExpiration, shouldRun := account.GetNextPeerExpiration()
return shouldRun, nextExpiration
}
}

func (am *DefaultAccountManager) checkAndSchedulePeerLoginExpiration(account *Account) {
am.peerLoginExpiry.Cancel([]string{account.Id})
if expiryNextRunIn, ok := account.GetNextPeerExpiration(); ok {
go am.peerLoginExpiry.Schedule(expiryNextRunIn, account.Id, am.peerLoginExpirationJob(account.Id))
}
}

// newAccount creates a new Account with a generated ID and generated default setup keys.
// If ID is already in use (due to collision) we try one more time before returning error
func (am *DefaultAccountManager) newAccount(userID, domain string) (*Account, error) {
Expand Down
7 changes: 5 additions & 2 deletions management/server/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
if err != nil {
return status.Error(codes.Internal, "internal server error")
}
expired, left := peer.LoginExpired(account.Settings)
expired, left := peer.LoginExpired(account.Settings.PeerLoginExpiration)
expired = account.Settings.PeerLoginExpirationEnabled && expired
if peer.UserID != "" && (expired || peer.Status.LoginExpired) {
err = s.accountManager.MarkPeerLoginExpired(peerKey.String(), true)
if err != nil {
Expand Down Expand Up @@ -380,7 +381,9 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
if err != nil {
return nil, status.Error(codes.Internal, "internal server error")
}
expired, left := peer.LoginExpired(account.Settings)

expired, left := peer.LoginExpired(account.Settings.PeerLoginExpiration)
expired = account.Settings.PeerLoginExpirationEnabled && expired
if peer.UserID != "" && (expired || peer.Status.LoginExpired) {
// it might be that peer expired but user has logged in already, check token then
if loginReq.GetJwtToken() == "" {
Expand Down
39 changes: 28 additions & 11 deletions management/server/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,24 @@ func (p *Peer) Copy() *Peer {
}
}

// MarkLoginExpired marks peer's status expired or not
func (p *Peer) MarkLoginExpired(expired bool) {
newStatus := p.Status.Copy()
newStatus.LastSeen = time.Now()
newStatus.LoginExpired = expired
p.Status = newStatus
}

// LoginExpired indicates whether the peer's login has expired or not.
// If Peer.LastLogin plus the expiresIn duration has happened already; then login has expired.
// Return true if a login has expired, false otherwise, and time left to expiration (negative when expired).
// Login expiration can be disabled/enabled on a Peer level via Peer.LoginExpirationEnabled property.
// Login expiration can also be disabled/enabled globally on the Account level via Settings.PeerLoginExpirationEnabled
// and if disabled on the Account level, then Peer.LoginExpirationEnabled is ineffective.
func (p *Peer) LoginExpired(accountSettings *Settings) (bool, time.Duration) {
expiresAt := p.LastLogin.Add(accountSettings.PeerLoginExpiration)
// Login expiration can also be disabled/enabled globally on the Account level via Settings.PeerLoginExpirationEnabled.
func (p *Peer) LoginExpired(expiresIn time.Duration) (bool, time.Duration) {
expiresAt := p.LastLogin.Add(expiresIn)
now := time.Now()
timeLeft := expiresAt.Sub(now)
return accountSettings.PeerLoginExpirationEnabled && p.LoginExpirationEnabled && (timeLeft <= 0), timeLeft
return p.LoginExpirationEnabled && (timeLeft <= 0), timeLeft
}

// FQDN returns peers FQDN combined of the peer's DNS label and the system's DNS domain
Expand Down Expand Up @@ -202,13 +209,10 @@ func (am *DefaultAccountManager) MarkPeerLoginExpired(peerPubKey string, loginEx
return err
}

newStatus := peer.Status.Copy()
newStatus.LastSeen = time.Now()
newStatus.LoginExpired = loginExpired
peer.Status = newStatus
peer.MarkLoginExpired(loginExpired)
account.UpdatePeer(peer)

err = am.Store.SavePeerStatus(account.Id, peer.ID, *newStatus)
err = am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status)
if err != nil {
return err
}
Expand Down Expand Up @@ -251,6 +255,11 @@ func (am *DefaultAccountManager) MarkPeerConnected(peerPubKey string, connected
if err != nil {
return err
}

if peer.AddedWithSSOLogin() && peer.LoginExpirationEnabled && account.Settings.PeerLoginExpirationEnabled {
am.checkAndSchedulePeerLoginExpiration(account)
}

return nil
}

Expand Down Expand Up @@ -307,6 +316,10 @@ func (am *DefaultAccountManager) UpdatePeer(accountID, userID string, update *Pe
event = activity.PeerLoginExpirationDisabled
}
am.storeEvent(userID, peer.IP.String(), accountID, event, peer.EventMeta(am.GetDNSDomain()))

if peer.AddedWithSSOLogin() && peer.LoginExpirationEnabled && account.Settings.PeerLoginExpirationEnabled {
am.checkAndSchedulePeerLoginExpiration(account)
}
}

account.UpdatePeer(peer)
Expand Down Expand Up @@ -529,7 +542,7 @@ func (am *DefaultAccountManager) AddPeer(setupKey, userID string, peer *Peer) (*
SSHEnabled: false,
SSHKey: peer.SSHKey,
LastLogin: time.Now(),
LoginExpirationEnabled: false,
LoginExpirationEnabled: true,
}

// add peer to 'All' group
Expand Down Expand Up @@ -775,6 +788,10 @@ func (a *Account) getPeersByACL(peerID string) []*Peer {
)
continue
}
expired, _ := peer.LoginExpired(a.Settings.PeerLoginExpiration)
if expired {
continue
}
// exclude original peer
if _, ok := peersSet[peer.ID]; peer.ID != peerID && !ok {
peersSet[peer.ID] = struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion management/server/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestPeer_LoginExpired(t *testing.T) {
LastLogin: c.lastLogin,
}

expired, _ := peer.LoginExpired(c.accountSettings)
expired, _ := peer.LoginExpired(c.accountSettings.PeerLoginExpiration)
assert.Equal(t, expired, c.expected)
})
}
Expand Down
82 changes: 82 additions & 0 deletions management/server/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package server

import (
log "github.com/sirupsen/logrus"
"sync"
"time"
)

// Scheduler is a generic structure that allows to schedule jobs (functions) to run in the future and cancel them.
type Scheduler struct {
// jobs map holds cancellation channels indexed by the job ID
jobs map[string]chan struct{}
mu *sync.Mutex
}

// NewScheduler creates an instance of a Scheduler
func NewScheduler() *Scheduler {
return &Scheduler{
jobs: make(map[string]chan struct{}),
mu: &sync.Mutex{},
}
}

func (wm *Scheduler) cancel(ID string) bool {
cancel, ok := wm.jobs[ID]
if ok {
delete(wm.jobs, ID)
select {
case cancel <- struct{}{}:
log.Debugf("cancelled scheduled job %s", ID)
default:
log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID)
}

}
return ok
}

// Cancel cancels the scheduled job by ID if present.
// If job wasn't found the function returns false.
func (wm *Scheduler) Cancel(IDs []string) {
wm.mu.Lock()
defer wm.mu.Unlock()

for _, id := range IDs {
wm.cancel(id)
}
}

// Schedule a job to run in some time in the future. If job returns true then it will be scheduled one more time.
func (wm *Scheduler) Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) {
wm.mu.Lock()
defer wm.mu.Unlock()
cancel := make(chan struct{})
if _, ok := wm.jobs[ID]; ok {
log.Debugf("couldn't schedule a job %s because it already exists. There are %d total jobs scheduled.",
ID, len(wm.jobs))
return
}

wm.jobs[ID] = cancel
log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs))
go func() {
select {
case <-time.After(in):
log.Debugf("time to do a scheduled job %s", ID)
reschedule, runIn := job()
wm.mu.Lock()
defer wm.mu.Unlock()
delete(wm.jobs, ID)
if reschedule {
go wm.Schedule(runIn, ID, job)
}
case <-cancel:
log.Debugf("stopped scheduled job %s ", ID)
wm.mu.Lock()
defer wm.mu.Unlock()
delete(wm.jobs, ID)
return
}
}()
}
Loading