Skip to content

Commit

Permalink
fix: overflow slices (#1595)
Browse files Browse the repository at this point in the history
In the unusual case where a sharding stream has to create a new shard
because adding the root CID to the CAR header pushes it over the shard
limit, the slice indexing info for the new shard was set for the blocks
in the previous shard, not the blocks in the new shard 🤦. This means
that one or more blocks in the DAG were not indexed at all.
  • Loading branch information
alanshaw authored Nov 29, 2024
1 parent b737073 commit 0731582
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
2 changes: 1 addition & 1 deletion packages/upload-client/src/sharding.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export class ShardingStream extends TransformStream {
overflowCurrentLength = 0
/** @type {Map<import('./types.js').SliceDigest, import('./types.js').Position>} */
const overflowSlices = new DigestMap()
for (const block of blocks) {
for (const block of overflowBlocks) {
const overflowBlockHeaderLength = blockHeaderEncodingLength(block)
overflowSlices.set(block.cid.multihash, [
headerLength + overflowCurrentLength + overflowBlockHeaderLength,
Expand Down
54 changes: 42 additions & 12 deletions packages/upload-client/test/sharding.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,22 @@ describe('ShardingStream', () => {
await randomBlock(32), // encoded block length = 70
]

/** @type {import('../src/types.js').CARFile[]} */
/** @type {import('../src/types.js').IndexedCARFile[]} */
const shards = []
let i = 0
await new ReadableStream({
pull(controller) {
const block = blocks.shift()
const block = blocks[i]
if (!block) return controller.close()
controller.enqueue(block)
i++
},
})
// shard with no roots = encoded block (166) + CAR header (18) = 184
// shard with no roots = encoded block (102) + CAR header (18) = 120
// shard with 1 root = encoded block (70) + CAR header (18) = 88
// shard with 1 root = encoded block (70) + CAR header (59) = 155
// i.e. shard size of 208 (120 + 88) should allow us 1 shard with 0 roots
// and then 1 shard with 2 blocks that, when encoded as a CAR with 1 root
// will actually exceed the shard size. It must then be refactored into
// 2 shards.
.pipeThrough(new ShardingStream({ shardSize: 208 }))
// 166 + 102 + 70 + 18 (0 root CAR header) = 356
// 166 + 102 + 70 + 59 (1 root CAR header) = 397
// Choose 360 as shard size so when CAR header with a root is added, the
// 3rd block is moved into a new shard.
.pipeThrough(new ShardingStream({ shardSize: 360 }))
.pipeTo(
new WritableStream({
write: (s) => {
Expand All @@ -104,7 +102,39 @@ describe('ShardingStream', () => {
})
)

assert.equal(shards.length, 3)
assert.equal(shards.length, 2)

const shard0Bytes = new Uint8Array(await shards[0].arrayBuffer())
const shard1Bytes = new Uint8Array(await shards[1].arrayBuffer())

// block 0 and 1 should be in shard 0
const slice0 = shards[0].slices.get(blocks[0].cid.multihash)
assert.ok(slice0)
assert(
equals(
blocks[0].bytes,
shard0Bytes.slice(slice0[0], slice0[0] + slice0[1])
)
)

const slice1 = shards[0].slices.get(blocks[1].cid.multihash)
assert.ok(slice1)
assert(
equals(
blocks[1].bytes,
shard0Bytes.slice(slice1[0], slice1[0] + slice1[1])
)
)

// block 2 should be in shard 1
const slice2 = shards[1].slices.get(blocks[2].cid.multihash)
assert.ok(slice2)
assert(
equals(
blocks[2].bytes,
shard1Bytes.slice(slice2[0], slice2[0] + slice2[1])
)
)
})

it('exceeds shard size when block is encoded with root CID', async () => {
Expand Down

0 comments on commit 0731582

Please sign in to comment.