From 9e64babe93edeb474fc740d9be74c709d47bed1a Mon Sep 17 00:00:00 2001
From: Vasco Santos
Date: Mon, 15 Jan 2024 10:36:50 +0100
Subject: [PATCH] fix: avoid duplicates on aggregator buffer concat (#1259)

While we know that some of the queues/streams we use provide "at least
once" delivery, we had barely seen any duplicates in practice (see
https://filecoinproject.slack.com/archives/C02BZPRS9HP/p1704731857750279).
However, once we hit a considerable amount of failures and retries, the
"at least once" delivery semantics became significant and we actually
received some repeated items.

This change aims to decrease the likelihood of repeated items. Of
course, if duplicates make their way into different aggregates it will
not be effective, but the same piece being added to the same aggregate
should not happen anymore. To achieve this, we concatenate the buffers
manually and check whether each piece is already present.
---
 .../src/aggregator/buffer-reducing.js       | 12 +++++++---
 .../filecoin-api/test/events/aggregator.js  | 24 +++++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js
index fd134c541..c1262d0ab 100644
--- a/packages/filecoin-api/src/aggregator/buffer-reducing.js
+++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js
@@ -227,14 +227,20 @@ export async function getBufferedPieces(bufferPieces, bufferStore) {
     bufferPieces.map((bufferPiece) => bufferStore.get(bufferPiece))
   )
 
-  // Concatenate pieces and sort them by policy and size
+  // Concatenate pieces uniquely and sort them by policy and size
   /** @type {BufferedPiece[]} */
   let bufferedPieces = []
+  const uniquePieces = new Set()
   for (const b of getBufferRes) {
     if (b.error) return b
-    bufferedPieces = bufferedPieces.concat(b.ok.buffer.pieces || [])
+    for (const piece of b.ok.buffer.pieces) {
+      const isDuplicate = uniquePieces.has(piece.piece.toString())
+      if (!isDuplicate) {
+        bufferedPieces.push(piece)
+        uniquePieces.add(piece.piece.toString())
+      }
+    }
   }
-
   bufferedPieces.sort(sortPieces)
 
   return {
diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js
index c15844c59..d2de21104 100644
--- a/packages/filecoin-api/test/events/aggregator.js
+++ b/packages/filecoin-api/test/events/aggregator.js
@@ -6,6 +6,7 @@ import { CBOR } from '@ucanto/core'
 import * as API from '../../src/types.js'
 import * as TestAPI from '../types.js'
 import * as AggregatorEvents from '../../src/aggregator/events.js'
+import { getBufferedPieces } from '../../src/aggregator/buffer-reducing.js'
 
 import { FailingStore } from '../context/store.js'
 import { FailingQueue } from '../context/queue.js'
@@ -175,6 +176,29 @@ export const test = {
       bufferQueue: new FailingQueue(),
     })
   ),
+  'handles buffer queue messages repeated items as unique': async (
+    assert,
+    context
+  ) => {
+    const group = context.id.did()
+    const { buffers, blocks } = await getBuffers(1, group)
+
+    // Store buffers
+    for (let i = 0; i < blocks.length; i++) {
+      const putBufferRes = await context.bufferStore.put({
+        buffer: buffers[i],
+        block: blocks[i].cid,
+      })
+      assert.ok(putBufferRes.ok)
+    }
+
+    const bufferedPieces = await getBufferedPieces(
+      [blocks[0].cid, blocks[0].cid],
+      context.bufferStore
+    )
+
+    assert.equal(bufferedPieces.ok?.bufferedPieces.length, buffers[0].pieces.length)
+  },
   'handles buffer queue messages successfully to requeue bigger buffer': async (
     assert,
     context
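
Note: for readers skimming the patch, the deduplication it introduces boils
down to the standalone sketch below. This is illustrative only and is not
code from the repository: the helper name `dedupePieces` is hypothetical,
and it assumes each buffered piece is a record whose `piece` field
stringifies to its piece CID, as in the diff above.

    // Illustrative sketch (not part of the patch): deduplicate buffered
    // pieces by their piece CID string, mirroring the Set-based check
    // added in getBufferedPieces. `dedupePieces` is a hypothetical name.
    function dedupePieces(buffers) {
      const seen = new Set()
      const out = []
      for (const buffer of buffers) {
        for (const piece of buffer.pieces || []) {
          const key = piece.piece.toString()
          if (seen.has(key)) continue // skip items delivered more than once
          seen.add(key)
          out.push(piece)
        }
      }
      return out
    }

    // Usage: two buffers sharing a piece yield a single entry for it.
    const a = { pieces: [{ piece: 'bafy...piece1', policy: 0 }] }
    const b = {
      pieces: [
        { piece: 'bafy...piece1', policy: 0 },
        { piece: 'bafy...piece2', policy: 0 },
      ],
    }
    console.log(dedupePieces([a, b]).length) // 2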