Commit 77b948c

Add isRestart param to validators (#197)
* feat: add isRestart param to validators
* fix: on restart don't unpause unstarted transfer (#199)
1 parent: f528962

7 files changed (+52, -11 lines)

channelmonitor/channelmonitor.go (+5)

@@ -52,6 +52,8 @@ type Config struct {
     // Max time to wait for the responder to send a Complete message once all
     // data has been sent
     CompleteTimeout time.Duration
+    // Called when a restart completes successfully
+    OnRestartComplete func(id datatransfer.ChannelID)
 }

 func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor {
@@ -382,6 +384,9 @@ func (mc *monitoredChannel) restartChannel() {

     if !restartAgain {
         // No restart queued, we're done
+        if mc.cfg.OnRestartComplete != nil {
+            mc.cfg.OnRestartComplete(mc.chid)
+        }
         return
     }

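For context on how the new hook is consumed (not part of this diff): a minimal sketch of a channelmonitor.Config with OnRestartComplete set, assuming the filecoin-project/go-data-transfer import paths; the helper name, timeout values and the log line are illustrative only.

import (
    "fmt"
    "time"

    datatransfer "github.com/filecoin-project/go-data-transfer"
    "github.com/filecoin-project/go-data-transfer/channelmonitor"
)

// newMonitorConfig is a hypothetical helper showing where the new hook fits.
func newMonitorConfig() channelmonitor.Config {
    return channelmonitor.Config{
        AcceptTimeout:          30 * time.Second,
        CompleteTimeout:        5 * time.Minute,
        MaxConsecutiveRestarts: 5,
        // New in this commit: called once a restart has completed and no
        // further restart is queued for the channel.
        OnRestartComplete: func(chid datatransfer.ChannelID) {
            fmt.Printf("restart of channel %s complete\n", chid)
        },
    }
}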

impl/events.go (+9, -6)

@@ -322,7 +322,7 @@ func (m *manager) restartRequest(chid datatransfer.ChannelID,
         return nil, err
     }

-    voucher, result, err := m.validateVoucher(initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
+    voucher, result, err := m.validateVoucher(true, initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
     if err != nil && err != datatransfer.ErrPause {
         return result, xerrors.Errorf("failed to validate voucher: %w", err)
     }
@@ -361,7 +361,7 @@ func (m *manager) acceptRequest(
         return nil, err
     }

-    voucher, result, err := m.validateVoucher(initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
+    voucher, result, err := m.validateVoucher(false, initiator, incoming, incoming.IsPull(), incoming.BaseCid(), stor)
     if err != nil && err != datatransfer.ErrPause {
         return result, err
     }
@@ -410,16 +410,19 @@ func (m *manager) acceptRequest(
 // * reading voucher fails
 // * deserialization of selector fails
 // * validation fails
-func (m *manager) validateVoucher(sender peer.ID,
+func (m *manager) validateVoucher(
+    isRestart bool,
+    sender peer.ID,
     incoming datatransfer.Request,
     isPull bool,
     baseCid cid.Cid,
-    stor ipld.Node) (datatransfer.Voucher, datatransfer.VoucherResult, error) {
+    stor ipld.Node,
+) (datatransfer.Voucher, datatransfer.VoucherResult, error) {
     vouch, err := m.decodeVoucher(incoming, m.validatedTypes)
     if err != nil {
         return nil, nil, err
     }
-    var validatorFunc func(peer.ID, datatransfer.Voucher, cid.Cid, ipld.Node) (datatransfer.VoucherResult, error)
+    var validatorFunc func(bool, peer.ID, datatransfer.Voucher, cid.Cid, ipld.Node) (datatransfer.VoucherResult, error)
     processor, _ := m.validatedTypes.Processor(vouch.Type())
     validator := processor.(datatransfer.RequestValidator)
     if isPull {
@@ -428,7 +431,7 @@ func (m *manager) validateVoucher(sender peer.ID,
         validatorFunc = validator.ValidatePush
     }

-    result, err := validatorFunc(sender, vouch, baseCid, stor)
+    result, err := validatorFunc(isRestart, sender, vouch, baseCid, stor)
     return vouch, result, err
 }

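Note that the restart path (restartRequest, and validateRestartVoucher below) now passes true while acceptRequest passes false, and that in both call sites datatransfer.ErrPause from the validator is not treated as a failure: the request is still accepted, it just starts (or stays) paused. A hedged illustration of that contract from the caller's point of view; the validator, otherPeer, voucher, baseCid and selector names are placeholders, not code from this repository.

result, err := validator.ValidatePull(true /* isRestart */, otherPeer, voucher, baseCid, selector)
switch {
case err == nil:
    // accepted; the transfer proceeds normally
case err == datatransfer.ErrPause:
    // accepted, but the responder begins in the paused state
default:
    // rejected; the restart (or request) fails with this error
}
_ = result // the voucher result, if any, accompanies the response in the accepted cases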

impl/integration_test.go (+1)

@@ -725,6 +725,7 @@ func TestAutoRestart(t *testing.T) {
     // Set up
     restartConf := ChannelRestartConfig(channelmonitor.Config{
         AcceptTimeout:          100 * time.Millisecond,
+        RestartDebounce:        500 * time.Millisecond,
         RestartBackoff:         500 * time.Millisecond,
         MaxConsecutiveRestarts: 5,
         RestartAckTimeout:      100 * time.Millisecond,

impl/restart.go (+1, -1)

@@ -73,7 +73,7 @@ func (m *manager) validateRestartVoucher(channel datatransfer.ChannelState, isPu
     }

     // revalidate the voucher by reconstructing the request that would have led to the creation of this channel
-    if _, _, err := m.validateVoucher(channel.OtherPeer(), req, isPull, channel.BaseCID(), channel.Selector()); err != nil {
+    if _, _, err := m.validateVoucher(true, channel.OtherPeer(), req, isPull, channel.BaseCID(), channel.Selector()); err != nil {
         return err
     }


manager.go (+2)

@@ -13,12 +13,14 @@ import (
 type RequestValidator interface {
     // ValidatePush validates a push request received from the peer that will send data
     ValidatePush(
+        isRestart bool,
         sender peer.ID,
         voucher Voucher,
         baseCid cid.Cid,
         selector ipld.Node) (VoucherResult, error)
     // ValidatePull validates a pull request received from the peer that will receive data
     ValidatePull(
+        isRestart bool,
         receiver peer.ID,
         voucher Voucher,
         baseCid cid.Cid,
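Since RequestValidator is an exported interface, every implementation has to add the leading isRestart parameter. A minimal sketch of an updated implementation, assuming the go-data-transfer, go-libp2p-core, go-cid and go-ipld-prime import paths; the restart-handling policy here (lighter checks plus an initial pause) is purely illustrative, not the library's prescribed behaviour.

import (
    "errors"

    datatransfer "github.com/filecoin-project/go-data-transfer"
    "github.com/ipfs/go-cid"
    "github.com/ipld/go-ipld-prime"
    "github.com/libp2p/go-libp2p-core/peer"
)

// exampleValidator is a hypothetical validator updated to the new signatures.
type exampleValidator struct {
    accepted map[cid.Cid]bool // hypothetical record of previously accepted base CIDs
}

func (v *exampleValidator) ValidatePush(
    isRestart bool,
    sender peer.ID,
    voucher datatransfer.Voucher,
    baseCid cid.Cid,
    selector ipld.Node) (datatransfer.VoucherResult, error) {
    if isRestart && !v.accepted[baseCid] {
        return nil, errors.New("restart of a push that was never accepted")
    }
    v.accepted[baseCid] = true
    return nil, nil
}

func (v *exampleValidator) ValidatePull(
    isRestart bool,
    receiver peer.ID,
    voucher datatransfer.Voucher,
    baseCid cid.Cid,
    selector ipld.Node) (datatransfer.VoucherResult, error) {
    v.accepted[baseCid] = true
    if isRestart {
        // Keep the restarted response paused until the data is ready again;
        // the graphsync transport change below honours this.
        return nil, datatransfer.ErrPause
    }
    return nil, nil
}

var _ datatransfer.RequestValidator = (*exampleValidator)(nil)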

testutil/stubbedvalidator.go (+2)

@@ -19,6 +19,7 @@ func NewStubbedValidator() *StubbedValidator {

 // ValidatePush returns a stubbed result for a push validation
 func (sv *StubbedValidator) ValidatePush(
+    isRestart bool,
     sender peer.ID,
     voucher datatransfer.Voucher,
     baseCid cid.Cid,
@@ -30,6 +31,7 @@ func (sv *StubbedValidator) ValidatePush(

 // ValidatePull returns a stubbed result for a pull validation
 func (sv *StubbedValidator) ValidatePull(
+    isRestart bool,
     receiver peer.ID,
     voucher datatransfer.Voucher,
     baseCid cid.Cid,
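A short, hedged example of exercising the updated stub in a test with the new leading flag; the receiver, voucher, baseCid and selector values are assumed to come from the surrounding test setup, and it assumes the stub's default behaviour of returning no error.

sv := testutil.NewStubbedValidator()

// First-time pull validation.
_, err := sv.ValidatePull(false, receiver, voucher, baseCid, selector)
require.NoError(t, err)

// Restart of the same pull: identical call, but isRestart is now true.
_, err = sv.ValidatePull(true, receiver, voucher, baseCid, selector)
require.NoError(t, err)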

transport/graphsync/graphsync.go (+32, -4)

@@ -78,6 +78,7 @@ type Transport struct {
     contextCancelMap      map[datatransfer.ChannelID]cancelRequest
     pending               map[datatransfer.ChannelID]chan struct{}
     requestorCancelledMap map[datatransfer.ChannelID]struct{}
+    channelXferStarted    map[datatransfer.ChannelID]bool
     pendingExtensions     map[datatransfer.ChannelID][]graphsync.ExtensionData
     stores                map[datatransfer.ChannelID]struct{}
     supportedExtensions   []graphsync.ExtensionName
@@ -98,6 +99,7 @@ func NewTransport(peerID peer.ID, gs graphsync.GraphExchange, options ...Option)
         pendingExtensions:   make(map[datatransfer.ChannelID][]graphsync.ExtensionData),
         channelIDMap:        make(map[datatransfer.ChannelID]graphsyncKey),
         pending:             make(map[datatransfer.ChannelID]chan struct{}),
+        channelXferStarted:  make(map[datatransfer.ChannelID]bool),
         stores:              make(map[datatransfer.ChannelID]struct{}),
         supportedExtensions: defaultSupportedExtensions,
     }
@@ -149,15 +151,22 @@ func (t *Transport) OpenChannel(ctx context.Context,
         // Relock now that request has been cancelled
         t.dataLock.Lock()
     }
-    // Set up the request listeners
+
+    // Keep track of "pending" channels.
+    // The channel is in the "pending" state when we've made a call to
+    // Graphsync to open a request, but Graphsync hasn't yet called the
+    // outgoing request hook.
     t.pending[channelID] = make(chan struct{})

+    // Create a cancellable context for the channel so that the graphsync
+    // request can be cancelled
     internalCtx, internalCancel := context.WithCancel(ctx)
     cancelRQ := cancelRequest{
         cancel:    internalCancel,
         completed: make(chan struct{}),
     }
     t.contextCancelMap[channelID] = cancelRQ
+
     t.dataLock.Unlock()

     // If this is a restart request, the client can send a list of CIDs of
@@ -348,10 +357,10 @@ func (t *Transport) ResumeChannel(ctx context.Context,
     defer t.dataLock.Unlock()

     if _, ok := t.requestorCancelledMap[chid]; ok {
-
         t.pendingExtensions[chid] = append(t.pendingExtensions[chid], extensions...)
         return nil
     }
+    t.channelXferStarted[chid] = true
     return t.gs.UnpauseResponse(gsKey.p, gsKey.requestID, extensions...)
 }

@@ -375,10 +384,11 @@ func (t *Transport) CloseChannel(ctx context.Context, chid datatransfer.ChannelI
         return nil
     }
     t.dataLock.Lock()
-    if _, ok := t.requestorCancelledMap[chid]; ok {
+    _, ok := t.requestorCancelledMap[chid]
+    t.dataLock.Unlock()
+    if ok {
         return nil
     }
-    t.dataLock.Unlock()
     return t.gs.CancelResponse(gsKey.p, gsKey.requestID)
 }

@@ -606,11 +616,26 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
         return
     }

+    // Check if the callback indicated that the channel should be paused
+    // immediately
+    paused := false
     if err == datatransfer.ErrPause {
+        paused = true
         hookActions.PauseResponse()
     }

     t.dataLock.Lock()
+
+    // If this is a restart request, and the data transfer still hasn't got
+    // out of the paused state (eg because we're still unsealing), start this
+    // graphsync response in the paused state.
+    hasXferStarted, isRestart := t.channelXferStarted[chid]
+    if isRestart && !hasXferStarted && !paused {
+        paused = true
+        hookActions.PauseResponse()
+    }
+    t.channelXferStarted[chid] = !paused
+
     gsKey := graphsyncKey{request.ID(), p}
     if _, ok := t.requestorCancelledMap[chid]; ok {
         delete(t.requestorCancelledMap, chid)
@@ -626,7 +651,9 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
     if ok {
         hookActions.UsePersistenceOption("data-transfer-" + chid.String())
     }
+
     t.dataLock.Unlock()
+
     hookActions.ValidateRequest()
 }

@@ -695,6 +722,7 @@ func (t *Transport) cleanupChannel(chid datatransfer.ChannelID, gsKey graphsyncK
     delete(t.graphsyncRequestMap, gsKey)
     delete(t.pendingExtensions, chid)
     delete(t.requestorCancelledMap, chid)
+    delete(t.channelXferStarted, chid)
     _, ok := t.stores[chid]
     if ok {
         opt := "data-transfer-" + chid.String()
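Taken together: ResumeChannel marks a channel's transfer as started, and the incoming-request hook keeps a restarted-but-never-started response paused (the #199 fix). A small hypothetical helper that mirrors the decision made inside gsReqRecdHook above, shown in isolation; the function name is invented for illustration.

// shouldStartPaused reproduces the hook's decision: pause the graphsync
// response if the registered callback asked for a pause, or if this is a
// restart of a channel whose transfer never left the paused state.
// The presence of the chid key in channelXferStarted means the channel was
// seen before, i.e. this incoming request is a restart.
func shouldStartPaused(callbackPaused bool, channelXferStarted map[datatransfer.ChannelID]bool, chid datatransfer.ChannelID) bool {
    hasXferStarted, isRestart := channelXferStarted[chid]
    return callbackPaused || (isRestart && !hasXferStarted)
}

After the decision, the hook records channelXferStarted[chid] = !paused, and ResumeChannel later sets it to true when the responder is unpaused, so a restart of a transfer that was already flowing is not paused again.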
