-
Notifications
You must be signed in to change notification settings - Fork 307
/
Copy pathread.go
297 lines (253 loc) · 6.9 KB
/
read.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package ipld
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"github.com/ipfs/go-cid"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/path"
"github.com/lazyledger/rsmt2d"
"github.com/lazyledger/lazyledger-core/p2p/ipld/plugin/nodes"
"github.com/lazyledger/lazyledger-core/types"
)
const baseErrorMsg = "failure to retrieve block data:"
var ErrEncounteredTooManyErrors = fmt.Errorf("%s %s", baseErrorMsg, "encountered too many errors")
var ErrTimeout = fmt.Errorf("%s %s", baseErrorMsg, "timeout")
// RetrieveBlockData asynchronously fetches block data using the minimum number
// of requests to IPFS. It fails if one of the random samples sampled is not available.
func RetrieveBlockData(
ctx context.Context,
dah *types.DataAvailabilityHeader,
api coreiface.CoreAPI,
codec rsmt2d.Codec,
) (types.Data, error) {
edsWidth := len(dah.RowsRoots)
sc := newshareCounter(ctx, uint32(edsWidth))
// convert the row and col roots into Cids
rowRoots := dah.RowsRoots.Bytes()
colRoots := dah.ColumnRoots.Bytes()
// sample 1/4 of the total extended square by sampling half of the leaves in
// half of the rows
for _, row := range uniqueRandNumbers(edsWidth/2, edsWidth) {
for _, col := range uniqueRandNumbers(edsWidth/2, edsWidth) {
rootCid, err := nodes.CidFromNamespacedSha256(rowRoots[row])
if err != nil {
return types.Data{}, err
}
go sc.retrieveShare(rootCid, true, row, col, api)
}
}
// wait until enough data has been collected, too many errors encountered,
// or the timeout is reached
err := sc.wait()
if err != nil {
return types.Data{}, err
}
// flatten the square
flattened := sc.flatten()
tree := NewErasuredNamespacedMerkleTree(uint64(edsWidth) / 2)
// repair the square
eds, err := rsmt2d.RepairExtendedDataSquare(rowRoots, colRoots, flattened, codec, tree.Constructor)
if err != nil {
return types.Data{}, err
}
blockData, err := types.DataFromSquare(eds)
if err != nil {
return types.Data{}, err
}
return blockData, nil
}
// uniqueRandNumbers generates count unique random numbers with a max of max
func uniqueRandNumbers(count, max int) []uint32 {
if count > max {
panic(fmt.Sprintf("cannot create %d unique samples from a max of %d", count, max))
}
samples := make(map[uint32]struct{}, count)
for i := 0; i < count; {
// nolint:gosec // G404: Use of weak random number generator
sample := uint32(rand.Intn(max))
if _, has := samples[sample]; has {
continue
}
samples[sample] = struct{}{}
i++
}
out := make([]uint32, count)
counter := 0
for s := range samples {
out[counter] = s
counter++
}
return out
}
type index struct {
row uint32
col uint32
}
type indexedShare struct {
data []byte
index
}
// shareCounter is a thread safe tallying mechanism for share retrieval
type shareCounter struct {
// all shares
shares map[index][]byte
// number of shares successfully collected
counter uint32
// the width of the extended data square
edsWidth uint32
// the minimum shares needed to repair the extended data square
minSharesNeeded uint32
shareChan chan indexedShare
ctx context.Context
cancel context.CancelFunc
// any errors encountered when attempting to retrieve shares
errc chan error
}
func newshareCounter(parentCtx context.Context, edsWidth uint32) *shareCounter {
ctx, cancel := context.WithCancel(parentCtx)
// calculate the min number of shares needed to repair the square
minSharesNeeded := (edsWidth * edsWidth / 4)
return &shareCounter{
shares: make(map[index][]byte),
edsWidth: edsWidth,
minSharesNeeded: minSharesNeeded,
shareChan: make(chan indexedShare, 1),
errc: make(chan error, 1),
ctx: ctx,
cancel: cancel,
}
}
// retrieveLeaf uses GetLeafData to fetch a single leaf and counts that leaf
func (sc *shareCounter) retrieveShare(
rootCid cid.Cid,
isRow bool,
axisIdx uint32,
idx uint32,
api coreiface.CoreAPI,
) {
data, err := GetLeafData(sc.ctx, rootCid, idx, sc.edsWidth, api)
if err != nil {
select {
case <-sc.ctx.Done():
case sc.errc <- err:
}
}
if len(data) < types.ShareSize {
return
}
// switch the row and col indexes if needed
rowIdx := idx
colIdx := axisIdx
if isRow {
rowIdx = axisIdx
colIdx = idx
}
select {
case <-sc.ctx.Done():
default:
sc.shareChan <- indexedShare{data: data[types.NamespaceSize:], index: index{row: rowIdx, col: colIdx}}
}
}
// wait until enough data has been collected, the timeout has been reached, or
// too many errors are encountered
func (sc *shareCounter) wait() error {
defer sc.cancel()
for {
select {
case <-sc.ctx.Done():
return ErrTimeout
case share := <-sc.shareChan:
_, has := sc.shares[share.index]
// add iff it does not already exists
if !has {
sc.shares[share.index] = share.data
sc.counter++
// check finishing condition
if sc.counter >= sc.minSharesNeeded {
return nil
}
}
case err := <-sc.errc:
return fmt.Errorf("failure to retrieve data square: %w", err)
}
}
}
func (sc *shareCounter) flatten() [][]byte {
flattended := make([][]byte, sc.edsWidth*sc.edsWidth)
for index, data := range sc.shares {
flattended[(index.row*sc.edsWidth)+index.col] = data
}
return flattended
}
// GetLeafData fetches and returns the data for leaf leafIndex of root rootCid.
// It stops and returns an error if the provided context is cancelled before
// finishing
func GetLeafData(
ctx context.Context,
rootCid cid.Cid,
leafIndex uint32,
totalLeafs uint32, // this corresponds to the extended square width
api coreiface.CoreAPI,
) ([]byte, error) {
// calculate the path to the leaf
leafPath, err := leafPath(leafIndex, totalLeafs)
if err != nil {
return nil, err
}
// use the root cid and the leafPath to create an ipld path
p := path.Join(path.IpldPath(rootCid), leafPath...)
// resolve the path
node, err := api.ResolveNode(ctx, p)
if err != nil {
return nil, err
}
// return the leaf, without the nmt-leaf-or-node byte
return node.RawData()[1:], nil
}
func leafPath(index, total uint32) ([]string, error) {
// ensure that the total is a power of two
if total != nextPowerOf2(total) {
return nil, errors.New("expected total to be a power of 2")
}
if total == 0 {
return nil, nil
}
depth := int(math.Log2(float64(total)))
cursor := index
path := make([]string, depth)
for i := depth - 1; i >= 0; i-- {
if cursor%2 == 0 {
path[i] = "0"
} else {
path[i] = "1"
}
cursor /= 2
}
return path, nil
}
// nextPowerOf2 returns the next lowest power of 2 unless the input is a power
// of two, in which case it returns the input
func nextPowerOf2(v uint32) uint32 {
if v == 1 {
return 1
}
// keep track of the input
i := v
// find the next highest power using bit mashing
v--
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
v++
// check if the input was the next highest power
if i == v {
return v
}
// return the next lowest power
return v / 2
}