Skip to content

Commit

Permalink
Fix counters on provisiond restart. also add counter for ipv4 (#1254)
Browse files Browse the repository at this point in the history
* Fix counters

* add docstr
  • Loading branch information
muhamadazmy authored Apr 27, 2021
1 parent 5a87122 commit ae9938b
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 68 deletions.
19 changes: 19 additions & 0 deletions cmds/modules/provisiond/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/threefoldtech/zos/pkg/app"
"github.com/threefoldtech/zos/pkg/capacity"
"github.com/threefoldtech/zos/pkg/environment"
"github.com/threefoldtech/zos/pkg/gridtypes"
"github.com/threefoldtech/zos/pkg/gridtypes/zos"
"github.com/threefoldtech/zos/pkg/primitives"
"github.com/threefoldtech/zos/pkg/provision/api"
Expand Down Expand Up @@ -151,10 +152,26 @@ func action(cli *cli.Context) error {
return errors.Wrap(err, "failed to get node capacity")
}

var current gridtypes.Capacity
if !app.IsFirstBoot(module) {
// On the first boot of this module the provision engine
// reruns all deployments, so the counters get populated
// anyway and there is no need to load the consumed
// capacity from the store.
// Otherwise (the module was restarted without a reboot)
// nothing is rerun, so the current counters must be
// initialised from the store.
current, err = store.Capacity()
if err != nil {
log.Error().Err(err).Msg("failed to compute current consumed capacity")
}
}

// statistics collects information about workload statistics
// also does some checks on capacity
statistics := primitives.NewStatistics(
cap,
current,
reserved,
nodeID.Identity(),
provisioners,
Expand Down Expand Up @@ -198,6 +215,8 @@ func action(cli *cli.Context) error {
zos.NetworkType,
zos.PublicIPType,
),
// if this is a node reboot, the node needs to
// recreate all reservations. so we set rerun = true
provision.WithRerunAll(app.IsFirstBoot(module)),
)

Expand Down
1 change: 0 additions & 1 deletion cmds/modules/provisiond/swagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type fsWithPrefix struct {

func (f *fsWithPrefix) Open(name string) (fs.File, error) {
newName := filepath.Join(f.prefix, name)
log.Debug().Str("file", name).Str("re-write-name", newName).Msg("requesting file")
file, err := f.FS.Open(newName)
if err != nil {
log.Error().Err(err).Msg("failed to open file")
Expand Down
45 changes: 17 additions & 28 deletions pkg/primitives/counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ func (c *AtomicValue) Current() uint64 {
type Counters struct {
//types map[gridtypes.WorkloadType]AtomicValue

SRU AtomicValue // SSD storage in bytes
HRU AtomicValue // HDD storage in bytes
MRU AtomicValue // Memory storage in bytes
CRU AtomicValue // CPU count absolute
SRU AtomicValue // SSD storage in bytes
HRU AtomicValue // HDD storage in bytes
MRU AtomicValue // Memory storage in bytes
CRU AtomicValue // CPU count absolute
IPv4 AtomicValue // IPv4 count absolute
}

const (
Expand All @@ -56,31 +57,19 @@ const (
)

// Increment is called by the provision.Engine when a reservation has been provisioned
func (c *Counters) Increment(r *gridtypes.Workload) error {
u, err := r.Capacity()
if err != nil {
return err
}

c.CRU.Increment(u.CRU)
c.MRU.Increment(u.MRU)
c.SRU.Increment(u.SRU)
c.HRU.Increment(u.HRU)

return nil
func (c *Counters) Increment(cap gridtypes.Capacity) {
c.CRU.Increment(cap.CRU)
c.MRU.Increment(cap.MRU)
c.SRU.Increment(cap.SRU)
c.HRU.Increment(cap.HRU)
c.IPv4.Increment(cap.IPV4U)
}

// Decrement is called by the provision.Engine when a reservation has been decommissioned
func (c *Counters) Decrement(r *gridtypes.Workload) error {
u, err := r.Capacity()
if err != nil {
return err
}

c.CRU.Decrement(u.CRU)
c.MRU.Decrement(u.MRU)
c.SRU.Decrement(u.SRU)
c.HRU.Decrement(u.HRU)

return nil
func (c *Counters) Decrement(cap gridtypes.Capacity) {
c.CRU.Decrement(cap.CRU)
c.MRU.Decrement(cap.MRU)
c.SRU.Decrement(cap.SRU)
c.HRU.Decrement(cap.HRU)
c.IPv4.Decrement(cap.IPV4U)
}
62 changes: 24 additions & 38 deletions pkg/primitives/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,34 @@ type Statistics struct {

// NewStatistics creates a new statistics provisioner interceptor.
// The Statistics provisioner keeps track of used capacity and updates the explorer when it changes
func NewStatistics(total gridtypes.Capacity, reserved Counters, nodeID string, inner provision.Provisioner) *Statistics {
func NewStatistics(total, initial gridtypes.Capacity, reserved Counters, nodeID string, inner provision.Provisioner) *Statistics {
vm, err := mem.VirtualMemory()
if err != nil {
panic(err)
}

log.Debug().Msgf("initial used capacity %+v", initial)
var counters Counters
counters.Increment(initial)
ram := math.Ceil(float64(vm.Total) / (1024 * 1024 * 1024))
return &Statistics{inner: inner, total: total, reserved: reserved, nodeID: nodeID, mem: uint64(ram)}
return &Statistics{
inner: inner,
total: total,
counters: counters,
reserved: reserved,
mem: uint64(ram),
nodeID: nodeID,
}
}

// Current returns the current used capacity
func (s *Statistics) Current() gridtypes.Capacity {
return gridtypes.Capacity{
CRU: s.counters.CRU.Current() + s.reserved.CRU.Current(),
MRU: s.counters.MRU.Current() + s.reserved.MRU.Current(),
HRU: s.counters.HRU.Current() + s.reserved.HRU.Current(),
SRU: s.counters.SRU.Current() + s.reserved.SRU.Current(),
//IPV4U: s.counters.CRU.Current() + s.reserved.SRU.Current(),
CRU: s.counters.CRU.Current() + s.reserved.CRU.Current(),
MRU: s.counters.MRU.Current() + s.reserved.MRU.Current(),
HRU: s.counters.HRU.Current() + s.reserved.HRU.Current(),
SRU: s.counters.SRU.Current() + s.reserved.SRU.Current(),
IPV4U: s.counters.IPv4.Current(),
}
}

Expand Down Expand Up @@ -100,9 +110,8 @@ func (s *Statistics) Provision(ctx context.Context, wl *gridtypes.WorkloadWithID
}

if result.State == gridtypes.StateOk {
if err := s.counters.Increment(wl.Workload); err != nil {
log.Error().Err(err).Msg("failed to increment statistics counter")
}
log.Debug().Str("type", wl.Type.String()).Str("id", wl.ID.String()).Msgf("incrmenting capacity +%+v", needed)
s.counters.Increment(needed)
}

return result, nil
Expand All @@ -113,11 +122,14 @@ func (s *Statistics) Decommission(ctx context.Context, wl *gridtypes.WorkloadWit
if err := s.inner.Decommission(ctx, wl); err != nil {
return err
}

if err := s.counters.Decrement(wl.Workload); err != nil {
cap, err := wl.Capacity()
if err != nil {
log.Error().Err(err).Msg("failed to decrement statistics counter")
return nil
}

s.counters.Decrement(cap)

return nil
}

Expand Down Expand Up @@ -156,29 +168,3 @@ func (s *statisticsAPI) getCounters(r *http.Request) (interface{}, mw.Response)
Used: s.stats.Current(),
}, nil
}

// func (s *statsProvisioner) shouldUpdateCounters(ctx context.Context, wl *gridtypes.Workload) (bool, error) {
// // rule, we always should update counters UNLESS it is a network reservation that
// // already have been counted before.
// if wl.Type != zos.NetworkType {
// return true, nil
// }

// var nr zos.Network
// if err := json.Unmarshal(wl.Data, &nr); err != nil {
// return false, fmt.Errorf("failed to unmarshal network from reservation: %w", err)
// }
// // otherwise we check the cache if a network
// // with the same id already exists
// id := zos.NetworkID(wl.User.String(), nr.Name)
// cache := provision.GetEngine(ctx).Storage()

// _, err := cache.GetNetwork(id)
// if errors.Is(err, provision.ErrWorkloadNotExists) {
// return true, nil
// } else if err != nil {
// return false, err
// }

// return false, nil
// }
58 changes: 57 additions & 1 deletion pkg/provision/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func (s *Fs) Get(twin, deployment uint32) (gridtypes.Deployment, error) {

// ByTwin returns the list of deployments for the given twin id.
// It read-locks s.m for the duration of the call and delegates the
// actual lookup to the unexported byTwin, which does no locking of its
// own in the part visible here and can therefore be called by methods
// that already hold the lock.
func (s *Fs) ByTwin(twin uint32) ([]uint32, error) {
s.m.RLock()
defer s.m.RUnlock()
return s.byTwin(twin)
}

func (s *Fs) byTwin(twin uint32) ([]uint32, error) {
base := filepath.Join(s.root, fmt.Sprint(twin))

entities, err := ioutil.ReadDir(base)
Expand Down Expand Up @@ -183,6 +189,13 @@ func (s *Fs) ByTwin(twin uint32) ([]uint32, error) {

// Twins lists available users (twins).
// It read-locks s.m for the duration of the call and delegates the
// actual directory scan to the unexported twins, so that methods which
// already hold the lock can call twins directly.
func (s *Fs) Twins() ([]uint32, error) {
s.m.RLock()
defer s.m.RUnlock()

return s.twins()
}

func (s *Fs) twins() ([]uint32, error) {
entities, err := ioutil.ReadDir(s.root)
if os.IsNotExist(err) {
return nil, nil
Expand All @@ -197,7 +210,8 @@ func (s *Fs) Twins() ([]uint32, error) {

id, err := strconv.ParseUint(entry.Name(), 10, 32)
if err != nil {
log.Error().Str("name", entry.Name()).Err(err).Msg("invalid twin id directory")
log.Error().Str("name", entry.Name()).Err(err).Msg("invalid twin id directory, removing")
os.RemoveAll(filepath.Join(s.root, entry.Name()))
continue
}

Expand All @@ -207,6 +221,48 @@ func (s *Fs) Twins() ([]uint32, error) {
return ids, nil
}

// Capacity sums the capacity of every workload whose result state is
// gridtypes.StateOk, across all deployments of all twins in the store.
//
// s.m is read-locked for the whole walk, which is why the unexported
// twins/byTwin/get helpers are used instead of their exported
// counterparts. The walk stops at the first failure (listing twins,
// listing a twin's deployments, loading a deployment, or computing a
// workload's capacity) and returns that error together with whatever
// had been accumulated up to that point.
func (s *Fs) Capacity() (cap gridtypes.Capacity, err error) {
	s.m.RLock()
	defer s.m.RUnlock()

	users, err := s.twins()
	if err != nil {
		return cap, err
	}

	for _, user := range users {
		deployments, err := s.byTwin(user)
		if err != nil {
			return cap, err
		}

		for _, deploymentID := range deployments {
			location := s.rooted(fmt.Sprint(user), fmt.Sprint(deploymentID))
			dl, err := s.get(location)
			if err != nil {
				return cap, err
			}

			for _, workload := range dl.Workloads {
				// only successfully provisioned workloads consume capacity
				if workload.Result.State == gridtypes.StateOk {
					used, err := workload.Capacity()
					if err != nil {
						return cap, err
					}

					cap.Add(&used)
				}
			}
		}
	}

	return cap, nil
}

// Close makes sure the backend of the store is closed properly
func (s *Fs) Close() error {
return nil
Expand Down

0 comments on commit ae9938b

Please sign in to comment.