Skip to content

Commit

Permalink
Network usage report (#1815)
Browse files Browse the repository at this point in the history
* WIP: create a link from workload-id to network-id

This will make it easy to find which network resource belongs to which
contract

* build metrics collection of public interface of an nr

Collect data on network resource for each NR (best effort)
and report to chain

* delete deadcode
  • Loading branch information
muhamadazmy authored Oct 24, 2022
1 parent d678e48 commit bb13921
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 88 deletions.
52 changes: 4 additions & 48 deletions cmds/modules/provisiond/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package provisiond
import (
"context"
"crypto/ed25519"
"fmt"
"os"
"path/filepath"
"sort"
"time"

"github.com/cenkalti/backoff/v3"
Expand Down Expand Up @@ -205,6 +203,10 @@ func action(cli *cli.Context) error {
}
}

if err := netResourceMigration(active); err != nil {
log.Error().Err(err).Msg("failed to migrate network resources")
}

log.Debug().Msgf("current used capacity: %+v", current)
// statistics collects information about workload statistics
// also does some checks on capacity
Expand Down Expand Up @@ -419,49 +421,3 @@ func getNodeReserved(cl zbus.Client, available gridtypes.Capacity) (counter prim

return
}

// storageMigration copies every deployment found in the filesystem store fs
// into the bolt database db, removing each one from fs once it has been
// migrated.
//
// Deployments of a twin are processed in ascending deployment-id order.
// A failure to list a twin's deployments, or to delete an already migrated
// deployment, is only logged. A failure to load or migrate a deployment is
// logged and makes the function return an error once all twins have been
// processed. An error from listing the twins is returned immediately.
func storageMigration(db *storage.BoltStorage, fs *fsStorage.Fs) error {
	log.Info().Msg("starting storage migration")

	twins, err := fs.Twins()
	if err != nil {
		return err
	}

	var (
		migrator = db.Migration()
		failed   bool
	)

	for _, twin := range twins {
		ids, err := fs.ByTwin(twin)
		if err != nil {
			log.Error().Err(err).Uint32("twin", twin).Msg("failed to list twin deployments")
			continue
		}

		// migrate in ascending deployment-id order
		sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })

		for _, id := range ids {
			log.Info().Uint32("twin", twin).Uint64("deployment", id).Msg("processing deployment migration")

			deployment, err := fs.Get(twin, id)
			if err != nil {
				log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", id).Msg("failed to get deployment")
				failed = true
				continue
			}

			if err := migrator.Migrate(deployment); err != nil {
				log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", id).Msg("failed to migrate deployment")
				failed = true
				continue
			}

			// the deployment is already migrated at this point, so a failed
			// cleanup is reported but does not mark the migration as failed.
			if err := fs.Delete(deployment); err != nil {
				log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", id).Msg("failed to delete migrated deployment")
			}
		}
	}

	if !failed {
		return nil
	}

	return fmt.Errorf("not all deployments where migrated")
}
132 changes: 132 additions & 0 deletions cmds/modules/provisiond/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package provisiond

import (
"fmt"
"os"
"path/filepath"
"sort"

"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/threefoldtech/zos/pkg/gridtypes"
"github.com/threefoldtech/zos/pkg/gridtypes/zos"
"github.com/threefoldtech/zos/pkg/provision/storage"
fsStorage "github.com/threefoldtech/zos/pkg/provision/storage.fs"
)

// storageMigration copies every deployment found in the filesystem store fs
// into the bolt database db, removing each one from fs once it has been
// migrated.
//
// Deployments of a twin are processed in ascending deployment-id order.
// A failure to list a twin's deployments, or to delete an already migrated
// deployment, is only logged. A failure to load or migrate a deployment is
// logged and makes the function return an error once all twins have been
// processed. An error from listing the twins is returned immediately.
func storageMigration(db *storage.BoltStorage, fs *fsStorage.Fs) error {
	log.Info().Msg("starting storage migration")

	twins, err := fs.Twins()
	if err != nil {
		return err
	}

	var (
		migrator = db.Migration()
		failed   bool
	)

	for _, twin := range twins {
		ids, err := fs.ByTwin(twin)
		if err != nil {
			log.Error().Err(err).Uint32("twin", twin).Msg("failed to list twin deployments")
			continue
		}

		// migrate in ascending deployment-id order
		sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })

		for _, id := range ids {
			log.Info().Uint32("twin", twin).Uint64("deployment", id).Msg("processing deployment migration")

			deployment, err := fs.Get(twin, id)
			if err != nil {
				log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", id).Msg("failed to get deployment")
				failed = true
				continue
			}

			if err := migrator.Migrate(deployment); err != nil {
				log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", id).Msg("failed to migrate deployment")
				failed = true
				continue
			}

			// the deployment is already migrated at this point, so a failed
			// cleanup is reported but does not mark the migration as failed.
			if err := fs.Delete(deployment); err != nil {
				log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", id).Msg("failed to delete migrated deployment")
			}
		}
	}

	if !failed {
		return nil
	}

	return fmt.Errorf("not all deployments where migrated")
}

// netResourceMigration creates, for every successfully deployed network
// workload in active, a symlink named after the full workload ID that points
// at the network file persisted by networkd.
//
// Background: net device names are limited (mainly in length), so devices are
// always named after a unique zos.NetworkID derived from the user twin and
// the network workload name. The drawback is that the system cannot map a
// network resource name back to a unique reservation: a bridge
// br-27xVrq9bva3vJ or a namespace n-27xVrq9bva3vJ does not tell which user
// owns it.
//
// networkd already stores the network object on disk (under
// /var/run/cache/networkd/networks), so two fixes were possible:
//
//   - Update all cached files on this volatile storage, bump the file version
//     and add the missing field. This requires changes in multiple places
//     (a new type, version tracking, and possibly support for several
//     versions of the structure).
//   - The "easier" approach used here: create a symlink from the workload ID
//     to the correct network file, so only entities that need to find a
//     network by its full workload name use the link to find the NR bridge
//     and namespace.
//
// networkd is modified to always create the symlink (if it does not exist
// already) for the NR file it persists, but it cannot do so for networks
// created earlier because it does not know their workload IDs. This migration
// therefore walks all active deployments on start and creates the missing
// symlinks.
//
// Problems with an individual workload are logged and skipped. An error is
// returned only when the networkd cache directory cannot be inspected or the
// link directory cannot be created.
func netResourceMigration(active []gridtypes.Deployment) error {
	const volatile = "/var/run/cache/networkd/networks"

	if _, err := os.Stat(volatile); err != nil {
		if os.IsNotExist(err) {
			// a missing cache most likely means this is the first start after
			// boot: networkd will be asked to create all NRs and will create
			// the symlinks itself, so there is nothing to do here.
			return nil
		}

		return errors.Wrap(err, "failed to check networkd volatile cache")
	}

	linkDir := filepath.Join(volatile, "link")
	if err := os.MkdirAll(linkDir, 0755); err != nil {
		return errors.Wrap(err, "failed to create network link directory")
	}

	for _, deployment := range active {
		for _, workload := range deployment.Workloads {
			if workload.Type != zos.NetworkType {
				continue
			}

			if !workload.Result.State.IsOkay() {
				continue
			}

			id, err := gridtypes.NewWorkloadID(deployment.TwinID, deployment.ContractID, workload.Name)
			if err != nil {
				log.Error().Err(err).Msg("failed to build network workload id")
				continue
			}

			netID := zos.NetworkID(deployment.TwinID, workload.Name)

			// skip networks that have no persisted file in the cache
			persisted := filepath.Join(volatile, netID.String())
			if _, err := os.Stat(persisted); os.IsNotExist(err) {
				continue
			}

			// the link lives in <volatile>/link, hence the relative "../<netID>" target
			target := filepath.Join("..", string(netID))
			link := filepath.Join(linkDir, string(id))

			err = os.Symlink(target, link)
			if err == nil || os.IsExist(err) {
				// created now, or already present from a previous run
				continue
			}

			log.Error().Err(err).Msgf("failed to create network symlink for %s -> ../%s", id, netID)
		}
	}

	return nil
}
37 changes: 32 additions & 5 deletions cmds/modules/provisiond/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,29 @@ func (r *Reporter) getVmMetrics(ctx context.Context, slot rrd.Slot) error {
return nil
}

// getNetworkMetrics asks the networker module for its metrics and records,
// for every returned entry, the computed Nu() value as a counter in slot
// under the entry's key.
//
// The call to the networker is bounded by a one minute timeout. The first
// counter that cannot be stored aborts the collection and its error is
// returned wrapped with the offending key.
func (r *Reporter) getNetworkMetrics(ctx context.Context, slot rrd.Slot) error {
	log.Debug().Msg("collecting networking metrics")

	callCtx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	networker := stubs.NewNetworkerStub(r.cl)
	metrics, err := networker.Metrics(callCtx)
	if err != nil {
		return err
	}

	for key, usage := range metrics {
		nu := usage.Nu()
		log.Debug().Str("network", key).Uint64("computed", uint64(nu)).Msgf("consumption: %+v", usage)

		err := slot.Counter(key, float64(nu))
		if err != nil {
			return errors.Wrapf(err, "failed to store metrics for '%s'", key)
		}
	}

	return nil
}

// getGwMetrics will collect network consumption every 5 min and store
// it in the rrd database.
func (r *Reporter) getGwMetrics(ctx context.Context, slot rrd.Slot) error {
Expand Down Expand Up @@ -205,8 +228,12 @@ func (r *Reporter) getMetrics(ctx context.Context) error {
return err
}

if err := r.getNetworkMetrics(ctx, slot); err != nil {
log.Error().Err(err).Msg("failed to get network resource consumption")
}

if err := r.getVmMetrics(ctx, slot); err != nil {
log.Error().Err(err).Msg("failed to get vm network consumption")
log.Error().Err(err).Msg("failed to get vm public ip consumption")
}

if err := r.getGwMetrics(ctx, slot); err != nil {
Expand Down Expand Up @@ -327,23 +354,23 @@ func (r *Reporter) report(ctx context.Context, since time.Time) (time.Time, erro
continue
}

_, deploment, _, err := gridtypes.WorkloadID(key).Parts()
_, deployment, _, err := gridtypes.WorkloadID(key).Parts()
if err != nil {
log.Error().Err(err).Msgf("failed to parse metric key '%s'", key)
continue
}

rep, ok := reports[deploment]
rep, ok := reports[deployment]
if !ok {
rep = substrate.NruConsumption{
ContractID: types.U64(deploment),
ContractID: types.U64(deployment),
Timestamp: types.U64(now.Unix()),
Window: types.U64(window / time.Second),
}
}

rep.NRU += types.U64(value)
reports[deploment] = rep
reports[deployment] = rep
}

var report Report
Expand Down
10 changes: 9 additions & 1 deletion pkg/gridtypes/zos/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ func NetworkID(twin uint32, network gridtypes.Name) NetID {
return NetID(string(b))
}

// NetworkIDFromWorkloadID derives the NetID of a network from its full
// workload ID. It splits wl into its parts and feeds the twin and the
// workload name to NetworkID; the remaining part is ignored.
// An error is returned when wl cannot be split into its parts.
func NetworkIDFromWorkloadID(wl gridtypes.WorkloadID) (NetID, error) {
	twinID, _, netName, err := wl.Parts()
	if err != nil {
		var none NetID
		return none, err
	}

	id := NetworkID(twinID, netName)
	return id, nil
}

// Network is the description of a part of a network local to a specific node.
// A network workload defines a wireguard network that is usually spans multiple nodes. One of the nodes must work as an access node
// in other words, it must be reachable from other nodes, hence it needs to have a `PublicConfig`.
Expand Down Expand Up @@ -155,7 +163,7 @@ func (p *Peer) Valid() error {
return nil
}

//Challenge for peer
// Challenge for peer
func (p Peer) Challenge(w io.Writer) error {
if _, err := fmt.Fprintf(w, "%s", p.WGPublicKey); err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions pkg/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,18 @@ func (e *ExitDevice) String() string {
return "unknown"
}

//Networker is the interface for the network module
// NetResourceMetrics maps a string key to the NetMetric collected for it.
// It is the value returned by Networker.Metrics.
// NOTE(review): keys are presumably workload IDs of network resources — confirm against the networkd implementation.
type NetResourceMetrics map[string]NetMetric

// Networker is the interface for the network module
type Networker interface {
// Ready return nil is networkd is ready to operate
// This function is used by other deamon to test if networkd is done booting
Ready() error

// Create a new network resource
CreateNR(Network) (string, error)
CreateNR(wl gridtypes.WorkloadID, network Network) (string, error)
// Delete a network resource
DeleteNR(Network) error
DeleteNR(wl gridtypes.WorkloadID) error

// Namespace returns the namespace name for given netid.
// it doesn't check if network exists.
Expand Down Expand Up @@ -202,6 +204,7 @@ type Networker interface {

SetPublicExitDevice(iface string) error

Metrics() (NetResourceMetrics, error)
// Monitoring methods

// ZOSAddresses monitoring streams for ZOS bridge IPs
Expand Down
Loading

0 comments on commit bb13921

Please sign in to comment.