fix: avoid duplicates on aggregator buffer concat (#1259)
While we know that some of the queues/streams we use are "at-least-once
delivery", we have barely seen any duplicates (see
https://filecoinproject.slack.com/archives/C02BZPRS9HP/p1704731857750279).

However, when we had a considerable number of failures and retries, the
"at-least-once delivery" semantics became more significant and we actually
got some repeated items. This change decreases the likelihood of repeated
items. Of course, if duplicates make their way into different aggregates this
fix won't help, but the same piece being added to the same aggregate should
not happen anymore.

To achieve this, we concatenate the buffers manually and check whether each
piece is already present before adding it.
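As an illustration, a minimal standalone sketch of the idea (a hypothetical
mergeUniquePieces helper, not the code in this commit): track the piece CIDs
already seen in a Set while concatenating, so a piece delivered more than once
is only kept once.

// Hypothetical sketch (assumed record shapes, not the commit's code): merge
// piece buffers while dropping duplicates. Assumes each entry has a `piece`
// link whose toString() yields a stable, unique key (true for CIDs).
function mergeUniquePieces(buffers) {
  const seen = new Set()
  const merged = []
  for (const buffer of buffers) {
    for (const piece of buffer.pieces) {
      const key = piece.piece.toString()
      if (seen.has(key)) continue // repeated delivery, skip it
      seen.add(key)
      merged.push(piece)
    }
  }
  return merged
}

A Set keyed by the stringified piece CID makes each membership check O(1), so
deduplication adds only linear overhead to the concatenation.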
vasco-santos authored Jan 15, 2024
1 parent d33b3a9 commit 9e64bab
Showing 2 changed files with 33 additions and 3 deletions.
12 changes: 9 additions & 3 deletions packages/filecoin-api/src/aggregator/buffer-reducing.js
@@ -227,14 +227,20 @@ export async function getBufferedPieces(bufferPieces, bufferStore) {
     bufferPieces.map((bufferPiece) => bufferStore.get(bufferPiece))
   )
 
-  // Concatenate pieces and sort them by policy and size
+  // Concatenate pieces uniquely and sort them by policy and size
   /** @type {BufferedPiece[]} */
   let bufferedPieces = []
+  const uniquePieces = new Set()
   for (const b of getBufferRes) {
     if (b.error) return b
-    bufferedPieces = bufferedPieces.concat(b.ok.buffer.pieces || [])
+    for (const piece of b.ok.buffer.pieces) {
+      const isDuplicate = uniquePieces.has(piece.piece.toString())
+      if (!isDuplicate) {
+        bufferedPieces.push(piece)
+        uniquePieces.add(piece.piece.toString())
+      }
+    }
   }
 
   bufferedPieces.sort(sortPieces)
 
   return {
24 changes: 24 additions & 0 deletions packages/filecoin-api/test/events/aggregator.js
@@ -6,6 +6,7 @@ import { CBOR } from '@ucanto/core'
 import * as API from '../../src/types.js'
 import * as TestAPI from '../types.js'
 import * as AggregatorEvents from '../../src/aggregator/events.js'
+import { getBufferedPieces } from '../../src/aggregator/buffer-reducing.js'
 
 import { FailingStore } from '../context/store.js'
 import { FailingQueue } from '../context/queue.js'
@@ -175,6 +176,29 @@ export const test = {
       bufferQueue: new FailingQueue(),
     })
   ),
+  'handles buffer queue messages repeated items as unique': async (
+    assert,
+    context
+  ) => {
+    const group = context.id.did()
+    const { buffers, blocks } = await getBuffers(1, group)
+
+    // Store buffers
+    for (let i = 0; i < blocks.length; i++) {
+      const putBufferRes = await context.bufferStore.put({
+        buffer: buffers[i],
+        block: blocks[i].cid,
+      })
+      assert.ok(putBufferRes.ok)
+    }
+
+    const bufferedPieces = await getBufferedPieces(
+      [blocks[0].cid, blocks[0].cid],
+      context.bufferStore
+    )
+
+    assert.equal(bufferedPieces.ok?.bufferedPieces.length, buffers[0].pieces.length)
+  },
   'handles buffer queue messages successfully to requeue bigger buffer': async (
     assert,
     context
