Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

return contract number if gpu is used #1982

Merged
merged 1 commit into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ type UsersCounters struct {

// GPU describes a single GPU entry as reported by a node.
type GPU struct {
	// ID is the identifier of the GPU device.
	ID string `json:"id"`
	// Vendor is the vendor name of the GPU.
	Vendor string `json:"vendor"`
	// Device is the device name of the GPU.
	Device string `json:"device"`
	// Contract is the contract number that uses this GPU.
	// NOTE(review): presumably 0 when the GPU is not in use — confirm
	// against the node-side handler.
	Contract uint64 `json:"contract"`
}

// Counters returns some node statistics, including total and available cpu, memory, storage, etc.
Expand Down
109 changes: 109 additions & 0 deletions cmds/modules/provisiond/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package provisiond

import (
"context"
"encoding/json"

"github.com/pkg/errors"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go"
"github.com/threefoldtech/zbus"
"github.com/threefoldtech/zos/pkg/capacity"
"github.com/threefoldtech/zos/pkg/gridtypes/zos"
"github.com/threefoldtech/zos/pkg/primitives"
"github.com/threefoldtech/zos/pkg/provision"
"github.com/threefoldtech/zos/pkg/provision/mbus"
"github.com/threefoldtech/zos/pkg/stubs"
)

// setupApi builds all api handlers and attaches them to the rmb router.
//
// It registers, in order: the statistics api, the storage routes, the gpu
// routes and the deployment message bus. The only error it returns is the one
// reported while creating the statistics api.
func setupApi(router rmb.Router, cl zbus.Client, engine provision.Engine, store provision.Storage, statistics *primitives.Statistics) error {
	// attach statistics module to rmb
	err := primitives.NewStatisticsMessageBus(router, statistics)
	if err != nil {
		return errors.Wrap(err, "failed to create statistics api")
	}

	// these two helpers return nothing, so there is nothing to check
	setupStorageRmb(router, cl)
	setupGPURmb(router, store)

	// the value returned by the deployment message bus constructor is
	// deliberately discarded here
	_ = mbus.NewDeploymentMessageBus(router, engine)

	return nil
}

// setupStorageRmb registers the "storage" subroute on router with a single
// "pools" handler that returns the metrics reported by the storage module
// stub built on top of cl.
func setupStorageRmb(router rmb.Router, cl zbus.Client) {
	// a fresh stub is created on every call, exactly as the request comes in
	poolsHandler := func(ctx context.Context, payload []byte) (interface{}, error) {
		storageStub := stubs.NewStorageModuleStub(cl)
		return storageStub.Metrics(ctx)
	}

	storageRoute := router.Subroute("storage")
	storageRoute.WithHandler("pools", poolsHandler)
}

// setupGPURmb registers the "gpu" subroute on router and attaches its "list"
// handler.
//
// The "list" handler returns one entry per GPU found by capacity.ListPCI.
// Each entry carries the contract ID of the deployment that references the
// GPU, looked up from the deployments returned by store.Capacity().
func setupGPURmb(router rmb.Router, store provision.Storage) {
	// Info is the JSON shape of a single entry in the "list" reply.
	type Info struct {
		ID       string `json:"id"`
		Vendor   string `json:"vendor"`
		Device   string `json:"device"`
		Contract uint64 `json:"contract"`
	}

	// usedGpus builds a map from GPU ID to the contract ID of the deployment
	// whose zmachine workload references that GPU. It fails if the store
	// cannot be read or if a zmachine workload carries undecodable data.
	usedGpus := func() (map[string]uint64, error) {
		active, err := store.Capacity()
		if err != nil {
			return nil, err
		}

		used := make(map[string]uint64)
		for _, dl := range active.Deployments {
			for _, wl := range dl.Workloads {
				// only zmachine workloads can reference GPUs
				if wl.Type != zos.ZMachineType {
					continue
				}

				var vm zos.ZMachine
				if err := json.Unmarshal(wl.Data, &vm); err != nil {
					return nil, errors.Wrapf(err, "invalid workload data (%d.%s)", dl.ContractID, wl.Name)
				}

				for _, gpu := range vm.GPU {
					used[string(gpu)] = dl.ContractID
				}
			}
		}

		return used, nil
	}

	gpuRoute := router.Subroute("gpu")
	gpuRoute.WithHandler("list", func(ctx context.Context, payload []byte) (interface{}, error) {
		devices, err := capacity.ListPCI(capacity.GPU)
		if err != nil {
			return nil, errors.Wrap(err, "failed to list available devices")
		}

		used, err := usedGpus()
		if err != nil {
			return nil, errors.Wrap(err, "failed to list used gpus")
		}

		var list []Info
		for _, pci := range devices {
			id := pci.ShortID()
			info := Info{
				ID:     id,
				Vendor: "unknown",
				Device: "unknown",
				// a missing key yields the zero value, so GPUs that no
				// deployment references report contract 0
				Contract: used[id],
			}

			// keep the "unknown" placeholders unless the lookup succeeds
			if vendor, dev, ok := pci.GetDevice(); ok {
				info.Vendor = vendor.Name
				info.Device = dev.Name
			}

			list = append(list, info)
		}

		return list, nil
	})
}
74 changes: 7 additions & 67 deletions cmds/modules/provisiond/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"time"

"github.com/cenkalti/backoff/v3"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/rusart/muxprom"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go"
"github.com/threefoldtech/zos/pkg"
Expand All @@ -24,7 +22,6 @@ import (
"github.com/threefoldtech/zos/pkg/gridtypes"
"github.com/threefoldtech/zos/pkg/gridtypes/zos"
"github.com/threefoldtech/zos/pkg/primitives"
"github.com/threefoldtech/zos/pkg/provision/mbus"
"github.com/threefoldtech/zos/pkg/provision/storage"
fsStorage "github.com/threefoldtech/zos/pkg/provision/storage.fs"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -193,14 +190,6 @@ func action(cli *cli.Context) error {
log.Error().Err(err).Msg("networkd is not ready yet")
})

router := mux.NewRouter().StrictSlash(true)

prom := muxprom.New(
muxprom.Router(router),
muxprom.Namespace("provision"),
)
prom.Instrument()

// the v1 endpoint will be used by all components to register endpoints
// that are specific for that component
//v1 := router.PathPrefix("/api/v1").Subrouter()
Expand All @@ -212,6 +201,7 @@ func action(cli *cli.Context) error {
return errors.Wrap(err, "failed to create local reservation store")
}
defer store.Close()

// we check if the old fs storage still exists
fsStoragePath := filepath.Join(rootDir, fsStorageDB)
if _, err := os.Stat(fsStoragePath); err == nil {
Expand Down Expand Up @@ -241,7 +231,6 @@ func action(cli *cli.Context) error {
return errors.Wrap(err, "failed to get node capacity")
}

var current gridtypes.Capacity
var active []gridtypes.Deployment
if !app.IsFirstBoot(serverName) {
// if this is the first boot of this module.
Expand All @@ -252,18 +241,16 @@ func action(cli *cli.Context) error {
// but if not, we need to set the current counters
// from store.
storageCap, err := store.Capacity()
current = storageCap.Cap
active = storageCap.Deployments
if err != nil {
log.Error().Err(err).Msg("failed to compute current consumed capacity")
}
}

if err := netResourceMigration(active); err != nil {
log.Error().Err(err).Msg("failed to migrate network resources")
if err := netResourceMigration(active); err != nil {
log.Error().Err(err).Msg("failed to migrate network resources")
}
}

log.Debug().Msgf("current used capacity: %+v", current)
// statistics collects information about workload statistics
// also does some checks on capacity
statistics := primitives.NewStatistics(
Expand Down Expand Up @@ -443,15 +430,10 @@ func action(cli *cli.Context) error {

zosRouter := mBus.Subroute("zos")
zosRouter.Use(rmb.LoggerMiddleware)
// attach statistics module to rmb
if err := primitives.NewStatisticsMessageBus(zosRouter, statistics); err != nil {
return errors.Wrap(err, "failed to create statistics api")
}

setupStorageRmb(zosRouter, cl)
setupGPURmb(zosRouter, cl)

_ = mbus.NewDeploymentMessageBus(zosRouter, engine)
if err := setupApi(zosRouter, cl, engine, store, statistics); err != nil {
return err
}

log.Info().Msg("running rmb handler")

Expand Down Expand Up @@ -484,45 +466,3 @@ func getNodeReserved(cl zbus.Client, available gridtypes.Capacity) primitives.Re
return
}
}

// setupStorageRmb attaches the "storage" subroute to router. Its only
// handler, "pools", answers with the metrics returned by a storage module
// stub created from cl.
func setupStorageRmb(router rmb.Router, cl zbus.Client) {
	storageRoute := router.Subroute("storage")

	// the stub is built per request rather than once up front
	pools := func(ctx context.Context, payload []byte) (interface{}, error) {
		storageStub := stubs.NewStorageModuleStub(cl)
		return storageStub.Metrics(ctx)
	}
	storageRoute.WithHandler("pools", pools)
}

// setupGPURmb attaches the "gpu" subroute to router. Its only handler,
// "list", returns one entry per GPU found by capacity.ListPCI. The cl
// parameter is not used by this function.
func setupGPURmb(router rmb.Router, cl zbus.Client) {
	// Info is the JSON shape of a single entry in the "list" reply.
	type Info struct {
		ID     string `json:"id"`
		Vendor string `json:"vendor"`
		Device string `json:"device"`
	}

	gpuRoute := router.Subroute("gpu")

	listHandler := func(ctx context.Context, payload []byte) (interface{}, error) {
		pciDevices, err := capacity.ListPCI(capacity.GPU)
		if err != nil {
			return nil, errors.Wrap(err, "failed to list available devices")
		}

		var infos []Info
		for _, pci := range pciDevices {
			entry := Info{
				ID:     pci.ShortID(),
				Vendor: "unknown",
				Device: "unknown",
			}

			// the placeholders are only replaced when the lookup succeeds
			if vendor, dev, ok := pci.GetDevice(); ok {
				entry.Vendor = vendor.Name
				entry.Device = dev.Name
			}

			infos = append(infos, entry)
		}

		return infos, nil
	}

	gpuRoute.WithHandler("list", listHandler)
}
2 changes: 2 additions & 0 deletions pkg/events/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ type RedisConsumer struct {
pool *redis.Pool
}

// NewConsumer creates a new event consumer on the given redis address with the
// given consumer id. The consumer id has to be unique.
func NewConsumer(address, id string) (*RedisConsumer, error) {
pool, err := utils.NewRedisPool(address)
if err != nil {
Expand Down