diff --git a/packages/upload-client/src/sharding.js b/packages/upload-client/src/sharding.js index 39848f566..daf8a0711 100644 --- a/packages/upload-client/src/sharding.js +++ b/packages/upload-client/src/sharding.js @@ -105,7 +105,7 @@ export class ShardingStream extends TransformStream { overflowCurrentLength = 0 /** @type {Map} */ const overflowSlices = new DigestMap() - for (const block of blocks) { + for (const block of overflowBlocks) { const overflowBlockHeaderLength = blockHeaderEncodingLength(block) overflowSlices.set(block.cid.multihash, [ headerLength + overflowCurrentLength + overflowBlockHeaderLength, diff --git a/packages/upload-client/test/sharding.test.js b/packages/upload-client/test/sharding.test.js index 79a3ef2ab..6c1444c32 100644 --- a/packages/upload-client/test/sharding.test.js +++ b/packages/upload-client/test/sharding.test.js @@ -78,24 +78,22 @@ describe('ShardingStream', () => { await randomBlock(32), // encoded block length = 70 ] - /** @type {import('../src/types.js').CARFile[]} */ + /** @type {import('../src/types.js').IndexedCARFile[]} */ const shards = [] + let i = 0 await new ReadableStream({ pull(controller) { - const block = blocks.shift() + const block = blocks[i] if (!block) return controller.close() controller.enqueue(block) + i++ }, }) - // shard with no roots = encoded block (166) + CAR header (18) = 183 - // shard with no roots = encoded block (102) + CAR header (18) = 120 - // shard with 1 root = encoded block (70) + CAR header (18) = 88 - // shard with 1 root = encoded block (70) + CAR header (59) = 155 - // i.e. shard size of 208 (120 + 88) should allow us 1 shard with 0 roots - // and then 1 shard with 2 blocks that, when encoded as a CAR with 1 root - // will actually exceed the shard size. It must then be refactored into - // 2 shards. - .pipeThrough(new ShardingStream({ shardSize: 208 })) + // 166 + 102 + 70 + 18 (0 root CAR header) = 356 + // 166 + 102 + 70 + 59 (1 root CAR header) = 397 + // Choose 360 as shard size so when CAR header with a root is added, the + // 3rd block is moved into a new shard. + .pipeThrough(new ShardingStream({ shardSize: 360 })) .pipeTo( new WritableStream({ write: (s) => { @@ -104,7 +102,39 @@ describe('ShardingStream', () => { }) ) - assert.equal(shards.length, 3) + assert.equal(shards.length, 2) + + const shard0Bytes = new Uint8Array(await shards[0].arrayBuffer()) + const shard1Bytes = new Uint8Array(await shards[1].arrayBuffer()) + + // block 0 and 1 should be in shard 0 + const slice0 = shards[0].slices.get(blocks[0].cid.multihash) + assert.ok(slice0) + assert( + equals( + blocks[0].bytes, + shard0Bytes.slice(slice0[0], slice0[0] + slice0[1]) + ) + ) + + const slice1 = shards[0].slices.get(blocks[1].cid.multihash) + assert.ok(slice1) + assert( + equals( + blocks[1].bytes, + shard0Bytes.slice(slice1[0], slice1[0] + slice1[1]) + ) + ) + + // block 2 should be in shard 1 + const slice2 = shards[1].slices.get(blocks[2].cid.multihash) + assert.ok(slice2) + assert( + equals( + blocks[2].bytes, + shard1Bytes.slice(slice2[0], slice2[0] + slice2[1]) + ) + ) }) it('exceeds shard size when block is encoded with root CID', async () => {