From 413f6e336b94750556a642798f666080673a35a1 Mon Sep 17 00:00:00 2001 From: Dongsu Park Date: Mon, 19 Sep 2016 14:19:53 +0200 Subject: [PATCH] fleetctl: call cAPI.UnitStates() only once before checking unit states As it's basically not efficient to call cAPI.UnitStates() for each unit, let's call it only once outside goroutines for processing units, i.e. calling in waitForSystemdActiveState(), not in getSingleUnitState(). To do that, we need to share the result apiStates across all units. Also make use of goroutines in waitForSystemdActiveState(), instead of sequential iteration over each unit. Note that we still need to keep calling UnitStates() when assertSystemdActiveState() failed, because in the next attempt the old apiState is not necessarily going to be valid. Ideally in that case, we should be able to fetch the state only for a single unit. However, we cannot do that for now, because cAPI.UnitState() is not available. In the future we would need to implement cAPI.UnitState() and all dependendent parts all over the tree in fleet, e.g. schema, etcdRegistry, rpcRegistry, etc. to replace UnitStates() in this place with the new method UnitState(). In practice, calling UnitStates() here is not as badly inefficient as it looks, because it will be anyway rarely called only when the assertion failed. --- fleetctl/fleetctl.go | 110 +++++++++++++++++++++++++++++++------------ 1 file changed, 79 insertions(+), 31 deletions(-) diff --git a/fleetctl/fleetctl.go b/fleetctl/fleetctl.go index 9329d96d5..c588736ef 100644 --- a/fleetctl/fleetctl.go +++ b/fleetctl/fleetctl.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "net" "net/http" "net/url" @@ -1081,42 +1082,94 @@ func tryWaitForSystemdActiveState(units []string, maxAttempts int) (err error) { return nil } - for _, uName := range units { - err := waitForSystemdActiveState(uName) - if err != nil { - stderr("cannot find an active unit: %v", err) - return err - } + errchan := waitForSystemdActiveState(units, maxAttempts) + for err := range errchan { + stderr("Error waiting for units: %v", err) + return err } return nil } // waitForSystemdActiveState tries to assert that the given unit becomes -// active, retrying periodically until the unit gets active. It will retry -// up to 15 seconds. -func waitForSystemdActiveState(unitName string) (err error) { - return func() error { - timeout := 15 * time.Second - alarm := time.After(timeout) - ticker := time.Tick(250 * time.Millisecond) - for { - select { - case <-alarm: - return fmt.Errorf("Failed to reach expected state within %v.", timeout) - case <-ticker: - if errA := assertSystemdActiveState(unitName); errA == nil { - return nil - } - } - } +// active, making use of multiple goroutines that check unit states. +func waitForSystemdActiveState(units []string, maxAttempts int) (errch chan error) { + apiStates, err := cAPI.UnitStates() + if err != nil { + errch <- fmt.Errorf("Error retrieving list of units: %v", err) + return + } + + errchan := make(chan error) + var wg sync.WaitGroup + for _, name := range units { + wg.Add(1) + go checkSystemdActiveState(apiStates, name, maxAttempts, &wg, errchan) + } + + go func() { + wg.Wait() + close(errchan) }() + + return errchan +} + +func checkSystemdActiveState(apiStates []*schema.UnitState, name string, maxAttempts int, wg *sync.WaitGroup, errchan chan error) { + defer wg.Done() + + // "isInf == true" means "blocking forever until it succeeded". + // In that case, maxAttempts is set to an arbitrary large integer number. + var isInf bool + if maxAttempts < 1 { + isInf = true + maxAttempts = math.MaxInt32 + } + + for attempt := 0; attempt < maxAttempts; attempt++ { + if err := assertFetchSystemdActiveState(apiStates, name); err == nil { + return + } else { + errchan <- err + } + + if !isInf { + errchan <- fmt.Errorf("timed out waiting for unit %s to report active state", name) + } + } +} + +func assertFetchSystemdActiveState(apiStates []*schema.UnitState, name string) error { + if err := assertSystemdActiveState(apiStates, name); err == nil { + return nil + } + + // If the assertion failed, we again need to get unit states via cAPI, + // to retry the assertion repeatedly. + // + // NOTE: Ideally we should be able to fetch the state only for a single + // unit. However, we cannot do that for now, because cAPI.UnitState() + // is not available. In the future we need to implement cAPI.UnitState() + // and all dependendent parts all over the tree in fleet, (schema, + // etcdRegistry, rpcRegistry, etc) to replace UnitStates() in this place + // with the new method UnitState(). In practice, calling UnitStates() here + // is not as badly inefficient as it looks, because it will be anyway + // rarely called only when the assertion failed. - dpark 20160907 + + time.Sleep(defaultSleepTime) + + var errU error + apiStates, errU = cAPI.UnitStates() + if errU != nil { + return fmt.Errorf("Error retrieving list of units: %v", errU) + } + return nil } // assertSystemdActiveState determines if a given systemd unit is actually // in the active state, making use of cAPI -func assertSystemdActiveState(unitName string) (err error) { - uState, err := getSingleUnitState(unitName) +func assertSystemdActiveState(apiStates []*schema.UnitState, unitName string) error { + uState, err := getSingleUnitState(apiStates, unitName) if err != nil { return err } @@ -1131,12 +1184,7 @@ func assertSystemdActiveState(unitName string) (err error) { // getSingleUnitState returns a single uState of type suState, which consists // of necessary systemd states, only for a given unit name. -func getSingleUnitState(unitName string) (suState, error) { - apiStates, err := cAPI.UnitStates() - if err != nil { - return suState{}, fmt.Errorf("Error retrieving list of units: %v", err) - } - +func getSingleUnitState(apiStates []*schema.UnitState, unitName string) (suState, error) { for _, us := range apiStates { if us.Name == unitName { return suState{