Skip to content

Commit

Permalink
Merge pull request #1982 from threefoldtech/return-gpu-contract
Browse files Browse the repository at this point in the history
  • Loading branch information
xmonader authored Jun 21, 2023
2 parents 9ae56bf + d6775c1 commit c1c2677
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 70 deletions.
7 changes: 4 additions & 3 deletions client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ type UsersCounters struct {

// GPU information
type GPU struct {
ID string `json:"id"`
Vendor string `json:"vendor"`
Device string `json:"device"`
ID string `json:"id"`
Vendor string `json:"vendor"`
Device string `json:"device"`
Contract uint64 `json:"contract"`
}

// Counters returns some node statistics. Including total and available cpu, memory, storage, etc...
Expand Down
109 changes: 109 additions & 0 deletions cmds/modules/provisiond/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package provisiond

import (
"context"
"encoding/json"

"github.com/pkg/errors"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go"
"github.com/threefoldtech/zbus"
"github.com/threefoldtech/zos/pkg/capacity"
"github.com/threefoldtech/zos/pkg/gridtypes/zos"
"github.com/threefoldtech/zos/pkg/primitives"
"github.com/threefoldtech/zos/pkg/provision"
"github.com/threefoldtech/zos/pkg/provision/mbus"
"github.com/threefoldtech/zos/pkg/stubs"
)

// setupApi attaches every provisiond API to the given rmb router: the
// statistics handlers, the "storage" and "gpu" subroutes and the deployment
// message bus. The only error it returns is a wrapped failure to create the
// statistics API.
func setupApi(router rmb.Router, cl zbus.Client, engine provision.Engine, store provision.Storage, statistics *primitives.Statistics) error {
	// the statistics module is attached first; it is the only step whose
	// error is checked here
	err := primitives.NewStatisticsMessageBus(router, statistics)
	if err != nil {
		return errors.Wrap(err, "failed to create statistics api")
	}

	setupStorageRmb(router, cl)
	setupGPURmb(router, store)

	// the value returned by the deployment message bus constructor is not
	// needed by this function
	_ = mbus.NewDeploymentMessageBus(router, engine)

	return nil
}

// setupStorageRmb registers the "storage" subroute on the router. Its "pools"
// handler returns whatever the storage module stub reports from Metrics.
func setupStorageRmb(router rmb.Router, cl zbus.Client) {
	// a new stub is created from the zbus client on every request
	poolsHandler := func(ctx context.Context, _ []byte) (interface{}, error) {
		return stubs.NewStorageModuleStub(cl).Metrics(ctx)
	}

	router.Subroute("storage").WithHandler("pools", poolsHandler)
}

// setupGPURmb registers the "gpu" subroute on the router. Its "list" handler
// returns one entry per device reported by capacity.ListPCI(capacity.GPU),
// annotated with the contract id of the deployment whose zmachine workload
// references that device (0 when no workload references it).
func setupGPURmb(router rmb.Router, store provision.Storage) {
	// Info is the JSON representation of a single GPU in the "list" response.
	type Info struct {
		ID       string `json:"id"`
		Vendor   string `json:"vendor"`
		Device   string `json:"device"`
		Contract uint64 `json:"contract"`
	}

	gpus := router.Subroute("gpu")

	// usedGpus builds a map from gpu id to the contract id of the deployment
	// that references it, based on the deployments returned by
	// store.Capacity(). It fails if the store cannot be read or if the data of
	// any zmachine workload cannot be decoded.
	usedGpus := func() (map[string]uint64, error) {
		active, err := store.Capacity()
		if err != nil {
			return nil, err
		}

		used := make(map[string]uint64)
		for _, dl := range active.Deployments {
			for _, wl := range dl.Workloads {
				// only zmachine workloads are inspected for gpus
				if wl.Type != zos.ZMachineType {
					continue
				}

				var vm zos.ZMachine
				if err := json.Unmarshal(wl.Data, &vm); err != nil {
					return nil, errors.Wrapf(err, "invalid workload data (%d.%s)", dl.ContractID, wl.Name)
				}

				for _, gpu := range vm.GPU {
					used[string(gpu)] = dl.ContractID
				}
			}
		}

		return used, nil
	}

	gpus.WithHandler("list", func(ctx context.Context, payload []byte) (interface{}, error) {
		devices, err := capacity.ListPCI(capacity.GPU)
		if err != nil {
			return nil, errors.Wrap(err, "failed to list available devices")
		}

		used, err := usedGpus()
		if err != nil {
			return nil, errors.Wrap(err, "failed to list used gpus")
		}

		// list intentionally starts as a nil slice so the encoded response for
		// a node without gpus stays the same as before.
		var list []Info
		for _, device := range devices {
			id := device.ShortID()
			info := Info{
				ID:     id,
				Vendor: "unknown",
				Device: "unknown",
				// a missing map entry yields 0, i.e. no contract
				Contract: used[id],
			}

			// replace the "unknown" placeholders when the device can be resolved
			if vendor, model, ok := device.GetDevice(); ok {
				info.Vendor = vendor.Name
				info.Device = model.Name
			}

			list = append(list, info)
		}

		return list, nil
	})
}
74 changes: 7 additions & 67 deletions cmds/modules/provisiond/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"time"

"github.com/cenkalti/backoff/v3"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/rusart/muxprom"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go"
"github.com/threefoldtech/zos/pkg"
Expand All @@ -24,7 +22,6 @@ import (
"github.com/threefoldtech/zos/pkg/gridtypes"
"github.com/threefoldtech/zos/pkg/gridtypes/zos"
"github.com/threefoldtech/zos/pkg/primitives"
"github.com/threefoldtech/zos/pkg/provision/mbus"
"github.com/threefoldtech/zos/pkg/provision/storage"
fsStorage "github.com/threefoldtech/zos/pkg/provision/storage.fs"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -193,14 +190,6 @@ func action(cli *cli.Context) error {
log.Error().Err(err).Msg("networkd is not ready yet")
})

router := mux.NewRouter().StrictSlash(true)

prom := muxprom.New(
muxprom.Router(router),
muxprom.Namespace("provision"),
)
prom.Instrument()

// the v1 endpoint will be used by all components to register endpoints
// that are specific for that component
//v1 := router.PathPrefix("/api/v1").Subrouter()
Expand All @@ -212,6 +201,7 @@ func action(cli *cli.Context) error {
return errors.Wrap(err, "failed to create local reservation store")
}
defer store.Close()

// we check if the old fs storage still exists
fsStoragePath := filepath.Join(rootDir, fsStorageDB)
if _, err := os.Stat(fsStoragePath); err == nil {
Expand Down Expand Up @@ -241,7 +231,6 @@ func action(cli *cli.Context) error {
return errors.Wrap(err, "failed to get node capacity")
}

var current gridtypes.Capacity
var active []gridtypes.Deployment
if !app.IsFirstBoot(serverName) {
// if this is the first boot of this module.
Expand All @@ -252,18 +241,16 @@ func action(cli *cli.Context) error {
// but if not, we need to set the current counters
// from store.
storageCap, err := store.Capacity()
current = storageCap.Cap
active = storageCap.Deployments
if err != nil {
log.Error().Err(err).Msg("failed to compute current consumed capacity")
}
}

if err := netResourceMigration(active); err != nil {
log.Error().Err(err).Msg("failed to migrate network resources")
if err := netResourceMigration(active); err != nil {
log.Error().Err(err).Msg("failed to migrate network resources")
}
}

log.Debug().Msgf("current used capacity: %+v", current)
// statistics collects information about workload statistics
// also does some checks on capacity
statistics := primitives.NewStatistics(
Expand Down Expand Up @@ -443,15 +430,10 @@ func action(cli *cli.Context) error {

zosRouter := mBus.Subroute("zos")
zosRouter.Use(rmb.LoggerMiddleware)
// attach statistics module to rmb
if err := primitives.NewStatisticsMessageBus(zosRouter, statistics); err != nil {
return errors.Wrap(err, "failed to create statistics api")
}

setupStorageRmb(zosRouter, cl)
setupGPURmb(zosRouter, cl)

_ = mbus.NewDeploymentMessageBus(zosRouter, engine)
if err := setupApi(zosRouter, cl, engine, store, statistics); err != nil {
return err
}

log.Info().Msg("running rmb handler")

Expand Down Expand Up @@ -484,45 +466,3 @@ func getNodeReserved(cl zbus.Client, available gridtypes.Capacity) primitives.Re
return
}
}

// setupStorageRmb adds a "storage" subroute to the router and exposes a
// "pools" handler on it that returns the result of the storage module stub's
// Metrics call.
func setupStorageRmb(router rmb.Router, cl zbus.Client) {
	storageRoute := router.Subroute("storage")

	// the stub is created from the zbus client for each incoming request
	handlePools := func(ctx context.Context, _ []byte) (interface{}, error) {
		storageStub := stubs.NewStorageModuleStub(cl)
		return storageStub.Metrics(ctx)
	}

	storageRoute.WithHandler("pools", handlePools)
}

// setupGPURmb adds a "gpu" subroute to the router and exposes a "list" handler
// on it. The handler returns the id, vendor name and device name of every
// device reported by capacity.ListPCI(capacity.GPU). The cl argument is not
// used by this function.
func setupGPURmb(router rmb.Router, cl zbus.Client) {
	// Info is the JSON representation of a single GPU in the "list" response.
	type Info struct {
		ID     string `json:"id"`
		Vendor string `json:"vendor"`
		Device string `json:"device"`
	}

	listHandler := func(ctx context.Context, _ []byte) (interface{}, error) {
		pciDevices, err := capacity.ListPCI(capacity.GPU)
		if err != nil {
			return nil, errors.Wrap(err, "failed to list available devices")
		}

		var infos []Info
		for _, pci := range pciDevices {
			// vendor and device stay "unknown" unless the device can be resolved
			entry := Info{
				ID:     pci.ShortID(),
				Vendor: "unknown",
				Device: "unknown",
			}

			if vendor, model, ok := pci.GetDevice(); ok {
				entry.Vendor = vendor.Name
				entry.Device = model.Name
			}

			infos = append(infos, entry)
		}

		return infos, nil
	}

	router.Subroute("gpu").WithHandler("list", listHandler)
}
2 changes: 2 additions & 0 deletions pkg/events/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ type RedisConsumer struct {
pool *redis.Pool
}

// NewConsumer creates a new event consumer on the given redis address with
// the given consumer id. The consumer id has to be unique.
func NewConsumer(address, id string) (*RedisConsumer, error) {
pool, err := utils.NewRedisPool(address)
if err != nil {
Expand Down

0 comments on commit c1c2677

Please sign in to comment.