-
-
Notifications
You must be signed in to change notification settings - Fork 578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Proactively expire peers' login per account #698
Changes from 8 commits
5a3993a
7176f73
153452f
36dc28e
c5f9ce5
6078c2b
44b3a85
51a756a
07a26cf
52cdccf
615ce0c
57775b0
68f8ab3
e1dbf51
da65f16
4f672c7
c42d051
694d7cd
0a95e06
f83b96e
586e6fc
894f778
963caff
0867bbe
8309992
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -119,7 +119,8 @@ type DefaultAccountManager struct { | |
// singleAccountModeDomain is a domain to use in singleAccountMode setup | ||
singleAccountModeDomain string | ||
// dnsDomain is used for peer resolution. This is appended to the peer's name | ||
dnsDomain string | ||
dnsDomain string | ||
peerLoginExpiry *Scheduler | ||
} | ||
|
||
// Settings represents Account settings structure that can be modified via API and Dashboard | ||
|
@@ -307,6 +308,53 @@ func (a *Account) GetGroup(groupID string) *Group { | |
return a.Groups[groupID] | ||
} | ||
|
||
// GetExpiredPeers returns peers tha have been expired | ||
func (a *Account) GetExpiredPeers() []*Peer { | ||
var peers []*Peer | ||
for _, peer := range a.GetPeersWithExpiration() { | ||
expired, _ := peer.LoginExpired(a.Settings.PeerLoginExpiration) | ||
if expired { | ||
peers = append(peers, peer) | ||
} | ||
} | ||
|
||
return peers | ||
} | ||
|
||
// GetNextPeerExpiration returns the minimum duration in which the next peer of the account will expire and true if it was found. | ||
// If there is no peer that expires this function returns false and a zero time.Duration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we return max value not 0 if no peer is found
|
||
// This function only considers peers that haven't been expired yet and connected. | ||
func (a *Account) GetNextPeerExpiration() (time.Duration, bool) { | ||
nextExpiry := time.Duration(1<<63 - 1) // max duration | ||
peersWithExpiry := a.GetPeersWithExpiration() | ||
if len(peersWithExpiry) == 0 { | ||
return nextExpiry, false | ||
} | ||
for _, peer := range peersWithExpiry { | ||
// consider only connected peers because others will require login on connecting to the management server | ||
if peer.Status.LoginExpired || !peer.Status.Connected { | ||
continue | ||
} | ||
_, duration := peer.LoginExpired(a.Settings.PeerLoginExpiration) | ||
if duration < nextExpiry { | ||
nextExpiry = duration | ||
} | ||
} | ||
|
||
return nextExpiry, nextExpiry != time.Duration(1<<63-1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe go with nil instead of max value for |
||
} | ||
|
||
// GetPeersWithExpiration returns a list of peers that have Peer.LoginExpirationEnabled set to true | ||
func (a *Account) GetPeersWithExpiration() []*Peer { | ||
var peers []*Peer | ||
for _, peer := range a.Peers { | ||
if peer.LoginExpirationEnabled { | ||
peers = append(peers, peer) | ||
} | ||
} | ||
return peers | ||
} | ||
|
||
// GetPeers returns a list of all Account peers | ||
func (a *Account) GetPeers() []*Peer { | ||
var peers []*Peer | ||
|
@@ -550,13 +598,14 @@ func BuildManager(store Store, peersUpdateManager *PeersUpdateManager, idpManage | |
cacheLoading: map[string]chan struct{}{}, | ||
dnsDomain: dnsDomain, | ||
eventStore: eventStore, | ||
peerLoginExpiry: NewScheduler(), | ||
} | ||
allAccounts := store.GetAllAccounts() | ||
// enable single account mode only if configured by user and number of existing accounts is not grater than 1 | ||
am.singleAccountMode = singleAccountModeDomain != "" && len(allAccounts) <= 1 | ||
if am.singleAccountMode { | ||
if !isDomainValid(singleAccountModeDomain) { | ||
return nil, status.Errorf(status.InvalidArgument, "invalid domain \"%s\" provided for single accound mode. Please review your input for --single-account-mode-domain", singleAccountModeDomain) | ||
return nil, status.Errorf(status.InvalidArgument, "invalid domain \"%s\" provided for a single account mode. Please review your input for --single-account-mode-domain", singleAccountModeDomain) | ||
} | ||
am.singleAccountModeDomain = singleAccountModeDomain | ||
log.Infof("single account mode enabled, accounts number %d", len(allAccounts)) | ||
|
@@ -640,12 +689,16 @@ func (am *DefaultAccountManager) UpdateAccountSettings(accountID, userID string, | |
event := activity.AccountPeerLoginExpirationEnabled | ||
if !newSettings.PeerLoginExpirationEnabled { | ||
event = activity.AccountPeerLoginExpirationDisabled | ||
am.peerLoginExpiry.Cancel([]string{accountID}) | ||
} else { | ||
am.checkAndSchedulePeerLoginExpiration(account) | ||
} | ||
am.storeEvent(userID, accountID, accountID, event, nil) | ||
} | ||
|
||
if oldSettings.PeerLoginExpiration != newSettings.PeerLoginExpiration { | ||
am.storeEvent(userID, accountID, accountID, activity.AccountPeerLoginExpirationDurationUpdated, nil) | ||
am.checkAndSchedulePeerLoginExpiration(account) | ||
} | ||
|
||
updatedAccount := account.UpdateSettings(newSettings) | ||
|
@@ -658,6 +711,55 @@ func (am *DefaultAccountManager) UpdateAccountSettings(accountID, userID string, | |
return updatedAccount, nil | ||
} | ||
|
||
func (am *DefaultAccountManager) peerLoginExpirationJob(accountID string) func() (bool, time.Duration) { | ||
return func() (bool, time.Duration) { | ||
unlock := am.Store.AcquireAccountLock(accountID) | ||
defer unlock() | ||
|
||
account, err := am.Store.GetAccount(accountID) | ||
if err != nil { | ||
log.Errorf("failed getting account %s expiring peers", account.Id) | ||
// todo return retry? | ||
return false, 0 | ||
} | ||
|
||
var peerIDs []string | ||
for _, peer := range account.GetExpiredPeers() { | ||
peerIDs = append(peerIDs, peer.ID) | ||
peer.MarkLoginExpired(true) | ||
account.UpdatePeer(peer) | ||
err = am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status) | ||
if err != nil { | ||
log.Errorf("failed saving peer status while expiring peer %s", peer.ID) | ||
// todo return retry? | ||
return false, 0 | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure on this one but I think we do way to much computing overhead here.
I would add additional check for status changes at the top of the loop:
|
||
|
||
log.Debugf("discovered %d peers to expire for account %s", len(peerIDs), account.Id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Log message should also show wrong number of peers. Not newly expired but total expired in account |
||
|
||
if len(peerIDs) != 0 { | ||
// this will trigger peer disconnect from the management service | ||
am.peersUpdateManager.CloseChannels(peerIDs) | ||
err := am.updateAccountPeers(account) | ||
if err != nil { | ||
log.Errorf("failed updating account peers while expiring peers for account %s", accountID) | ||
return false, 0 | ||
} | ||
} | ||
|
||
nextExpiration, shouldRun := account.GetNextPeerExpiration() | ||
return shouldRun, nextExpiration | ||
} | ||
} | ||
|
||
func (am *DefaultAccountManager) checkAndSchedulePeerLoginExpiration(account *Account) { | ||
am.peerLoginExpiry.Cancel([]string{account.Id}) | ||
if expiryNextRunIn, ok := account.GetNextPeerExpiration(); ok { | ||
go am.peerLoginExpiry.Schedule(expiryNextRunIn, account.Id, am.peerLoginExpirationJob(account.Id)) | ||
} | ||
} | ||
|
||
// newAccount creates a new Account with a generated ID and generated default setup keys. | ||
// If ID is already in use (due to collision) we try one more time before returning error | ||
func (am *DefaultAccountManager) newAccount(userID, domain string) (*Account, error) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package server | ||
|
||
import ( | ||
log "github.com/sirupsen/logrus" | ||
"sync" | ||
"time" | ||
) | ||
|
||
// Scheduler is a generic structure that allows to schedule jobs (functions) to run in the future and cancel them. | ||
type Scheduler struct { | ||
// jobs map holds cancellation channels indexed by the job ID | ||
jobs map[string]chan struct{} | ||
mu *sync.Mutex | ||
} | ||
|
||
// NewScheduler creates an instance of a Scheduler | ||
func NewScheduler() *Scheduler { | ||
return &Scheduler{ | ||
jobs: make(map[string]chan struct{}), | ||
mu: &sync.Mutex{}, | ||
} | ||
} | ||
|
||
func (wm *Scheduler) cancel(ID string) bool { | ||
cancel, ok := wm.jobs[ID] | ||
if ok { | ||
delete(wm.jobs, ID) | ||
select { | ||
case cancel <- struct{}{}: | ||
log.Debugf("cancelled scheduled job %s", ID) | ||
default: | ||
log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID) | ||
} | ||
|
||
} | ||
return ok | ||
} | ||
|
||
// Cancel cancels the scheduled job by ID if present. | ||
// If job wasn't found the function returns false. | ||
func (wm *Scheduler) Cancel(IDs []string) { | ||
wm.mu.Lock() | ||
defer wm.mu.Unlock() | ||
|
||
for _, id := range IDs { | ||
wm.cancel(id) | ||
} | ||
} | ||
|
||
// Schedule a job to run in some time in the future. If job returns true then it will be scheduled one more time. | ||
func (wm *Scheduler) Schedule(in time.Duration, ID string, job func() (reschedule bool, nextRunIn time.Duration)) { | ||
wm.mu.Lock() | ||
defer wm.mu.Unlock() | ||
cancel := make(chan struct{}) | ||
if _, ok := wm.jobs[ID]; ok { | ||
log.Debugf("couldn't schedule a job %s because it already exists. There are %d total jobs scheduled.", | ||
ID, len(wm.jobs)) | ||
return | ||
} | ||
|
||
wm.jobs[ID] = cancel | ||
log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs)) | ||
go func() { | ||
select { | ||
case <-time.After(in): | ||
log.Debugf("time to do a scheduled job %s", ID) | ||
reschedule, runIn := job() | ||
wm.mu.Lock() | ||
defer wm.mu.Unlock() | ||
delete(wm.jobs, ID) | ||
if reschedule { | ||
go wm.Schedule(runIn, ID, job) | ||
} | ||
case <-cancel: | ||
log.Debugf("stopped scheduled job %s ", ID) | ||
wm.mu.Lock() | ||
defer wm.mu.Unlock() | ||
delete(wm.jobs, ID) | ||
return | ||
} | ||
}() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing a 't' in that