diff --git a/cmds/identityd/main.go b/cmds/identityd/main.go index ba324b616..1ab345f68 100644 --- a/cmds/identityd/main.go +++ b/cmds/identityd/main.go @@ -88,7 +88,7 @@ func main() { if farm { env := environment.MustGet() - fmt.Println(env.FarmerID) + fmt.Println(env.FarmID) os.Exit(0) } else if net { env := environment.MustGet() @@ -355,7 +355,7 @@ func getIdentityMgr(root string, debug bool) (pkg.IdentityManager, error) { log.Info(). Bool("orphan", env.Orphan). - Uint64("farmer_id", uint64(env.FarmerID)). + Uint64("farmer_id", uint64(env.FarmID)). Msg("farmer identified") return manager, nil diff --git a/cmds/modules/noded/main.go b/cmds/modules/noded/main.go index 8e05b0f58..7893d5d38 100644 --- a/cmds/modules/noded/main.go +++ b/cmds/modules/noded/main.go @@ -217,7 +217,7 @@ func action(cli *cli.Context) error { return err } - events, err := events.NewRedisStream(sub, msgBrokerCon, node, eventsBlock) + events, err := events.NewRedisStream(sub, msgBrokerCon, env.FarmID, node, eventsBlock) if err != nil { return err } @@ -264,33 +264,6 @@ func action(cli *cli.Context) error { return err } - // uptime update - go func() { - defer log.Info().Msg("uptime reporting exited permanently") - safeUptime := func(ctx context.Context, redis zbus.Client) (err error) { - defer func() { - if p := recover(); p != nil { - err = fmt.Errorf("uptime reporting has panicked: %+v", p) - } - }() - - err = uptime(ctx, id) - return err - } - - for { - err := safeUptime(ctx, redis) - if errors.Is(err, context.Canceled) { - log.Info().Msg("stop uptime reporting. context cancelled") - return - } else if err != nil { - log.Error().Err(err).Msg("sending uptime failed") - } - // even there is no error we try again until ctx is cancelled - <-time.After(10 * time.Second) - } - }() - log.Debug().Msg("start message bus") for { err := runMsgBus(ctx, sk, env.SubstrateURL, env.RelayURL, msgBrokerCon) diff --git a/cmds/modules/noded/register.go b/cmds/modules/noded/register.go deleted file mode 100644 index 1eea68fc1..000000000 --- a/cmds/modules/noded/register.go +++ /dev/null @@ -1,54 +0,0 @@ -package noded - -import ( - "context" - "time" - - "github.com/centrifuge/go-substrate-rpc-client/v4/types" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - "github.com/shirou/gopsutil/host" - "github.com/threefoldtech/substrate-client" - "github.com/threefoldtech/zos/pkg/environment" -) - -const ( - reportUptimeEvery = 40 * time.Minute -) - -func uptime(ctx context.Context, id substrate.Identity) error { - subMgr, err := environment.GetSubstrate() - if err != nil { - return err - } - - update := func(uptime uint64) (types.Hash, error) { - sub, err := subMgr.Substrate() - if err != nil { - return types.Hash{}, err - } - defer sub.Close() - return sub.UpdateNodeUptime(id, uptime) - } - - for { - uptime, err := host.Uptime() - if err != nil { - return errors.Wrap(err, "failed to get uptime") - } - log.Debug().Msg("updating node uptime") - hash, err := update(uptime) - if err != nil { - return errors.Wrap(err, "failed to report uptime") - } - - log.Info().Str("hash", hash.Hex()).Msg("node uptime hash") - - select { - case <-ctx.Done(): - return nil - case <-time.After(reportUptimeEvery): - continue - } - } -} diff --git a/cmds/modules/powerd/main.go b/cmds/modules/powerd/main.go new file mode 100644 index 000000000..3c7b734a0 --- /dev/null +++ b/cmds/modules/powerd/main.go @@ -0,0 +1,112 @@ +package powerd + +import ( + "crypto/ed25519" + + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + "github.com/threefoldtech/substrate-client" + "github.com/threefoldtech/zbus" + "github.com/threefoldtech/zos/pkg/environment" + "github.com/threefoldtech/zos/pkg/events" + "github.com/threefoldtech/zos/pkg/power" + "github.com/threefoldtech/zos/pkg/stubs" + "github.com/threefoldtech/zos/pkg/utils" + "github.com/urfave/cli/v2" +) + +const ( + module = "power" +) + +// Module is entry point for module +var Module cli.Command = cli.Command{ + Name: "powerd", + Usage: "handles the node power events", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "broker", + Usage: "connection string to the message `BROKER`", + Value: "unix:///var/run/redis.sock", + }, + }, + Action: action, +} + +func action(cli *cli.Context) error { + var ( + msgBrokerCon string = cli.String("broker") + ) + + ctx, _ := utils.WithSignal(cli.Context) + utils.OnDone(ctx, func(_ error) { + log.Info().Msg("shutting down") + }) + + env := environment.MustGet() + + cl, err := zbus.NewRedisClient(msgBrokerCon) + if err != nil { + return errors.Wrap(err, "failed to connect to message broker server") + } + + identity := stubs.NewIdentityManagerStub(cl) + register := stubs.NewRegistrarStub(cl) + + nodeID, err := register.NodeID(ctx) + if err != nil { + return errors.Wrap(err, "failed to get node id") + } + + twinID, err := register.TwinID(ctx) + if err != nil { + return errors.Wrap(err, "failed to get node id") + } + + sk := ed25519.PrivateKey(identity.PrivateKey(ctx)) + id, err := substrate.NewIdentityFromEd25519Key(sk) + log.Info().Str("address", id.Address()).Msg("node address") + if err != nil { + return err + } + + sub, err := environment.GetSubstrate() + if err != nil { + return err + } + + uptime, err := power.NewUptime(sub, id) + if err != nil { + return errors.Wrap(err, "failed to initialize uptime reported") + } + + // start uptime reporting + go uptime.Start(cli.Context) + + // if the feature is globally enabled try to ensure + // wake on lan is set correctly. + // then override the enabled flag + enabled, err := power.EnsureWakeOnLan(cli.Context) + if err != nil { + return errors.Wrap(err, "failed to enable wol") + } + + if !enabled { + // if the zos nics don't support wol we can automatically + // disable the feature + log.Info().Msg("no wol support found by zos nic") + } + + consumer, err := events.NewConsumer(msgBrokerCon, module) + if err != nil { + return errors.Wrap(err, "failed to to create event consumer") + } + + // start power manager + power, err := power.NewPowerServer(cl, sub, consumer, enabled, env.FarmID, nodeID, twinID, id, uptime) + if err != nil { + return errors.Wrap(err, "failed to initialize power manager") + } + + return power.Start(ctx) +} diff --git a/cmds/modules/provisiond/main.go b/cmds/modules/provisiond/main.go index 3dfe41c32..e5ab4ff51 100644 --- a/cmds/modules/provisiond/main.go +++ b/cmds/modules/provisiond/main.go @@ -231,7 +231,7 @@ func action(cli *cli.Context) error { return errors.Wrap(err, "failed to create substrate users database") } - admins, err := provision.NewSubstrateAdmins(mgr, uint32(env.FarmerID)) + admins, err := provision.NewSubstrateAdmins(mgr, uint32(env.FarmID)) if err != nil { return errors.Wrap(err, "failed to create substrate admins database") } diff --git a/cmds/zos/main.go b/cmds/zos/main.go index bc97b3200..a84181042 100644 --- a/cmds/zos/main.go +++ b/cmds/zos/main.go @@ -12,6 +12,7 @@ import ( "github.com/threefoldtech/zos/cmds/modules/gateway" "github.com/threefoldtech/zos/cmds/modules/networkd" "github.com/threefoldtech/zos/cmds/modules/noded" + "github.com/threefoldtech/zos/cmds/modules/powerd" "github.com/threefoldtech/zos/cmds/modules/provisiond" "github.com/threefoldtech/zos/cmds/modules/qsfsd" "github.com/threefoldtech/zos/cmds/modules/storaged" @@ -54,6 +55,7 @@ func main() { &zbusdebug.Module, &gateway.Module, &qsfsd.Module, + &powerd.Module, }, Before: func(c *cli.Context) error { if c.Bool("debug") { diff --git a/etc/zinit/powerd.yaml b/etc/zinit/powerd.yaml new file mode 100644 index 000000000..c496b34b5 --- /dev/null +++ b/etc/zinit/powerd.yaml @@ -0,0 +1,4 @@ +exec: powerd --broker unix://var/run/redis.sock +after: + - boot + - noded diff --git a/go.sum b/go.sum index 4b9800165..9d21f4ce9 100644 --- a/go.sum +++ b/go.sum @@ -992,8 +992,6 @@ github.com/threefoldtech/0-fs v1.3.1-0.20201203163303-d963de9adea7 h1:64QIPSO1Ac github.com/threefoldtech/0-fs v1.3.1-0.20201203163303-d963de9adea7/go.mod h1:OPPZiE/GthPR2IepjKSc8wa+t/7wl3dtHQyEdUcftZI= github.com/threefoldtech/go-substrate-rpc-client/v4 v4.0.6-0.20230102154731-7c633b7d3c71 h1:vvoXPRI10quMOEXHKcakKhBZe+TlLXhNsh1wwX4cyk4= github.com/threefoldtech/go-substrate-rpc-client/v4 v4.0.6-0.20230102154731-7c633b7d3c71/go.mod h1:5g1oM4Zu3BOaLpsKQ+O8PAv2kNuq+kPcA1VzFbsSqxE= -github.com/threefoldtech/rmb-sdk-go v0.0.0-20230120120324-7c96fec72b33 h1:uSpwP4Du/yKCgpeaw9/wJKH7QmwC0O54QOD8tsTKgwc= -github.com/threefoldtech/rmb-sdk-go v0.0.0-20230120120324-7c96fec72b33/go.mod h1:PgqoUXMfkJEY6TD6Yoy0Gx+w/7hdGoV7Hel21wvNQk8= github.com/threefoldtech/rmb-sdk-go v0.0.0-20230208121945-1920816a9268 h1:HL0WRgjKn18az5pDDD9XEFkeMTBB1lWJGBF+5Cb8LLg= github.com/threefoldtech/rmb-sdk-go v0.0.0-20230208121945-1920816a9268/go.mod h1:ddt04wNV0lkrfijSa1x2nbEoyBO41WO0B0BFLIeP6Mg= github.com/threefoldtech/substrate-client v0.0.0-20230203145052-45be626d311b h1:zeUyutA3Siibb0P9vx3I6P6UIu1WzzOaR39QJm4SLRY= diff --git a/pkg/environment/environment.go b/pkg/environment/environment.go index 538f2886d..f44384e89 100644 --- a/pkg/environment/environment.go +++ b/pkg/environment/environment.go @@ -24,8 +24,8 @@ type Environment struct { FlistURL string BinRepo string - FarmerID pkg.FarmID - Orphan bool + FarmID pkg.FarmID + Orphan bool FarmSecret string SubstrateURL []string @@ -219,11 +219,11 @@ func getEnvironmentFromParams(params kernel.Params) (Environment, error) { switch env.RunningMode { case RunningDev: - env.FarmerID = OrphanageDev + env.FarmID = OrphanageDev case RunningTest: - env.FarmerID = OrphanageTest + env.FarmID = OrphanageTest case RunningMain: - env.FarmerID = OrphanageMain + env.FarmID = OrphanageMain } } else { @@ -232,7 +232,7 @@ func getEnvironmentFromParams(params kernel.Params) (Environment, error) { if err != nil { return env, errors.Wrap(err, "wrong format for farm ID") } - env.FarmerID = pkg.FarmID(id) + env.FarmID = pkg.FarmID(id) } // Checking if there environment variable diff --git a/pkg/events.go b/pkg/events.go index a725aeb18..9c60f879b 100644 --- a/pkg/events.go +++ b/pkg/events.go @@ -25,3 +25,9 @@ type ContractLockedEvent struct { TwinId uint32 Lock bool } + +type PowerTargetChangeEvent struct { + FarmID FarmID + NodeID uint32 + Target substrate.Power +} diff --git a/pkg/events/redis.go b/pkg/events/redis.go index 13ad8f190..054316a11 100644 --- a/pkg/events/redis.go +++ b/pkg/events/redis.go @@ -5,6 +5,7 @@ import ( "context" "encoding/gob" "fmt" + "reflect" "strings" "github.com/centrifuge/go-substrate-rpc-client/v4/types" @@ -23,16 +24,18 @@ const ( streamPublicConfig = "stream:public-config" streamContractCancelled = "stream:contract-cancelled" streamContractGracePeriod = "stream:contract-lock" + streamPowerTargetChange = "stream:power-target" ) type RedisStream struct { sub substrate.Manager state string + farm pkg.FarmID node uint32 pool *redis.Pool } -func NewRedisStream(sub substrate.Manager, address string, node uint32, state string) (*RedisStream, error) { +func NewRedisStream(sub substrate.Manager, address string, farm pkg.FarmID, node uint32, state string) (*RedisStream, error) { pool, err := utils.NewRedisPool(address, 2) if err != nil { return nil, err @@ -41,6 +44,7 @@ func NewRedisStream(sub substrate.Manager, address string, node uint32, state st return &RedisStream{ sub: sub, state: state, + farm: farm, node: node, pool: pool, }, nil @@ -120,6 +124,21 @@ func (r *RedisStream) process(events *substrate.EventRecords) { } } + for _, event := range events.TfgridModule_PowerTargetChanged { + if event.Farm != types.U32(r.farm) { + continue + } + + log.Info().Uint32("node", uint32(event.Node)).Msg("got power target change event") + if err := r.push(con, streamPowerTargetChange, pkg.PowerTargetChangeEvent{ + FarmID: pkg.FarmID(event.Farm), + NodeID: uint32(event.Node), + Target: event.PowerTarget, + }); err != nil { + log.Error().Err(err).Msg("failed to push event") + } + } + } func (r *RedisStream) Start(ctx context.Context) { @@ -194,15 +213,22 @@ func (r *RedisConsumer) ack(ctx context.Context, con redis.Conn, group, stream, return err } -func (r *RedisConsumer) PublicConfig(ctx context.Context) (<-chan pkg.PublicConfigEvent, error) { - con := r.pool.Get() - ch := make(chan pkg.PublicConfigEvent) +func (r *RedisConsumer) consumer(ctx context.Context, stream string, ch reflect.Value) error { + chType := ch.Type() + if chType.Kind() != reflect.Chan { + panic("ch must be of a channel type") + } - const stream = streamPublicConfig + elem := chType.Elem() + if elem.Kind() != reflect.Struct { + panic("channel element must be a structure") + } + + con := r.pool.Get() group, err := r.ensureGroup(con, stream) if err != nil && !isBusyGroup(err) { - return nil, err + return err } logger := log.With().Str("stream", stream).Logger() @@ -216,11 +242,17 @@ func (r *RedisConsumer) PublicConfig(ctx context.Context) (<-chan pkg.PublicConf } for _, message := range messages { - var event pkg.PublicConfigEvent - if err := message.Decode(&event); err == nil { - select { - case ch <- event: - case <-ctx.Done(): + event := reflect.New(elem) + ptr := event.Interface() + if err := message.Decode(ptr); err == nil { + // since we don't know the type of the event nor the channel + // type we need to do this in runtime as follows + chosen, _, _ := reflect.Select([]reflect.SelectCase{ + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}, + {Dir: reflect.SelectSend, Chan: ch, Send: event.Elem()}, + }) + + if chosen == 0 { return } } else if err != nil { @@ -240,105 +272,27 @@ func (r *RedisConsumer) PublicConfig(ctx context.Context) (<-chan pkg.PublicConf } }() - return ch, nil + return nil +} + +func (r *RedisConsumer) PublicConfig(ctx context.Context) (<-chan pkg.PublicConfigEvent, error) { + ch := make(chan pkg.PublicConfigEvent) + return ch, r.consumer(ctx, streamPublicConfig, reflect.ValueOf(ch)) } func (r *RedisConsumer) ContractCancelled(ctx context.Context) (<-chan pkg.ContractCancelledEvent, error) { - con := r.pool.Get() ch := make(chan pkg.ContractCancelledEvent) - - const stream = streamContractCancelled - group, err := r.ensureGroup(con, stream) - - if err != nil && !isBusyGroup(err) { - return nil, err - } - - logger := log.With().Str("stream", stream).Logger() - go func() { - defer con.Close() - - for { - messages, err := r.pop(con, group, stream) - if err != nil { - logger.Error().Err(err).Msg("failed to get events from") - } - - for _, message := range messages { - var event pkg.ContractCancelledEvent - if err := message.Decode(&event); err == nil { - select { - case ch <- event: - case <-ctx.Done(): - return - } - } else if err != nil { - logger.Error().Err(err).Str("id", message.ID).Msg("failed to handle message") - } - - if err := r.ack(ctx, con, group, stream, message.ID); err != nil { - logger.Error().Err(err).Str("id", message.ID).Msg("failed to ack message") - } - } - - select { - case <-ctx.Done(): - return - default: - } - } - }() - - return ch, nil + return ch, r.consumer(ctx, streamContractCancelled, reflect.ValueOf(ch)) } func (r *RedisConsumer) ContractLocked(ctx context.Context) (<-chan pkg.ContractLockedEvent, error) { - con := r.pool.Get() ch := make(chan pkg.ContractLockedEvent) + return ch, r.consumer(ctx, streamContractGracePeriod, reflect.ValueOf(ch)) +} - const stream = streamContractGracePeriod - group, err := r.ensureGroup(con, stream) - - if err != nil && !isBusyGroup(err) { - return nil, err - } - - logger := log.With().Str("stream", stream).Logger() - go func() { - defer con.Close() - - for { - messages, err := r.pop(con, group, stream) - if err != nil { - logger.Error().Err(err).Msg("failed to get events from") - } - - for _, message := range messages { - var event pkg.ContractLockedEvent - if err := message.Decode(&event); err == nil { - select { - case <-ctx.Done(): - return - case ch <- event: - } - } else if err != nil { - logger.Error().Err(err).Str("id", message.ID).Msg("failed to handle message") - } - - if err := r.ack(ctx, con, group, stream, message.ID); err != nil { - logger.Error().Err(err).Str("id", message.ID).Msg("failed to ack message") - } - } - - select { - case <-ctx.Done(): - return - default: - } - } - }() - - return ch, nil +func (r *RedisConsumer) PowerTargetChange(ctx context.Context) (<-chan pkg.PowerTargetChangeEvent, error) { + ch := make(chan pkg.PowerTargetChangeEvent) + return ch, r.consumer(ctx, streamPowerTargetChange, reflect.ValueOf(ch)) } type payload struct { diff --git a/pkg/identity/identityd.go b/pkg/identity/identityd.go index a341fe546..fa19396c6 100644 --- a/pkg/identity/identityd.go +++ b/pkg/identity/identityd.go @@ -102,7 +102,7 @@ func (d *identityManager) Farm() (string, error) { } defer cl.Close() - farm, err := cl.GetFarm(uint32(d.env.FarmerID)) + farm, err := cl.GetFarm(uint32(d.env.FarmID)) if errors.Is(err, substrate.ErrNotFound) { return "", fmt.Errorf("wrong farm id") } else if err != nil { @@ -119,7 +119,7 @@ func (d *identityManager) FarmID() (pkg.FarmID, error) { if err != nil { return 0, errors.Wrap(err, "failed to parse node environment") } - return env.FarmerID, nil + return env.FarmID, nil } // FarmSecret returns farm secret from kernel params diff --git a/pkg/kernel/kernel.go b/pkg/kernel/kernel.go index c58bcdf60..429b6467c 100644 --- a/pkg/kernel/kernel.go +++ b/pkg/kernel/kernel.go @@ -36,14 +36,14 @@ func (k Params) Get(key string) ([]string, bool) { return v, ok } +// IsDebug checks if zos-debug is set func (k Params) IsDebug() bool { - _, ok := k.Get(Debug) - return ok + return k.Exists(Debug) } +// IsVirtualMachine checks if zos-debug-vm is set func (k Params) IsVirtualMachine() bool { - _, ok := k.Get(VirtualMachine) - return ok + return k.Exists(VirtualMachine) } func parseParams(content string) Params { diff --git a/pkg/network/bridge/bridge.go b/pkg/network/bridge/bridge.go index 0c2979330..b10184315 100644 --- a/pkg/network/bridge/bridge.go +++ b/pkg/network/bridge/bridge.go @@ -132,6 +132,28 @@ func AttachNic(link netlink.Link, bridge *netlink.Bridge) error { return netlink.LinkSetMaster(link, bridge) } +// List all nics attached to a bridge +func ListNics(bridge *netlink.Bridge, physical bool) ([]netlink.Link, error) { + links, err := netlink.LinkList() + if err != nil { + return nil, err + } + + filtered := links[:0] + + for _, link := range links { + if link.Attrs().MasterIndex != bridge.Index { + continue + } + if physical && link.Type() != "device" { + continue + } + filtered = append(filtered, link) + } + + return filtered, nil +} + // AttachNicWithMac attaches an interface to a bridge and sets // the MAC of the bridge to the same of the NIC func AttachNicWithMac(link netlink.Link, bridge *netlink.Bridge) error { diff --git a/pkg/network/mbus.go b/pkg/network/mbus.go index f54c4c45f..09ca81783 100644 --- a/pkg/network/mbus.go +++ b/pkg/network/mbus.go @@ -80,7 +80,7 @@ func (n *Network) setup(router rmb.Router) error { if err != nil { return errors.Wrap(err, "failed to get substrate") } - mw, err := Authorized(mgr, uint32(env.FarmerID)) + mw, err := Authorized(mgr, uint32(env.FarmID)) if err != nil { return errors.Wrap(err, "failed to initialized admin mw") } diff --git a/pkg/power/ethtool.go b/pkg/power/ethtool.go new file mode 100644 index 000000000..85a28a51f --- /dev/null +++ b/pkg/power/ethtool.go @@ -0,0 +1,69 @@ +package power + +import ( + "bufio" + "bytes" + "context" + "fmt" + "os/exec" + "strings" + + "github.com/pkg/errors" +) + +type Flag string + +const ( + SupportsWakeOn Flag = "Supports Wake-on" + WakeOn Flag = "Wake-on" +) + +type WolMode string + +const ( + MagicPacket WolMode = "g" +) + +var ( + ErrFlagNotFound = fmt.Errorf("flag not found") +) + +func ethtool(ctx context.Context, arg ...string) ([]byte, error) { + return exec.CommandContext(ctx, "ethtool", arg...).CombinedOutput() +} + +func ValueOfFlag(ctx context.Context, nic string, flag Flag) (string, error) { + output, err := ethtool(ctx, nic) + if err != nil { + return "", err + } + + return valueOfFlag(output, flag) +} + +func valueOfFlag(output []byte, flag Flag) (string, error) { + buf := bytes.NewBuffer(output) + scanner := bufio.NewScanner(buf) + for scanner.Scan() { + if err := scanner.Err(); err != nil { + return "", err + } + + line := strings.TrimSpace(scanner.Text()) + parts := strings.Split(line, ":") + if parts[0] != string(flag) { + continue + } + if len(parts) != 2 { + return "", fmt.Errorf("invalid ethtool output format (%s)", line) + } + return strings.TrimSpace(string(parts[1])), nil + } + + return "", ErrFlagNotFound +} + +func SetWol(ctx context.Context, nic string, mode WolMode) error { + _, err := ethtool(ctx, "-s", nic, "wol", string(mode)) + return errors.Wrap(err, "failed to set nic wol") +} diff --git a/pkg/power/ethtool_test.go b/pkg/power/ethtool_test.go new file mode 100644 index 000000000..b4c41a709 --- /dev/null +++ b/pkg/power/ethtool_test.go @@ -0,0 +1,75 @@ +package power + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseFlags(t *testing.T) { + const input = `Settings for enp6s0f0: + Supported ports: [ TP ] + Supported link modes: 10baseT/Half 10baseT/Full + 100baseT/Half 100baseT/Full + 1000baseT/Full + Supported pause frame use: Symmetric + Supports auto-negotiation: Yes + Supported FEC modes: Not reported + Advertised link modes: 10baseT/Half 10baseT/Full + 100baseT/Half 100baseT/Full + 1000baseT/Full + Advertised pause frame use: Symmetric + Advertised auto-negotiation: Yes + Advertised FEC modes: Not reported + Speed: 1000Mb/s + Duplex: Full + Port: Twisted Pair + PHYAD: 1 + Transceiver: internal + Auto-negotiation: on + MDI-X: off (auto) + Supports Wake-on: pumbg + Wake-on: g + Current message level: 0x00000007 (7) + drv probe link + Link detected: yes + ` + + value, err := valueOfFlag([]byte(input), SupportsWakeOn) + require.NoError(t, err) + + require.Equal(t, "pumbg", value) +} + +func TestParseFlagNotSet(t *testing.T) { + const input = `Settings for enp6s0f0: + Supported ports: [ TP ] + Supported link modes: 10baseT/Half 10baseT/Full + 100baseT/Half 100baseT/Full + 1000baseT/Full + Supported pause frame use: Symmetric + Supports auto-negotiation: Yes + Supported FEC modes: Not reported + Advertised link modes: 10baseT/Half 10baseT/Full + 100baseT/Half 100baseT/Full + 1000baseT/Full + Advertised pause frame use: Symmetric + Advertised auto-negotiation: Yes + Advertised FEC modes: Not reported + Speed: 1000Mb/s + Duplex: Full + Port: Twisted Pair + PHYAD: 1 + Transceiver: internal + Auto-negotiation: on + MDI-X: off (auto) + Wake-on: g + Current message level: 0x00000007 (7) + drv probe link + Link detected: yes + ` + + _, err := valueOfFlag([]byte(input), SupportsWakeOn) + require.ErrorIs(t, err, ErrFlagNotFound) + +} diff --git a/pkg/power/power.go b/pkg/power/power.go new file mode 100644 index 000000000..7d3981bbd --- /dev/null +++ b/pkg/power/power.go @@ -0,0 +1,279 @@ +package power + +import ( + "context" + "fmt" + "os/exec" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + "github.com/threefoldtech/substrate-client" + "github.com/threefoldtech/zbus" + "github.com/threefoldtech/zos/pkg" + "github.com/threefoldtech/zos/pkg/events" + "github.com/threefoldtech/zos/pkg/network/bridge" + "github.com/threefoldtech/zos/pkg/zinit" +) + +type PowerServer struct { + cl zbus.Client + consumer *events.RedisConsumer + sub substrate.Manager + + // enabled means the node can power off! + enabled bool + farm pkg.FarmID + node uint32 + twin uint32 + identity substrate.Identity + ut *Uptime +} + +func NewPowerServer( + cl zbus.Client, + sub substrate.Manager, + consumer *events.RedisConsumer, + enabled bool, + farm pkg.FarmID, + node uint32, + twin uint32, + identity substrate.Identity, + ut *Uptime) (*PowerServer, error) { + + return &PowerServer{ + cl: cl, + sub: sub, + consumer: consumer, + enabled: enabled, + farm: farm, + node: node, + twin: twin, + identity: identity, + ut: ut, + }, nil +} + +const ( + DefaultWolBridge = "zos" + PowerServerPort = 8039 +) + +var ( + errConnectionError = fmt.Errorf("connection error") +) + +func EnsureWakeOnLan(ctx context.Context) (bool, error) { + inf, err := bridge.Get(DefaultWolBridge) + if err != nil { + return false, errors.Wrap(err, "failed to get zos bridge") + } + + nics, err := bridge.ListNics(inf, true) + if err != nil { + return false, errors.Wrap(err, "failed to list attached nics to zos bridge") + } + + filtered := nics[:0] + for _, nic := range nics { + if nic.Type() == "device" { + filtered = append(filtered, nic) + } + } + + if len(filtered) != 1 { + return false, fmt.Errorf("zos bridge has multiple interfaces") + } + + nic := filtered[0].Attrs().Name + log.Info().Str("nic", nic).Msg("enabling wol on interface") + support, err := ValueOfFlag(ctx, nic, SupportsWakeOn) + + if errors.Is(err, ErrFlagNotFound) { + // no support for + return false, nil + } else if err != nil { + return false, errors.Wrap(err, "failed to detect support for wake on lan") + } + + if !strings.Contains(support, string(MagicPacket)) { + // no magic packet support either + return false, nil + } + + return true, SetWol(ctx, nic, MagicPacket) +} + +func (p *PowerServer) syncSelf() error { + cl, err := p.sub.Substrate() + if err != nil { + return err + } + + defer cl.Close() + + power, err := cl.GetPowerTarget(p.node) + if err != nil { + return err + } + + // power target is the state the node has to be in + // while the node state is the actual state set by the node. + + // if target is up, and the node state is up, we do nothing + // if target is up, but th state is down, we set the state to up and return + // if target is down, we make sure state is down, then shutdown + + if power.Target.IsUp { + if power.State.IsDown { + _, err = cl.SetNodePowerState(p.identity, true) + return errors.Wrap(err, "failed to set state to up") + } + + return nil + } + + // now the target must be down. + // we need to shutdown + + if power.State.IsUp { + _, err = cl.SetNodePowerState(p.identity, false) + return errors.Wrap(err, "failed to set state to up") + } + + // otherwise node need to get back to sleep. + if err := p.shutdown(); err != nil { + return errors.Wrap(err, "failed to issue shutdown") + } + + return nil +} + +func (p *PowerServer) powerUp(node *substrate.Node, reason string) error { + log.Info().Uint32("node", uint32(node.ID)).Str("reason", reason).Msg("powering on node") + + mac := "" + for _, inf := range node.Interfaces { + if inf.Name == "zos" { + mac = inf.Mac + break + } + } + if mac == "" { + return fmt.Errorf("can't find mac address of node '%d'", node.ID) + } + + return exec.Command("ether-wake", "-i", "zos", mac).Run() + +} + +func (p *PowerServer) shutdown() error { + if !p.enabled { + log.Info().Msg("ignoring shutdown because power-management is not enabled") + return nil + } + + log.Info().Msg("shutting down node because of chain") + if _, err := p.ut.SendNow(); err != nil { + log.Error().Err(err).Msg("failed to send uptime before shutting down") + } + + // is down! + init := zinit.Default() + err := init.Shutdown() + + if errors.Is(err, zinit.ErrNotSupported) { + log.Info().Msg("node does not support shutdown. rebooting to update") + return init.Reboot() + } + + return err +} + +func (p *PowerServer) event(event *pkg.PowerTargetChangeEvent) error { + if event.FarmID != p.farm { + return nil + } + + log.Debug(). + Uint32("farm", uint32(p.farm)). + Uint32("node", p.node). + Msg("received power event for farm") + + cl, err := p.sub.Substrate() + if err != nil { + return err + } + + defer cl.Close() + node, err := cl.GetNode(event.NodeID) + if err != nil { + return err + } + + if event.NodeID == p.node && event.Target.IsDown { + // we need to shutdown! + if _, err := cl.SetNodePowerState(p.identity, false); err != nil { + return errors.Wrap(err, "failed to set node power state to down") + } + + return p.shutdown() + } else if event.Target.IsDown { + return nil + } + + if event.Target.IsUp { + log.Info().Uint32("target", event.NodeID).Msg("received an event to power up") + return p.powerUp(node, "target is up") + } + + return nil +} + +func (p *PowerServer) recv(ctx context.Context) error { + log.Info().Msg("listening for power events") + stream, err := p.consumer.PowerTargetChange(ctx) + if err != nil { + return errors.Wrapf(errConnectionError, "failed to connect to zbus events: %s", err) + } + + for event := range stream { + if err := p.event(&event); err != nil { + log.Error().Err(err).Msg("failed to process power event") + } + } + + return nil +} + +// start processing time events. +func (p *PowerServer) events(ctx context.Context) error { + // first thing we need to make sure we are not suppose to be powered + // off, so we need to sync with grid + // 1) make sure at least one uptime was already sent + _ = p.ut.Mark.Done(ctx) + // 2) do we need to power off + if err := p.syncSelf(); err != nil { + return errors.Wrap(err, "failed to synchronize power status") + } + + // if the stream loop fails for any reason retry + // unless context was cancelled + for { + err := p.recv(ctx) + if err == nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(10 * time.Second): + } + } +} + +func (p *PowerServer) Start(ctx context.Context) error { + return p.events(ctx) +} diff --git a/pkg/power/uptime.go b/pkg/power/uptime.go new file mode 100644 index 000000000..246e678e0 --- /dev/null +++ b/pkg/power/uptime.go @@ -0,0 +1,116 @@ +package power + +import ( + "context" + "fmt" + "runtime/debug" + "sync" + "time" + + "github.com/centrifuge/go-substrate-rpc-client/v4/types" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + "github.com/shirou/gopsutil/host" + "github.com/threefoldtech/substrate-client" + "github.com/threefoldtech/zos/pkg/utils" +) + +const ( + reportUptimeEvery = 40 * time.Minute +) + +type Uptime struct { + // Mark is set to done after the first uptime is sent + Mark utils.Mark + + id substrate.Identity + sub substrate.Manager + m sync.Mutex +} + +func NewUptime(sub substrate.Manager, id substrate.Identity) (*Uptime, error) { + return &Uptime{ + id: id, + sub: sub, + Mark: utils.NewMark(), + }, nil +} + +func (u *Uptime) SendNow() (types.Hash, error) { + uptime, err := host.Uptime() + if err != nil { + return types.Hash{}, errors.Wrap(err, "failed to get uptime") + } + return u.send(uptime) +} + +func (u *Uptime) send(uptime uint64) (types.Hash, error) { + // the mutex is to avoid race when SendNow is called + // while the times reporting is working + u.m.Lock() + defer u.m.Unlock() + + sub, err := u.sub.Substrate() + if err != nil { + return types.Hash{}, err + } + defer sub.Close() + return sub.UpdateNodeUptime(u.id, uptime) +} + +func (u *Uptime) uptime(ctx context.Context) error { + for { + uptime, err := host.Uptime() + if err != nil { + return errors.Wrap(err, "failed to get uptime") + } + log.Debug().Msg("updating node uptime") + hash, err := u.send(uptime) + if err != nil { + return errors.Wrap(err, "failed to report uptime") + } + + u.Mark.Signal() + + log.Info().Str("hash", hash.Hex()).Msg("node uptime hash") + + select { + case <-ctx.Done(): + return nil + case <-time.After(reportUptimeEvery): + continue + } + } +} + +// start uptime reporting. returns a channel that is closed immediately after +// the first uptime is reported. +func (u *Uptime) Start(ctx context.Context) { + // uptime update + defer log.Info().Msg("uptime reporting exited permanently") + safeUptime := func(ctx context.Context) (err error) { + defer func() { + if p := recover(); p != nil { + err = fmt.Errorf("uptime reporting has panicked: %+v\n%s", p, string(debug.Stack())) + } + }() + + err = u.uptime(ctx) + return err + } + + for { + err := safeUptime(ctx) + if errors.Is(err, context.Canceled) { + log.Info().Msg("stop uptime reporting. context cancelled") + return + } else if err != nil { + log.Error().Err(err).Msg("sending uptime failed") + } else { + // context was cancelled + return + } + // even there is no error we try again until ctx is cancelled + <-time.After(10 * time.Second) + } +} diff --git a/pkg/registrar/register.go b/pkg/registrar/register.go index 669df5df1..2b7cdaf1e 100644 --- a/pkg/registrar/register.go +++ b/pkg/registrar/register.go @@ -169,7 +169,7 @@ func registerNode( } create := substrate.Node{ - FarmID: types.U32(env.FarmerID), + FarmID: types.U32(env.FarmID), TwinID: types.U32(twinID), Resources: resources, Location: location, diff --git a/pkg/utils/mark.go b/pkg/utils/mark.go new file mode 100644 index 000000000..448f963b9 --- /dev/null +++ b/pkg/utils/mark.go @@ -0,0 +1,44 @@ +package utils + +import ( + "context" + "sync" +) + +// Mark defined a placeholder where multiple routines can +// use to synchronize their operation. Routines calling Done +// will be blocked until one calls Signal. After that all calls +// to Done should return immediately +type Mark interface { + Done(ctx context.Context) error + Signal() +} + +type mark struct { + ch chan struct{} + o sync.Once +} + +func NewMark() Mark { + return &mark{ch: make(chan struct{})} +} + +// Done blocks until either ctx times out (returns error) +// or Signal has been called +// if Signal was already called, return immediately +func (m *mark) Done(ctx context.Context) error { + select { + case <-m.ch: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Signal the mark to release anyone blocked +// on Done +func (m *mark) Signal() { + m.o.Do(func() { + close(m.ch) + }) +} diff --git a/pkg/utils/mark_test.go b/pkg/utils/mark_test.go new file mode 100644 index 000000000..e28a42267 --- /dev/null +++ b/pkg/utils/mark_test.go @@ -0,0 +1,20 @@ +package utils + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestMark(t *testing.T) { + mark := NewMark() + + mark.Signal() + mark.Signal() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + require.NoError(t, mark.Done(ctx)) +} diff --git a/pkg/zinit/commands.go b/pkg/zinit/commands.go index 05d6a436f..423f6b5b9 100644 --- a/pkg/zinit/commands.go +++ b/pkg/zinit/commands.go @@ -3,11 +3,13 @@ package zinit import ( "fmt" "os" + "os/exec" "path/filepath" "regexp" "strings" "time" + "github.com/blang/semver" "github.com/google/shlex" "github.com/pkg/errors" "github.com/rs/zerolog/log" @@ -20,6 +22,8 @@ var ( ErrUnknownService = errors.New("unknown service") ErrAlreadyMonitored = errors.New("already monitored") + + ErrNotSupported = errors.New("operation not supported") ) // PossibleState represents the state of a service managed by zinit @@ -208,7 +212,58 @@ func (c *Client) Status(service string) (result ServiceStatus, err error) { return } +func (c *Client) Version() (semver.Version, error) { + // we need to read the version from the binary + // which is a problem because it might be different + // from the one actually running. + // but there is nothing else we can do + output, err := exec.Command("zinit", "-V").Output() + if err != nil { + return semver.Version{}, errors.Wrap(err, "failed to get zinit binary version") + } + + parts := strings.Split(string(output), " ") + if len(parts) != 2 { + return semver.Version{}, fmt.Errorf("invalid version output from zinit command: %s", string(output)) + } + + v := strings.TrimPrefix(strings.TrimSpace(parts[1]), "v") + + version, err := semver.Parse(v) + if err != nil { + return version, errors.Wrapf(err, "failed to parse version returned by zos: '%s'", v) + } + + return version, nil +} + func (c *Client) Reboot() error { + ver, err := c.Version() + if err != nil { + return err + } + // separate reboot and shutdown commands were implemented + // in version 0.2.9. Before this version `shutdown` caused + // a reboot. + if ver.LT(semver.MustParse("0.2.9")) { + return c.cmd("shutdown", nil) + } + + return c.cmd("reboot", nil) +} + +func (c *Client) Shutdown() error { + ver, err := c.Version() + if err != nil { + return err + } + // separate reboot and shutdown commands were implemented + // in version 0.2.9. Before this version `shutdown` caused + // a reboot. + if ver.LT(semver.MustParse("0.2.9")) { + return errors.Wrap(ErrNotSupported, "shutdown is not supported in this version of zinit") + } + return c.cmd("shutdown", nil) } diff --git a/qemu/overlay/etc/zinit/logger.yaml b/qemu/overlay/etc/zinit/logger.yaml deleted file mode 120000 index 6d86c9d0f..000000000 --- a/qemu/overlay/etc/zinit/logger.yaml +++ /dev/null @@ -1 +0,0 @@ -../../../../etc/zinit/logger.yaml \ No newline at end of file