Skip to content

Commit

Permalink
Jamesmoore/arch 254 silo s3 grab notification (#78)
Browse files Browse the repository at this point in the history
* Added a completedFunc cb to waitingCache

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added complete func to from_protocol instead of waiting_cache

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Added dg.WaitForReady, which waits until the S3 grab has completed on all devices

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

* Fixes an issue in the S3 grab when the size isn't a multiple of the block size

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

---------

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>
  • Loading branch information
jimmyaxod authored Feb 6, 2025
1 parent e60f08e commit 57f7e8e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 7 deletions.
12 changes: 9 additions & 3 deletions pkg/storage/device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.Logger, met
// Check the data in S3 hasn't changed.
hash := sha256.Sum256(buffer)
if !bytes.Equal(hash[:], a.Hash[:]) {
panic("The data in S3 is corrupt.")
panic(fmt.Sprintf("The data in S3 is corrupt. %x != %x (Off=%d Len=%d)", hash[:], a.Hash[:], a.Offset, a.Length))
}

n, err = startConfig.Destination.WriteAt(buffer, a.Offset)
Expand Down Expand Up @@ -470,9 +470,15 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.Logger, met
// Translate these to locations so they can be sent to a destination...
altSources := make([]packets.AlternateSource, 0)
for block, hash := range blocks {
l := int64(bs)
o := int64(block * uint(bs))
// If it's the last block, we may need to truncate the length
if o+l > int64(prov.Size()) {
l = int64(prov.Size()) - o
}
as := packets.AlternateSource{
Offset: int64(block * uint(bs)),
Length: int64(bs),
Offset: o,
Length: l,
Hash: hash,
Location: fmt.Sprintf("%s %s %s", ds.Sync.Endpoint, ds.Sync.Bucket, ds.Name),
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/devicegroup/device_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type DeviceGroup struct {
devices []*DeviceInformation
controlProtocol protocol.Protocol
incomingDevicesCh chan bool
readyDevicesCh chan bool
progressLock sync.Mutex
progress map[string]*migrator.MigrationProgress
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/devicegroup/device_group_from.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func NewFromProtocol(ctx context.Context,
dg.ctx = ctx

dg.incomingDevicesCh = make(chan bool, len(dg.devices))
dg.readyDevicesCh = make(chan bool, len(dg.devices))

// We need to create the FromProtocol for each device, and associated goroutines here.
for index, di := range dgi.Devices {
Expand All @@ -114,6 +115,12 @@ func NewFromProtocol(ctx context.Context,
}

from := protocol.NewFromProtocol(ctx, uint32(index), destStorageFactory, pro)

// Set something up to tell us when sync started
from.SetCompleteFunc(func() {
dg.readyDevicesCh <- true
})

err = from.SetDevInfo(di)
if err != nil {
return nil, err
Expand Down Expand Up @@ -176,3 +183,15 @@ func (dg *DeviceGroup) WaitForCompletion() error {
}
return nil
}

// WaitForReady blocks until one ready notification has been received on
// readyDevicesCh for every device in the group (all data local). If the
// group's context is done first, the context's error is returned instead.
func (dg *DeviceGroup) WaitForReady() error {
	for pending := len(dg.devices); pending > 0; pending-- {
		select {
		case <-dg.ctx.Done():
			return dg.ctx.Err()
		case <-dg.readyDevicesCh:
			// One more device has reported ready.
		}
	}
	return nil
}
28 changes: 24 additions & 4 deletions pkg/storage/protocol/from_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type FromProtocol struct {
alternateSources []packets.AlternateSource
startSyncAt StartSyncBehaviour

completeFunc func()
completeFuncLock sync.Mutex

// metrics
metricRecvEvents uint64
metricRecvHashes uint64
Expand Down Expand Up @@ -172,10 +175,27 @@ func (fp *FromProtocol) getAltSourcesStartSync() {
// WriteCombinator now.

// Deal with the sync here... We don't wait...
go storage.SendSiloEvent(fp.prov, "sync.start", storage.SyncStartConfig{
AlternateSources: as,
Destination: fp.provAltSources,
})
go func() {
storage.SendSiloEvent(fp.prov, "sync.start", storage.SyncStartConfig{
AlternateSources: as,
Destination: fp.provAltSources,
})
// Notify anyone interested that the S3 grab completed...
fp.completeFuncLock.Lock()
if fp.completeFunc != nil {
fp.completeFunc()
}
fp.completeFuncLock.Unlock()
}()
}

// SetCompleteFunc registers f as the callback to be called when the S3 grab
// is completed and the S3 sync has started. The assignment is made while
// holding completeFuncLock.
func (fp *FromProtocol) SetCompleteFunc(f func()) {
	fp.completeFuncLock.Lock()
	defer fp.completeFuncLock.Unlock()
	fp.completeFunc = f
}

// Handle any Events
Expand Down

0 comments on commit 57f7e8e

Please sign in to comment.