diff --git a/fleetctl/fleetctl.go b/fleetctl/fleetctl.go index 9329d96d5..c588736ef 100644 --- a/fleetctl/fleetctl.go +++ b/fleetctl/fleetctl.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "net" "net/http" "net/url" @@ -1081,42 +1082,94 @@ func tryWaitForSystemdActiveState(units []string, maxAttempts int) (err error) { return nil } - for _, uName := range units { - err := waitForSystemdActiveState(uName) - if err != nil { - stderr("cannot find an active unit: %v", err) - return err - } + errchan := waitForSystemdActiveState(units, maxAttempts) + for err := range errchan { + stderr("Error waiting for units: %v", err) + return err } return nil } // waitForSystemdActiveState tries to assert that the given unit becomes -// active, retrying periodically until the unit gets active. It will retry -// up to 15 seconds. -func waitForSystemdActiveState(unitName string) (err error) { - return func() error { - timeout := 15 * time.Second - alarm := time.After(timeout) - ticker := time.Tick(250 * time.Millisecond) - for { - select { - case <-alarm: - return fmt.Errorf("Failed to reach expected state within %v.", timeout) - case <-ticker: - if errA := assertSystemdActiveState(unitName); errA == nil { - return nil - } - } - } +// active, making use of multiple goroutines that check unit states. +func waitForSystemdActiveState(units []string, maxAttempts int) (errch chan error) { + apiStates, err := cAPI.UnitStates() + if err != nil { + errch <- fmt.Errorf("Error retrieving list of units: %v", err) + return + } + + errchan := make(chan error) + var wg sync.WaitGroup + for _, name := range units { + wg.Add(1) + go checkSystemdActiveState(apiStates, name, maxAttempts, &wg, errchan) + } + + go func() { + wg.Wait() + close(errchan) }() + + return errchan +} + +func checkSystemdActiveState(apiStates []*schema.UnitState, name string, maxAttempts int, wg *sync.WaitGroup, errchan chan error) { + defer wg.Done() + + // "isInf == true" means "blocking forever until it succeeded". + // In that case, maxAttempts is set to an arbitrary large integer number. + var isInf bool + if maxAttempts < 1 { + isInf = true + maxAttempts = math.MaxInt32 + } + + for attempt := 0; attempt < maxAttempts; attempt++ { + if err := assertFetchSystemdActiveState(apiStates, name); err == nil { + return + } else { + errchan <- err + } + + if !isInf { + errchan <- fmt.Errorf("timed out waiting for unit %s to report active state", name) + } + } +} + +func assertFetchSystemdActiveState(apiStates []*schema.UnitState, name string) error { + if err := assertSystemdActiveState(apiStates, name); err == nil { + return nil + } + + // If the assertion failed, we again need to get unit states via cAPI, + // to retry the assertion repeatedly. + // + // NOTE: Ideally we should be able to fetch the state only for a single + // unit. However, we cannot do that for now, because cAPI.UnitState() + // is not available. In the future we need to implement cAPI.UnitState() + // and all dependendent parts all over the tree in fleet, (schema, + // etcdRegistry, rpcRegistry, etc) to replace UnitStates() in this place + // with the new method UnitState(). In practice, calling UnitStates() here + // is not as badly inefficient as it looks, because it will be anyway + // rarely called only when the assertion failed. - dpark 20160907 + + time.Sleep(defaultSleepTime) + + var errU error + apiStates, errU = cAPI.UnitStates() + if errU != nil { + return fmt.Errorf("Error retrieving list of units: %v", errU) + } + return nil } // assertSystemdActiveState determines if a given systemd unit is actually // in the active state, making use of cAPI -func assertSystemdActiveState(unitName string) (err error) { - uState, err := getSingleUnitState(unitName) +func assertSystemdActiveState(apiStates []*schema.UnitState, unitName string) error { + uState, err := getSingleUnitState(apiStates, unitName) if err != nil { return err } @@ -1131,12 +1184,7 @@ func assertSystemdActiveState(unitName string) (err error) { // getSingleUnitState returns a single uState of type suState, which consists // of necessary systemd states, only for a given unit name. -func getSingleUnitState(unitName string) (suState, error) { - apiStates, err := cAPI.UnitStates() - if err != nil { - return suState{}, fmt.Errorf("Error retrieving list of units: %v", err) - } - +func getSingleUnitState(apiStates []*schema.UnitState, unitName string) (suState, error) { for _, us := range apiStates { if us.Name == unitName { return suState{