Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
fleetctl: call cAPI.UnitStates() only once before checking unit states
Browse files Browse the repository at this point in the history
As it's basically not efficient to call cAPI.UnitStates() for each unit,
let's call it only once outside goroutines for processing units, i.e.
calling in waitForSystemdActiveState(), not in getSingleUnitState().
To do that, we need to share the result apiStates across all units.

Also make use of goroutines in waitForSystemdActiveState(), instead of
sequential iteration over each unit.

Note that we still need to keep calling UnitStates() when
assertSystemdActiveState() fails, because by the next attempt the old
apiStates are not necessarily valid anymore. Ideally, in that case we
should fetch the state only for a single unit. However, we
cannot do that for now, because cAPI.UnitState() is not available. In
the future we would need to implement cAPI.UnitState() and all
dependent parts all over the tree in fleet, e.g. schema, etcdRegistry,
rpcRegistry, etc., to replace UnitStates() in this place with the new
method UnitState(). In practice, calling UnitStates() here is not as
inefficient as it looks, because it is called only rarely, namely
when the assertion fails.
  • Loading branch information
Dongsu Park committed Sep 19, 2016
1 parent cc2837f commit 413f6e3
Showing 1 changed file with 79 additions and 31 deletions.
110 changes: 79 additions & 31 deletions fleetctl/fleetctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -1081,42 +1082,94 @@ func tryWaitForSystemdActiveState(units []string, maxAttempts int) (err error) {
return nil
}

for _, uName := range units {
err := waitForSystemdActiveState(uName)
if err != nil {
stderr("cannot find an active unit: %v", err)
return err
}
errchan := waitForSystemdActiveState(units, maxAttempts)
for err := range errchan {
stderr("Error waiting for units: %v", err)
return err
}

return nil
}

// waitForSystemdActiveState tries to assert that the given unit becomes
// active, retrying periodically until the unit gets active. It will retry
// up to 15 seconds.
func waitForSystemdActiveState(unitName string) (err error) {
return func() error {
timeout := 15 * time.Second
alarm := time.After(timeout)
ticker := time.Tick(250 * time.Millisecond)
for {
select {
case <-alarm:
return fmt.Errorf("Failed to reach expected state within %v.", timeout)
case <-ticker:
if errA := assertSystemdActiveState(unitName); errA == nil {
return nil
}
}
}
// active, making use of multiple goroutines that check unit states.
func waitForSystemdActiveState(units []string, maxAttempts int) (errch chan error) {
	// Fetch the unit states once up front; this snapshot is shared by every
	// per-unit checker goroutine started below.
	apiStates, err := cAPI.UnitStates()
	if err != nil {
		// The named result is still nil here, and a send on a nil channel
		// blocks forever. Hand the caller a buffered, already-closed channel
		// carrying the error instead, so that ranging over it yields the
		// error once and then terminates.
		errch = make(chan error, 1)
		errch <- fmt.Errorf("Error retrieving list of units: %v", err)
		close(errch)
		return errch
	}

	errch = make(chan error)
	var wg sync.WaitGroup
	for _, name := range units {
		wg.Add(1)
		go checkSystemdActiveState(apiStates, name, maxAttempts, &wg, errch)
	}

	// Close the channel once all checkers have finished, so that a receiver
	// ranging over it terminates.
	go func() {
		wg.Wait()
		close(errch)
	}()

	return errch
}

// checkSystemdActiveState waits for the unit called name to be reported as
// active, calling assertFetchSystemdActiveState up to maxAttempts times.
// A maxAttempts below 1 means "keep retrying until it succeeds". wg is
// signalled when the function returns. On success nothing is sent to
// errchan; if a finite number of attempts is exhausted, exactly one
// "timed out" error carrying the last failure is sent.
func checkSystemdActiveState(apiStates []*schema.UnitState, name string, maxAttempts int, wg *sync.WaitGroup, errchan chan error) {
	defer wg.Done()

	// "isInf == true" means "blocking forever until it succeeded".
	// In that case, maxAttempts is set to an arbitrary large integer number.
	var isInf bool
	if maxAttempts < 1 {
		isInf = true
		maxAttempts = math.MaxInt32
	}

	// Failed attempts are retried silently: reporting each intermediate
	// failure (or the timeout) from inside the loop would make a receiver
	// that stops at the first error give up before any retry happened.
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		lastErr = assertFetchSystemdActiveState(apiStates, name)
		if lastErr == nil {
			return
		}
	}

	// Only a bounded wait can time out.
	if !isInf {
		errchan <- fmt.Errorf("timed out waiting for unit %s to report active state: %v", name, lastErr)
	}
}

// assertFetchSystemdActiveState checks whether the unit called name is active
// according to apiStates. If it is not, it sleeps for defaultSleepTime,
// fetches a fresh list of unit states via cAPI and checks once more. It
// returns nil as soon as one of the two checks succeeds; otherwise it returns
// the error of the refetch or of the second check.
func assertFetchSystemdActiveState(apiStates []*schema.UnitState, name string) error {
	if err := assertSystemdActiveState(apiStates, name); err == nil {
		return nil
	}

	// If the assertion failed, we again need to get unit states via cAPI,
	// to retry the assertion repeatedly.
	//
	// NOTE: Ideally we should be able to fetch the state only for a single
	// unit. However, we cannot do that for now, because cAPI.UnitState()
	// is not available. In the future we need to implement cAPI.UnitState()
	// and all dependent parts all over the tree in fleet, (schema,
	// etcdRegistry, rpcRegistry, etc) to replace UnitStates() in this place
	// with the new method UnitState(). In practice, calling UnitStates() here
	// is not as badly inefficient as it looks, because it will be anyway
	// rarely called only when the assertion failed. - dpark 20160907

	time.Sleep(defaultSleepTime)

	freshStates, errU := cAPI.UnitStates()
	if errU != nil {
		return fmt.Errorf("Error retrieving list of units: %v", errU)
	}

	// Re-run the assertion on the fresh states. Returning nil here
	// unconditionally would report a still-inactive unit as active, and the
	// refreshed states would be thrown away unused.
	return assertSystemdActiveState(freshStates, name)
}

// assertSystemdActiveState determines if a given systemd unit is actually
// in the active state, making use of cAPI
func assertSystemdActiveState(unitName string) (err error) {
uState, err := getSingleUnitState(unitName)
func assertSystemdActiveState(apiStates []*schema.UnitState, unitName string) error {
uState, err := getSingleUnitState(apiStates, unitName)
if err != nil {
return err
}
Expand All @@ -1131,12 +1184,7 @@ func assertSystemdActiveState(unitName string) (err error) {

// getSingleUnitState returns a single uState of type suState, which consists
// of necessary systemd states, only for a given unit name.
func getSingleUnitState(unitName string) (suState, error) {
apiStates, err := cAPI.UnitStates()
if err != nil {
return suState{}, fmt.Errorf("Error retrieving list of units: %v", err)
}

func getSingleUnitState(apiStates []*schema.UnitState, unitName string) (suState, error) {
for _, us := range apiStates {
if us.Name == unitName {
return suState{
Expand Down

0 comments on commit 413f6e3

Please sign in to comment.