Skip to content

Commit

Permalink
Jamesmoore/arch 224 silo support device groups (#66)
Browse files Browse the repository at this point in the history
* Start on impl of dev groups

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Device group creation

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* First test for deviceGroup

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added dg.SendDevInfo and test

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* test cleanup

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added first try at dg.MigrateAll

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* DeviceGroup migrate and test working

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Lint fixes

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added logging for dg.MigrateAll

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Device concurrency is now proportional to size and allocated from a deviceGroup max

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* First stab at migrateDirty

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added hooks for dg.MigrateDirty phase

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* refactored dg tests common setup

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Drastic simplification of cmd/serve using devicegroup

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added new packet for deviceGroup

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* devicegroup devInfo change passes test, but /cmd/connect will be broken atm

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Started on dg.NewFromProtocol (migrate_from)

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* First deviceGroup migrate test working

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Split dg.FromProtocol in two, so we can start using devices before migration completed.

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added waitingCache bits into dg from

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added simple authority transfer to dg

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Improved progressHandler for dg

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added README for dg usage

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Simplified cmd/connect to use new dg api

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Updates to schema encode/decode

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* dg tests pass. Order issue fixed

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* cmd serve/connect working

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Some minor tweaks to readme

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Silly typos

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added event compat and exposed things from dg

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Refined MigrateDirty hooks for drafter integrate

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Updated README, and progress now map by name

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Small possible race in nbd_dispatch

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Better shutdown

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Even better nbd dispatch shutdown

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* lint

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Moved customDataHandler. Updated WaitForCompletion to honour context

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

---------

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>
  • Loading branch information
jimmyaxod authored Dec 19, 2024
1 parent 1964658 commit e0d0733
Show file tree
Hide file tree
Showing 17 changed files with 1,446 additions and 801 deletions.
408 changes: 20 additions & 388 deletions cmd/connect.go

Large diffs are not rendered by default.

466 changes: 73 additions & 393 deletions cmd/serve.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions pkg/storage/config/silo.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"errors"
"fmt"
"os"
"strconv"
Expand Down Expand Up @@ -123,6 +124,25 @@ func (ds *DeviceSchema) Encode() []byte {
return f.Bytes()
}

// EncodeAsBlock serializes this DeviceSchema as a single HCL "device" block
// and returns the rendered bytes.
func (ds *DeviceSchema) EncodeAsBlock() []byte {
	file := hclwrite.NewEmptyFile()
	file.Body().AppendBlock(gohcl.EncodeAsBlock(ds, "device"))
	return file.Bytes()
}

// DecodeDeviceFromBlock parses an HCL schema string that must contain exactly
// one "device" block, returning its DeviceSchema.
// It returns an error if the schema cannot be decoded, or if it does not
// contain exactly one device.
func DecodeDeviceFromBlock(schema string) (*DeviceSchema, error) {
	sf := &SiloSchema{}
	if err := sf.Decode([]byte(schema)); err != nil {
		return nil, err
	}
	// Fix: the original message claimed "more than one device", but this
	// branch is also taken when the schema contains zero devices.
	if len(sf.Device) != 1 {
		return nil, errors.New("expected exactly one device in schema")
	}
	return sf.Device[0], nil
}

func (ds *DeviceSchema) Decode(schema string) error {
file, diag := hclsyntax.ParseConfig([]byte(schema), "", hcl.Pos{Line: 1, Column: 1})
if diag.HasErrors() {
Expand Down
28 changes: 28 additions & 0 deletions pkg/storage/config/silo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,31 @@ func TestSiloConfig(t *testing.T) {
assert.NoError(t, err)
// TODO: Check data is as expected
}

// TestSiloConfigBlock checks that a device taken from a multi-device schema
// can be re-encoded via EncodeAsBlock and decoded again, keeping its name
// (which is carried in the HCL block label).
func TestSiloConfigBlock(t *testing.T) {

	input := `device Disk0 {
	size = "1G"
	expose = true
	system = "memory"
}
device Disk1 {
	size = "2M"
	system = "memory"
}
`

	original := new(SiloSchema)
	assert.NoError(t, original.Decode([]byte(input)))

	// Round-trip the first device through its block encoding.
	reencoded := original.Device[0].EncodeAsBlock()

	decoded := &SiloSchema{}
	assert.NoError(t, decoded.Decode(reencoded))

	// Make sure it used the label
	assert.Equal(t, decoded.Device[0].Name, original.Device[0].Name)
}
75 changes: 75 additions & 0 deletions pkg/storage/devicegroup/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Device Group

The `DeviceGroup` combines some number of Silo devices into a single unit, which can then be migrated to another Silo instance.
All internal concerns such as volatilityMonitor, waitingCache, as well as the new S3 assist, are now hidden from the consumer.

## Creation

There are two methods to create a `DeviceGroup`.

### NewFromSchema

This takes in an array of Silo device configs, and creates the devices. If `expose==true` then a corresponding NBD device will be created and attached.

### NewFromProtocol

This takes in a `protocol` and creates the devices as they are received from a sender.

## Usage (Sending devices)

Devices in a `DeviceGroup` are sent together, which allows Silo to optimize all aspects of the transfer.

// Create a device group from schema
dg, err := devicegroup.NewFromSchema(devices, log, siloMetrics)

// Start a migration
err := dg.StartMigrationTo(protocol)

// Migrate the data with max total concurrency 100
err = dg.MigrateAll(100, pHandler)

// Migrate any dirty blocks
// hooks gives some control over the dirty loop
err = dg.MigrateDirty(hooks)

// Send completion events for all devices
err = dg.Completed()

// Close everything
dg.CloseAll()

Within `MigrateDirty` there are a number of hooks we can use to control things. MigrateDirty will return once all devices have no more dirty data. You can of course then call MigrateDirty again, e.g. for continuous sync.

type MigrateDirtyHooks struct {
PreGetDirty func(name string) error
PostGetDirty func(name string, blocks []uint) (bool, error)
PostMigrateDirty func(name string, blocks []uint) (bool, error)
Completed func(name string)
}


There is also support for sending global custom data. This would typically be done either in `pHandler` (The progress handler), or in one of the `MigrateDirty` hooks.

pHandler := func(ps []*migrator.MigrationProgress) {
// Do some test here to see if enough data migrated

// If so, send a custom Authority Transfer event.
dg.SendCustomData(authorityTransferPacket)
}

## Usage (Receiving devices)

// Create a DeviceGroup from protocol
// tweak func allows us to modify the schema, eg pathnames
dg, err = NewFromProtocol(ctx, protocol, tweak, nil, nil)

// Handle any custom data events
// For example resume the VM here.
go dg.HandleCustomData(func(data []byte) {
// We got sent some custom data!
})

// Wait for migration completion
dg.WaitForCompletion()

Once a `DeviceGroup` has been created and migration has completed, you can then send the devices somewhere else with `StartMigration(protocol)`.
139 changes: 139 additions & 0 deletions pkg/storage/devicegroup/device_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package devicegroup

import (
"context"
"errors"
"sync"
"time"

"github.com/loopholelabs/logging/types"
"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/blocks"
"github.com/loopholelabs/silo/pkg/storage/config"
"github.com/loopholelabs/silo/pkg/storage/dirtytracker"
"github.com/loopholelabs/silo/pkg/storage/metrics"
"github.com/loopholelabs/silo/pkg/storage/migrator"
"github.com/loopholelabs/silo/pkg/storage/protocol"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/volatilitymonitor"
"github.com/loopholelabs/silo/pkg/storage/waitingcache"
)

// volatilityExpiry controls how long the volatility monitor considers a block
// volatile after activity (see VolatilityMonitor usage in this package).
const volatilityExpiry = 30 * time.Minute

// defaultBlockSize is 1 MiB; presumably the fallback migration block size
// when a device does not specify one — confirm at the usage site.
const defaultBlockSize = 1024 * 1024

// errNotSetup is returned when an operation requires a ToProtocol that has
// not been configured yet (e.g. migration started before setup).
var errNotSetup = errors.New("toProtocol not setup")

// DeviceGroup combines a number of Silo devices into a single unit which can
// then be migrated to another Silo instance together.
type DeviceGroup struct {
	log types.Logger
	met metrics.SiloMetrics
	// NOTE(review): storing a Context in a struct is discouraged by Go
	// convention; presumably it scopes the group's lifetime — confirm.
	ctx               context.Context
	devices           []*DeviceInformation
	controlProtocol   protocol.Protocol
	incomingDevicesCh chan bool
	// progressLock guards progress.
	progressLock sync.Mutex
	// progress is keyed by device name.
	progress map[string]*migrator.MigrationProgress
}

// DeviceInformation holds all per-device state managed by a DeviceGroup:
// the underlying provider, optional exposed (NBD) storage, dirty tracking,
// volatility monitoring, migration machinery and waiting-cache endpoints.
type DeviceInformation struct {
	Size       uint64
	BlockSize  uint64
	NumBlocks  int
	Schema     *config.DeviceSchema
	Prov       storage.Provider
	Storage    storage.LockableProvider
	Exp        storage.ExposedStorage // nil when the device is not exposed
	Volatility *volatilitymonitor.VolatilityMonitor
	DirtyLocal *dirtytracker.Local
	DirtyRemote *dirtytracker.Remote
	To          *protocol.ToProtocol
	Orderer     *blocks.PriorityBlockOrder
	Migrator    *migrator.Migrator
	// migrationError carries this device's migration outcome asynchronously.
	migrationError     chan error
	WaitingCacheLocal  *waitingcache.Local
	WaitingCacheRemote *waitingcache.Remote
	EventHandler       func(e *packets.Event)
}

// GetDeviceSchema returns the schemas of all devices in the group, in order.
func (dg *DeviceGroup) GetDeviceSchema() []*config.DeviceSchema {
	schemas := make([]*config.DeviceSchema, 0, len(dg.devices))
	for _, dev := range dg.devices {
		schemas = append(schemas, dev.Schema)
	}
	return schemas
}

// GetAllNames returns the names of all devices in the group, in order.
func (dg *DeviceGroup) GetAllNames() []string {
	names := make([]string, 0, len(dg.devices))
	for _, dev := range dg.devices {
		names = append(names, dev.Schema.Name)
	}
	return names
}

// GetDeviceInformationByName returns the DeviceInformation for the named
// device, or nil if no device with that name exists in the group.
func (dg *DeviceGroup) GetDeviceInformationByName(name string) *DeviceInformation {
	for _, dev := range dg.devices {
		if dev.Schema.Name != name {
			continue
		}
		return dev
	}
	return nil
}

// GetExposedDeviceByName returns the exposed storage for the named device,
// or nil if the device does not exist or is not exposed.
func (dg *DeviceGroup) GetExposedDeviceByName(name string) storage.ExposedStorage {
	for _, dev := range dg.devices {
		if dev.Schema.Name == name && dev.Exp != nil {
			return dev.Exp
		}
	}
	return nil
}

// GetProviderByName returns the storage provider for the named device,
// or nil if no device with that name exists in the group.
func (dg *DeviceGroup) GetProviderByName(name string) storage.Provider {
	for _, dev := range dg.devices {
		if dev.Schema.Name != name {
			continue
		}
		return dev.Prov
	}
	return nil
}

// GetBlockSizeByName returns the block size of the named device,
// or -1 if no device with that name exists in the group.
func (dg *DeviceGroup) GetBlockSizeByName(name string) int {
	if di := dg.GetDeviceInformationByName(name); di != nil {
		return int(di.BlockSize)
	}
	return -1
}

// CloseAll closes every device in the group, shutting down any exposed
// storage as well. All errors encountered are joined and returned.
func (dg *DeviceGroup) CloseAll() error {
	if dg.log != nil {
		dg.log.Debug().Int("devices", len(dg.devices)).Msg("close device group")
	}

	var errs error
	for _, dev := range dg.devices {
		// Unlock first so that any pending NBD writes can complete;
		// closing while locked could block on them.
		dev.Storage.Unlock()

		if err := dev.Prov.Close(); err != nil {
			if dg.log != nil {
				dg.log.Error().Err(err).Msg("error closing device group storage provider")
			}
			errs = errors.Join(errs, err)
		}

		if dev.Exp == nil {
			continue
		}
		if err := dev.Exp.Shutdown(); err != nil {
			if dg.log != nil {
				dg.log.Error().Err(err).Msg("error closing device group exposed storage")
			}
			errs = errors.Join(errs, err)
		}
	}
	return errs
}
Loading

0 comments on commit e0d0733

Please sign in to comment.