Skip to content

Commit

Permalink
rapide: make the client code cleaner with happy left
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Apr 14, 2023
1 parent 5f374bc commit 66fd3c8
Showing 1 changed file with 63 additions and 58 deletions.
121 changes: 63 additions & 58 deletions rapide/clientdriven.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,8 @@ func (d *download) startClientDrivenWorker(impl ClientDrivenDownloader, start *n

// err must be called while holding w.mu.
func (w *clientDrivenWorker) err(err error) {
if err == io.EOF {
w.dl.workerFinished()
} else {
*w.outErr = err
w.dl.workerErrored()
}
*w.outErr = err
w.dl.workerErrored()

toCancel := make([]cid.Cid, w.len)
for i, p := 0, w.head; p != nil; i, p = i+1, p.children {
Expand Down Expand Up @@ -99,67 +95,76 @@ func (s *snake) callback(data []byte, err error) {

if err != nil {
// TODO: handle ErrNotFound
goto Errr
s.update()
w.err(err)
return
}

n.mu.Lock()
if n.state == todo {
var block blocks.Block
block, err = blocks.NewBlockWithCid(data, n.cid)
if err != nil {
goto Errr
}
err = n.expand(w.dl, block)
n.mu.Lock()
if err != nil {
goto Errr
}
if n.state != todo {
// duplicated block
s.update()
return
}

newBlocksWanted := uint(len(n.childrens))
if remainingSpace := targetParallelBlockDownloads - w.len; newBlocksWanted > remainingSpace {
newBlocksWanted = remainingSpace
}
w.len += newBlocksWanted
if newBlocksWanted != 0 {
downloads := make([]CidCallbackPair, newBlocksWanted)
// TODO: select blocks randomly within the children
left, right := s.dup()
for i := range downloads {
child := n.childrens[i]
child.mu.Lock()
child.workers++
child.mu.Unlock()
ns := &snake{
worker: s.worker,
node: child,
parent: left,
level: s.level + 1,
}
left.children, left = ns, ns
downloads[i] = CidCallbackPair{child.cid, ns.callback}
var block blocks.Block
block, err = blocks.NewBlockWithCid(data, n.cid)
if err != nil {
s.update()
w.err(err)
}
err = n.expand(w.dl, block)
eof := err == io.EOF
n.mu.Lock()
if err != nil && !eof {
s.update()
w.err(err)
return
}

newBlocksWanted := uint(len(n.childrens))
if remainingSpace := targetParallelBlockDownloads - w.len; newBlocksWanted > remainingSpace {
newBlocksWanted = remainingSpace
}
w.len += newBlocksWanted
if newBlocksWanted != 0 {
downloads := make([]CidCallbackPair, newBlocksWanted)
// TODO: select blocks randomly within the children
left, right := s.dup()
for i := range downloads {
child := n.childrens[i]
child.mu.Lock()
child.workers++
child.mu.Unlock()
ns := &snake{
worker: s.worker,
node: child,
parent: left,
level: s.level + 1,
}
left.children, right.parent = right, left
s.update()
// TODO: if we havn't found enough blocks to download, try in other nodes of the snake or try backtracking from the head.
s.worker.impl.Download(downloads...)
} else {
s.update()
}
select {
case w.dl.out <- blocks.Is(block):
case <-w.dl.ctx.Done():
err = w.dl.ctx.Err()
goto Errr
left.children, left = ns, ns
downloads[i] = CidCallbackPair{child.cid, ns.callback}
}
left.children, right.parent = right, left
s.update()
// TODO: if we havn't found enough blocks to download, try in other nodes of the snake or try backtracking from the head.
s.worker.impl.Download(downloads...)
} else {
s.update()
}

dl := w.dl
ctx := dl.ctx
select {
case dl.out <- blocks.Is(block):
case <-ctx.Done():
w.err(ctx.Err())
return
}
// duplicated block
s.update()
return

Errr:
s.update()
w.err(err)
if eof {
dl.finish()
}
}

// update checks if this node should be removed from the snake and do so if needed. It will update the metric if needed.
Expand Down

0 comments on commit 66fd3c8

Please sign in to comment.