This repository has been archived by the owner on Feb 1, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 112
/
getter.go
132 lines (112 loc) · 3.69 KB
/
getter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package getter
import (
"context"
"errors"
notifications "github.com/ipfs/go-bitswap/notifications"
logging "github.com/ipfs/go-log"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
// log is the package-level logger, obtained from go-log under the name "bitswap".
var log = logging.Logger("bitswap")
// GetBlocksFunc describes an asynchronous block fetcher: given a context and
// a list of CIDs, it hands back a receive-only channel of incoming blocks,
// or an error.
type GetBlocksFunc func(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error)
// SyncGetBlock retrieves a single block synchronously by driving gb, an
// asynchronous fetch function that reports its results on a channel.
//
// It returns blockstore.ErrNotFound when k is undefined, any error returned
// by gb itself, p.Err() when the caller's context ends first, and a
// "promise channel was closed" error when gb's channel is closed without
// delivering a block while the derived context is still live.
func SyncGetBlock(p context.Context, k cid.Cid, gb GetBlocksFunc) (blocks.Block, error) {
	if !k.Defined() {
		log.Error("undefined cid in GetBlock")
		return nil, blockstore.ErrNotFound
	}

	// Everything started on behalf of this call has to stop once the call
	// returns, so all downstream work runs under a derived, cancellable
	// context. Waiting on p inside this function is fine; handing p itself to
	// a callee is not, because the deferred cancel below would never reach it.
	ctx, cancel := context.WithCancel(p)
	defer cancel()

	results, err := gb(ctx, []cid.Cid{k})
	if err != nil {
		return nil, err
	}

	select {
	case <-p.Done():
		return nil, p.Err()
	case blk, open := <-results:
		if open {
			return blk, nil
		}
		// The channel was closed without yielding a block. Prefer reporting
		// cancellation of the derived context over the generic error.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		return nil, errors.New("promise channel was closed")
	}
}
// WantFunc describes a function used to express a want for a set of blocks,
// identified by their CIDs. It reports nothing back to the caller.
type WantFunc func(ctx context.Context, keys []cid.Cid)
// AsyncGetBlocks requests the blocks identified by keys and returns a channel
// on which they are delivered as they arrive.
//
// ctx is used for the pubsub subscription, the want call and the forwarding
// goroutine; sessctx is only handed to the forwarding goroutine as a second
// cancellation signal. notif is the pubsub the incoming blocks are read from,
// want announces interest in the keys, and cwants is invoked by the
// forwarding goroutine with the keys that were never received. The returned
// error is always nil. With no keys, an already-closed channel is returned
// and nothing else happens.
func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid, notif notifications.PubSub,
	want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {

	out := make(chan blocks.Block)

	// Nothing requested: hand back a channel that is already closed.
	if len(keys) == 0 {
		close(out)
		return out, nil
	}

	// Subscribe before sending the want so incoming blocks have a listener,
	// and track every requested key as still outstanding.
	incoming := notif.Subscribe(ctx, keys...)
	pending := cid.NewSet()
	for i := range keys {
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", keys[i])
		pending.Add(keys[i])
	}

	// Announce the wanted keys to the network.
	want(ctx, keys)

	// The goroutine owns out from here on and closes it when it exits.
	go handleIncoming(ctx, sessctx, pending, incoming, out, cwants)
	return out, nil
}
// handleIncoming forwards blocks read from in to out, removing each one from
// remaining as it arrives. It stops when in is closed or when either ctx or
// sessctx is done. On exit it cancels its derived context, closes out, and
// calls cfun with the keys still left in remaining, i.e. those whose blocks
// were never received.
func handleIncoming(ctx context.Context, sessctx context.Context, remaining *cid.Set,
	in <-chan blocks.Block, out chan blocks.Block, cfun func([]cid.Cid)) {

	ctx, cancel := context.WithCancel(ctx)

	// The cleanup is wrapped in a closure on purpose: a plain
	// `defer cfun(remaining.Keys())` would evaluate remaining.Keys() when the
	// defer statement runs, not when the function exits.
	cleanup := func() {
		cancel()
		close(out)
		cfun(remaining.Keys())
	}
	defer cleanup()

	// deliver hands one block to the consumer and reports false when either
	// context ended before the consumer took it.
	deliver := func(b blocks.Block) bool {
		select {
		case out <- b:
			return true
		case <-ctx.Done():
			return false
		case <-sessctx.Done():
			return false
		}
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-sessctx.Done():
			return
		case b, open := <-in:
			// A closed input means there is nothing more to forward (the
			// pubsub closes it once every subscribed key has been received).
			if !open {
				return
			}
			remaining.Remove(b.Cid())
			if !deliver(b) {
				return
			}
		}
	}
}