From cd97aefc278df875471324323206a77cf65e1641 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 28 Aug 2024 12:34:33 +0530 Subject: [PATCH 01/16] chore: move SubscriptionManager to a separate file --- packages/interfaces/src/filter.ts | 5 + packages/sdk/src/index.ts | 2 +- .../sdk/src/protocols/filter/constants.ts | 5 + packages/sdk/src/protocols/filter/index.ts | 305 +++++++++++++++++ .../subscription_manager.ts} | 309 +----------------- packages/sdk/src/waku.ts | 2 +- 6 files changed, 323 insertions(+), 305 deletions(-) create mode 100644 packages/sdk/src/protocols/filter/constants.ts create mode 100644 packages/sdk/src/protocols/filter/index.ts rename packages/sdk/src/protocols/{filter.ts => filter/subscription_manager.ts} (57%) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 67946a02a9..5b7a62f71e 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -12,6 +12,11 @@ import type { } from "./protocols.js"; import type { IReceiver } from "./receiver.js"; +export type SubscriptionCallback = { + decoders: IDecoder[]; + callback: Callback; +}; + export type SubscribeOptions = { keepAlive?: number; pingsBeforePeerRenewed?: number; diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 2bbdfc0d30..fdff4f4a8a 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -11,7 +11,7 @@ export * from "./waku.js"; export { createLightNode, defaultLibp2p } from "./create/index.js"; export { wakuLightPush } from "./protocols/light_push.js"; -export { wakuFilter } from "./protocols/filter.js"; +export { wakuFilter } from "./protocols/filter/index.js"; export { wakuStore } from "./protocols/store.js"; export * as waku from "@waku/core"; diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts new file mode 100644 index 0000000000..3889e7638b --- /dev/null +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -0,0 +1,5 @@ +export const DEFAULT_KEEP_ALIVE = 30 * 1000; + +export const DEFAULT_SUBSCRIBE_OPTIONS = { + keepAlive: DEFAULT_KEEP_ALIVE +}; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts new file mode 100644 index 0000000000..d87cc97468 --- /dev/null +++ b/packages/sdk/src/protocols/filter/index.ts @@ -0,0 +1,305 @@ +import { ConnectionManager, FilterCore } from "@waku/core"; +import { + type Callback, + type CreateSubscriptionResult, + type IAsyncIterator, + type IDecodedMessage, + type IDecoder, + type IFilterSDK, + type Libp2p, + NetworkConfig, + type ProtocolCreateOptions, + ProtocolError, + type ProtocolUseOptions, + type PubsubTopic, + type SubscribeOptions, + SubscribeResult, + type Unsubscribe +} from "@waku/interfaces"; +import { + ensurePubsubTopicIsConfigured, + groupByContentTopic, + Logger, + shardInfoToPubsubTopics, + toAsyncIterator +} from "@waku/utils"; + +import { BaseProtocolSDK } from "../base_protocol.js"; + +import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; +import { SubscriptionManager } from "./subscription_manager.js"; + +const log = new Logger("sdk:filter"); + +class FilterSDK extends BaseProtocolSDK implements IFilterSDK { + public readonly protocol: FilterCore; + + private activeSubscriptions = new Map(); + + public constructor( + connectionManager: ConnectionManager, + libp2p: Libp2p, + options?: ProtocolCreateOptions + ) { + super( + new FilterCore( + async (pubsubTopic, wakuMessage, peerIdStr) => { + const subscription = this.getActiveSubscription(pubsubTopic); + if (!subscription) { + log.error( + `No subscription locally registered for topic ${pubsubTopic}` + ); + return; + } + + await subscription.processIncomingMessage(wakuMessage, peerIdStr); + }, + connectionManager.configuredPubsubTopics, + libp2p + ), + connectionManager, + { numPeersToUse: options?.numPeersToUse } + ); + + this.protocol = this.core as FilterCore; + + this.activeSubscriptions = new Map(); + } + + /** + * Opens a subscription with the Filter protocol using the provided decoders and callback. + * This method combines the functionality of creating a subscription and subscribing to it. + * + * @param {IDecoder | IDecoder[]} decoders - A single decoder or an array of decoders to use for decoding messages. + * @param {Callback} callback - The callback function to be invoked with decoded messages. + * @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol. + * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription. + * + * @returns {Promise} A promise that resolves to an object containing: + * - subscription: The created subscription object if successful, or null if failed. + * - error: A ProtocolError if the subscription creation failed, or null if successful. + * - results: An object containing arrays of failures and successes from the subscription process. + * Only present if the subscription was created successfully. + * + * @throws {Error} If there's an unexpected error during the subscription process. + * + * @remarks + * This method attempts to create a subscription using the pubsub topic derived from the provided decoders, + * then tries to subscribe using the created subscription. The return value should be interpreted as follows: + * - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely. + * - If `subscription` is non-null and `error` is null, the subscription was created successfully. + * In this case, check the `results` field for detailed information about successes and failures during the subscription process. + * - Even if the subscription was created successfully, there might be some failures in the results. + * + * @example + * ```typescript + * const {subscription, error, results} = await waku.filter.subscribe(decoders, callback); + * if (!subscription || error) { + * console.error("Failed to create subscription:", error); + * } + * console.log("Subscription created successfully"); + * if (results.failures.length > 0) { + * console.warn("Some errors occurred during subscription:", results.failures); + * } + * console.log("Successful subscriptions:", results.successes); + * + * ``` + */ + public async subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback, + protocolUseOptions?: ProtocolUseOptions, + subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + ): Promise { + const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); + + if (uniquePubsubTopics.length !== 1) { + return { + subscription: null, + error: ProtocolError.INVALID_DECODER_TOPICS, + results: null + }; + } + + const pubsubTopic = uniquePubsubTopics[0]; + + const { subscription, error } = await this.createSubscription( + pubsubTopic, + protocolUseOptions + ); + + if (error) { + return { + subscription: null, + error: error, + results: null + }; + } + + const { failures, successes } = await subscription.subscribe( + decoders, + callback, + subscribeOptions + ); + return { + subscription, + error: null, + results: { + failures: failures, + successes: successes + } + }; + } + + /** + * Creates a new subscription to the given pubsub topic. + * The subscription is made to multiple peers for decentralization. + * @param pubsubTopicShardInfo The pubsub topic to subscribe to. + * @returns The subscription object. + */ + private async createSubscription( + pubsubTopicShardInfo: NetworkConfig | PubsubTopic, + options?: ProtocolUseOptions + ): Promise { + options = { + autoRetry: true, + ...options + } as ProtocolUseOptions; + + const pubsubTopic = + typeof pubsubTopicShardInfo == "string" + ? pubsubTopicShardInfo + : shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0]; + + ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); + + const hasPeers = await this.hasPeers(options); + if (!hasPeers) { + return { + error: ProtocolError.NO_PEER_AVAILABLE, + subscription: null + }; + } + + log.info( + `Creating filter subscription with ${this.connectedPeers.length} peers: `, + this.connectedPeers.map((peer) => peer.id.toString()) + ); + + const subscription = + this.getActiveSubscription(pubsubTopic) ?? + this.setActiveSubscription( + pubsubTopic, + new SubscriptionManager( + pubsubTopic, + this.protocol, + () => this.connectedPeers, + this.renewPeer.bind(this) + ) + ); + + return { + error: null, + subscription + }; + } + + /** + * This method is used to satisfy the `IReceiver` interface. + * + * @hidden + * + * @param decoders The decoders to use for the subscription. + * @param callback The callback function to use for the subscription. + * @param opts Optional protocol options for the subscription. + * + * @returns A Promise that resolves to a function that unsubscribes from the subscription. + * + * @remarks + * This method should not be used directly. + * Instead, use `createSubscription` to create a new subscription. + */ + public async subscribeWithUnsubscribe( + decoders: IDecoder | IDecoder[], + callback: Callback, + options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + ): Promise { + const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); + + if (uniquePubsubTopics.length === 0) { + throw Error( + "Failed to subscribe: no pubsubTopic found on decoders provided." + ); + } + + if (uniquePubsubTopics.length > 1) { + throw Error( + "Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile." + ); + } + + const { subscription, error } = await this.createSubscription( + uniquePubsubTopics[0] + ); + + if (error) { + throw Error(`Failed to create subscription: ${error}`); + } + + await subscription.subscribe(decoders, callback, options); + + const contentTopics = Array.from( + groupByContentTopic( + Array.isArray(decoders) ? decoders : [decoders] + ).keys() + ); + + return async () => { + await subscription.unsubscribe(contentTopics); + }; + } + + public toSubscriptionIterator( + decoders: IDecoder | IDecoder[] + ): Promise> { + return toAsyncIterator(this, decoders); + } + + //TODO: move to SubscriptionManager + private getActiveSubscription( + pubsubTopic: PubsubTopic + ): SubscriptionManager | undefined { + return this.activeSubscriptions.get(pubsubTopic); + } + + private setActiveSubscription( + pubsubTopic: PubsubTopic, + subscription: SubscriptionManager + ): SubscriptionManager { + this.activeSubscriptions.set(pubsubTopic, subscription); + return subscription; + } + + private getUniquePubsubTopics( + decoders: IDecoder | IDecoder[] + ): string[] { + if (!Array.isArray(decoders)) { + return [decoders.pubsubTopic]; + } + + if (decoders.length === 0) { + return []; + } + + const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic)); + + return [...pubsubTopics]; + } +} + +export function wakuFilter( + connectionManager: ConnectionManager, + init?: ProtocolCreateOptions +): (libp2p: Libp2p) => IFilterSDK { + return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init); +} diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts similarity index 57% rename from packages/sdk/src/protocols/filter.ts rename to packages/sdk/src/protocols/filter/subscription_manager.ts index 18f1dc7521..b8e814d1e8 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -1,46 +1,26 @@ import type { Peer } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, FilterCore } from "@waku/core"; +import { FilterCore } from "@waku/core"; import { type Callback, type ContentTopic, type CoreProtocolResult, - type CreateSubscriptionResult, - type IAsyncIterator, type IDecodedMessage, type IDecoder, - type IFilterSDK, type IProtoMessage, type ISubscriptionSDK, - type Libp2p, - NetworkConfig, type PeerIdStr, - type ProtocolCreateOptions, ProtocolError, - type ProtocolUseOptions, type PubsubTopic, type SDKProtocolResult, type SubscribeOptions, - SubscribeResult, - type Unsubscribe + SubscriptionCallback } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; -import { - ensurePubsubTopicIsConfigured, - groupByContentTopic, - Logger, - shardInfoToPubsubTopics, - toAsyncIterator -} from "@waku/utils"; - -import { BaseProtocolSDK } from "./base_protocol.js"; - -type SubscriptionCallback = { - decoders: IDecoder[]; - callback: Callback; -}; +import { groupByContentTopic, Logger } from "@waku/utils"; +import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; type ReceivedMessageHashes = { all: Set; nodes: { @@ -48,15 +28,11 @@ type ReceivedMessageHashes = { }; }; -const log = new Logger("sdk:filter"); - const DEFAULT_MAX_PINGS = 3; const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; -const DEFAULT_KEEP_ALIVE = 30 * 1000; -const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: DEFAULT_KEEP_ALIVE -}; +const log = new Logger("sdk:filter:subscription_manager"); + export class SubscriptionManager implements ISubscriptionSDK { private readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; @@ -414,279 +390,6 @@ export class SubscriptionManager implements ISubscriptionSDK { } } -class FilterSDK extends BaseProtocolSDK implements IFilterSDK { - public readonly protocol: FilterCore; - - private activeSubscriptions = new Map(); - - public constructor( - connectionManager: ConnectionManager, - libp2p: Libp2p, - options?: ProtocolCreateOptions - ) { - super( - new FilterCore( - async (pubsubTopic, wakuMessage, peerIdStr) => { - const subscription = this.getActiveSubscription(pubsubTopic); - if (!subscription) { - log.error( - `No subscription locally registered for topic ${pubsubTopic}` - ); - return; - } - - await subscription.processIncomingMessage(wakuMessage, peerIdStr); - }, - connectionManager.configuredPubsubTopics, - libp2p - ), - connectionManager, - { numPeersToUse: options?.numPeersToUse } - ); - - this.protocol = this.core as FilterCore; - - this.activeSubscriptions = new Map(); - } - - /** - * Opens a subscription with the Filter protocol using the provided decoders and callback. - * This method combines the functionality of creating a subscription and subscribing to it. - * - * @param {IDecoder | IDecoder[]} decoders - A single decoder or an array of decoders to use for decoding messages. - * @param {Callback} callback - The callback function to be invoked with decoded messages. - * @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol. - * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription. - * - * @returns {Promise} A promise that resolves to an object containing: - * - subscription: The created subscription object if successful, or null if failed. - * - error: A ProtocolError if the subscription creation failed, or null if successful. - * - results: An object containing arrays of failures and successes from the subscription process. - * Only present if the subscription was created successfully. - * - * @throws {Error} If there's an unexpected error during the subscription process. - * - * @remarks - * This method attempts to create a subscription using the pubsub topic derived from the provided decoders, - * then tries to subscribe using the created subscription. The return value should be interpreted as follows: - * - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely. - * - If `subscription` is non-null and `error` is null, the subscription was created successfully. - * In this case, check the `results` field for detailed information about successes and failures during the subscription process. - * - Even if the subscription was created successfully, there might be some failures in the results. - * - * @example - * ```typescript - * const {subscription, error, results} = await waku.filter.subscribe(decoders, callback); - * if (!subscription || error) { - * console.error("Failed to create subscription:", error); - * } - * console.log("Subscription created successfully"); - * if (results.failures.length > 0) { - * console.warn("Some errors occurred during subscription:", results.failures); - * } - * console.log("Successful subscriptions:", results.successes); - * - * ``` - */ - public async subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback, - protocolUseOptions?: ProtocolUseOptions, - subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS - ): Promise { - const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); - - if (uniquePubsubTopics.length !== 1) { - return { - subscription: null, - error: ProtocolError.INVALID_DECODER_TOPICS, - results: null - }; - } - - const pubsubTopic = uniquePubsubTopics[0]; - - const { subscription, error } = await this.createSubscription( - pubsubTopic, - protocolUseOptions - ); - - if (error) { - return { - subscription: null, - error: error, - results: null - }; - } - - const { failures, successes } = await subscription.subscribe( - decoders, - callback, - subscribeOptions - ); - return { - subscription, - error: null, - results: { - failures: failures, - successes: successes - } - }; - } - - /** - * Creates a new subscription to the given pubsub topic. - * The subscription is made to multiple peers for decentralization. - * @param pubsubTopicShardInfo The pubsub topic to subscribe to. - * @returns The subscription object. - */ - private async createSubscription( - pubsubTopicShardInfo: NetworkConfig | PubsubTopic, - options?: ProtocolUseOptions - ): Promise { - options = { - autoRetry: true, - ...options - } as ProtocolUseOptions; - - const pubsubTopic = - typeof pubsubTopicShardInfo == "string" - ? pubsubTopicShardInfo - : shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0]; - - ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); - - const hasPeers = await this.hasPeers(options); - if (!hasPeers) { - return { - error: ProtocolError.NO_PEER_AVAILABLE, - subscription: null - }; - } - - log.info( - `Creating filter subscription with ${this.connectedPeers.length} peers: `, - this.connectedPeers.map((peer) => peer.id.toString()) - ); - - const subscription = - this.getActiveSubscription(pubsubTopic) ?? - this.setActiveSubscription( - pubsubTopic, - new SubscriptionManager( - pubsubTopic, - this.protocol, - () => this.connectedPeers, - this.renewPeer.bind(this) - ) - ); - - return { - error: null, - subscription - }; - } - - /** - * This method is used to satisfy the `IReceiver` interface. - * - * @hidden - * - * @param decoders The decoders to use for the subscription. - * @param callback The callback function to use for the subscription. - * @param opts Optional protocol options for the subscription. - * - * @returns A Promise that resolves to a function that unsubscribes from the subscription. - * - * @remarks - * This method should not be used directly. - * Instead, use `createSubscription` to create a new subscription. - */ - public async subscribeWithUnsubscribe( - decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS - ): Promise { - const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); - - if (uniquePubsubTopics.length === 0) { - throw Error( - "Failed to subscribe: no pubsubTopic found on decoders provided." - ); - } - - if (uniquePubsubTopics.length > 1) { - throw Error( - "Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile." - ); - } - - const { subscription, error } = await this.createSubscription( - uniquePubsubTopics[0] - ); - - if (error) { - throw Error(`Failed to create subscription: ${error}`); - } - - await subscription.subscribe(decoders, callback, options); - - const contentTopics = Array.from( - groupByContentTopic( - Array.isArray(decoders) ? decoders : [decoders] - ).keys() - ); - - return async () => { - await subscription.unsubscribe(contentTopics); - }; - } - - public toSubscriptionIterator( - decoders: IDecoder | IDecoder[] - ): Promise> { - return toAsyncIterator(this, decoders); - } - - //TODO: move to SubscriptionManager - private getActiveSubscription( - pubsubTopic: PubsubTopic - ): SubscriptionManager | undefined { - return this.activeSubscriptions.get(pubsubTopic); - } - - private setActiveSubscription( - pubsubTopic: PubsubTopic, - subscription: SubscriptionManager - ): SubscriptionManager { - this.activeSubscriptions.set(pubsubTopic, subscription); - return subscription; - } - - private getUniquePubsubTopics( - decoders: IDecoder | IDecoder[] - ): string[] { - if (!Array.isArray(decoders)) { - return [decoders.pubsubTopic]; - } - - if (decoders.length === 0) { - return []; - } - - const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic)); - - return [...pubsubTopics]; - } -} - -export function wakuFilter( - connectionManager: ConnectionManager, - init?: ProtocolCreateOptions -): (libp2p: Libp2p) => IFilterSDK { - return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init); -} - async function pushMessage( subscriptionCallback: SubscriptionCallback, pubsubTopic: PubsubTopic, diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index ae79a71849..266c62494b 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -17,7 +17,7 @@ import { Protocols } from "@waku/interfaces"; import { wakuRelay } from "@waku/relay"; import { Logger } from "@waku/utils"; -import { wakuFilter } from "./protocols/filter.js"; +import { wakuFilter } from "./protocols/filter/index.js"; import { wakuLightPush } from "./protocols/light_push.js"; import { wakuStore } from "./protocols/store.js"; From 4f4b91a5a161746fae3d5c11442c19a6aa1936da Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 28 Aug 2024 13:16:07 +0530 Subject: [PATCH 02/16] feat: introduce ReliabilityMonitor --- .../protocols/filter/reliability_monitor.ts | 114 ++++++++++++++++++ .../protocols/filter/subscription_manager.ts | 112 ++++------------- 2 files changed, 139 insertions(+), 87 deletions(-) create mode 100644 packages/sdk/src/protocols/filter/reliability_monitor.ts diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts new file mode 100644 index 0000000000..557e79c440 --- /dev/null +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -0,0 +1,114 @@ +import type { Peer, PeerId } from "@libp2p/interface"; +import { IProtoMessage, PeerIdStr, PubsubTopic } from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; +import { WakuMessage } from "@waku/proto"; +import { Logger } from "@waku/utils"; + +type ReceivedMessageHashes = { + all: Set; + nodes: { + [peerId: PeerIdStr]: Set; + }; +}; + +const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; + +const log = new Logger("sdk:filter:reliability_monitor"); + +export class ReliabilityMonitor { + public receivedMessagesHashStr: string[] = []; + public receivedMessagesHashes: ReceivedMessageHashes; + public missedMessagesByPeer: Map = new Map(); + public maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + + public constructor( + private getPeers: () => Peer[], + private renewAndSubscribePeer: (peerId: PeerId) => Promise + ) { + const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); + + this.receivedMessagesHashes = { + all: new Set(), + nodes: { + ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) + } + }; + allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); + } + + public setMaxMissedMessagesThreshold(value: number | undefined): void { + if (value === undefined) { + return; + } + this.maxMissedMessagesThreshold = value; + } + + public get messageHashes(): string[] { + return [...this.receivedMessagesHashes.all]; + } + + public addMessage( + message: WakuMessage, + pubsubTopic: PubsubTopic, + peerIdStr?: string + ): boolean { + const hashedMessageStr = messageHashStr( + pubsubTopic, + message as IProtoMessage + ); + + this.receivedMessagesHashes.all.add(hashedMessageStr); + + if (peerIdStr) { + this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr); + } + + if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + return false; + } else { + this.receivedMessagesHashStr.push(hashedMessageStr); + return true; + } + } + + public async validateMessage(): Promise { + for (const hash of this.receivedMessagesHashes.all) { + for (const [peerIdStr, hashes] of Object.entries( + this.receivedMessagesHashes.nodes + )) { + if (!hashes.has(hash)) { + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + log.info( + `Peer ${peerIdStr} has missed too many messages, renewing.` + ); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + continue; + } + try { + await this.renewAndSubscribePeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } + } + } + } + } + } + + private incrementMissedMessageCount(peerIdStr: string): void { + const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; + this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); + } + + private shouldRenewPeer(peerIdStr: string): boolean { + const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; + return missedMessages > this.maxMissedMessagesThreshold; + } +} diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index b8e814d1e8..909c4604e5 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -16,31 +16,21 @@ import { type SubscribeOptions, SubscriptionCallback } from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; import { groupByContentTopic, Logger } from "@waku/utils"; import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; -type ReceivedMessageHashes = { - all: Set; - nodes: { - [peerId: PeerIdStr]: Set; - }; -}; +import { ReliabilityMonitor } from "./reliability_monitor.js"; const DEFAULT_MAX_PINGS = 3; -const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const log = new Logger("sdk:filter:subscription_manager"); export class SubscriptionManager implements ISubscriptionSDK { - private readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; - private readonly receivedMessagesHashes: ReceivedMessageHashes; private peerFailures: Map = new Map(); - private missedMessagesByPeer: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; - private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + private reliabilityMonitor: ReliabilityMonitor; private subscriptionCallbacks: Map< ContentTopic, @@ -55,26 +45,10 @@ export class SubscriptionManager implements ISubscriptionSDK { ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); - const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); - this.receivedMessagesHashes = { - all: new Set(), - nodes: { - ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) - } - }; - allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); - } - - public get messageHashes(): string[] { - return [...this.receivedMessagesHashes.all]; - } - - private addHash(hash: string, peerIdStr?: string): void { - this.receivedMessagesHashes.all.add(hash); - - if (peerIdStr) { - this.receivedMessagesHashes.nodes[peerIdStr].add(hash); - } + this.reliabilityMonitor = new ReliabilityMonitor( + getPeers.bind(this), + this.renewAndSubscribePeer.bind(this) + ); } public async subscribe( @@ -84,9 +58,9 @@ export class SubscriptionManager implements ISubscriptionSDK { ): Promise { this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; - this.maxMissedMessagesThreshold = - options.maxMissedMessagesThreshold || - DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + this.reliabilityMonitor.setMaxMissedMessagesThreshold( + options.maxMissedMessagesThreshold + ); const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -194,54 +168,21 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - private async validateMessage(): Promise { - for (const hash of this.receivedMessagesHashes.all) { - for (const [peerIdStr, hashes] of Object.entries( - this.receivedMessagesHashes.nodes - )) { - if (!hashes.has(hash)) { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - log.info( - `Peer ${peerIdStr} has missed too many messages, renewing.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - continue; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } - } - } - } - } - public async processIncomingMessage( message: WakuMessage, peerIdStr: PeerIdStr ): Promise { - const hashedMessageStr = messageHashStr( + const includesMessage = this.reliabilityMonitor.addMessage( + message, this.pubsubTopic, - message as IProtoMessage + peerIdStr ); + void this.reliabilityMonitor.validateMessage(); - this.addHash(hashedMessageStr, peerIdStr); - void this.validateMessage(); - - if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + if (includesMessage) { log.info("Message already received, skipping"); return; } - this.receivedMessagesHashStr.push(hashedMessageStr); const { contentTopic } = message; const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); @@ -340,8 +281,13 @@ export class SubscriptionManager implements ISubscriptionSDK { Array.from(this.subscriptionCallbacks.keys()) ); - this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); - this.missedMessagesByPeer.set(newPeer.id.toString(), 0); + this.reliabilityMonitor.receivedMessagesHashes.nodes[ + newPeer.id.toString() + ] = new Set(); + this.reliabilityMonitor.missedMessagesByPeer.set( + newPeer.id.toString(), + 0 + ); return newPeer; } catch (error) { @@ -349,8 +295,10 @@ export class SubscriptionManager implements ISubscriptionSDK { return; } finally { this.peerFailures.delete(peerId.toString()); - this.missedMessagesByPeer.delete(peerId.toString()); - delete this.receivedMessagesHashes.nodes[peerId.toString()]; + this.reliabilityMonitor.missedMessagesByPeer.delete(peerId.toString()); + delete this.reliabilityMonitor.receivedMessagesHashes.nodes[ + peerId.toString() + ]; } } @@ -378,16 +326,6 @@ export class SubscriptionManager implements ISubscriptionSDK { clearInterval(this.keepAliveTimer); this.keepAliveTimer = null; } - - private incrementMissedMessageCount(peerIdStr: string): void { - const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; - this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); - } - - private shouldRenewPeer(peerIdStr: string): boolean { - const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; - return missedMessages > this.maxMissedMessagesThreshold; - } } async function pushMessage( From 165f48d96a69844e6a0706c2c406afd65440a271 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 28 Aug 2024 13:36:41 +0530 Subject: [PATCH 03/16] fix: peer data updates --- .../sdk/src/protocols/filter/subscription_manager.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 909c4604e5..a0ef6cb2b4 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -289,16 +289,16 @@ export class SubscriptionManager implements ISubscriptionSDK { 0 ); - return newPeer; - } catch (error) { - log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); - return; - } finally { this.peerFailures.delete(peerId.toString()); this.reliabilityMonitor.missedMessagesByPeer.delete(peerId.toString()); delete this.reliabilityMonitor.receivedMessagesHashes.nodes[ peerId.toString() ]; + + return newPeer; + } catch (error) { + log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + return; } } From 2b6177463c9b619a2fc77292aaad4dc33a19c4a5 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 28 Aug 2024 13:40:36 +0530 Subject: [PATCH 04/16] fix: logical error when returning includesMessage --- packages/sdk/src/protocols/filter/reliability_monitor.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index 557e79c440..b2093f836d 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -64,10 +64,10 @@ export class ReliabilityMonitor { } if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { - return false; + return true; } else { this.receivedMessagesHashStr.push(hashedMessageStr); - return true; + return false; } } From f387f5974d8b45f5145c2fad532cef1e8ee14351 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 28 Aug 2024 15:56:12 +0530 Subject: [PATCH 05/16] chore: handle edge case --- .../sdk/src/protocols/filter/reliability_monitor.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index b2093f836d..a8e3b2dd63 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -6,9 +6,7 @@ import { Logger } from "@waku/utils"; type ReceivedMessageHashes = { all: Set; - nodes: { - [peerId: PeerIdStr]: Set; - }; + nodes: Record>; }; const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; @@ -60,6 +58,13 @@ export class ReliabilityMonitor { this.receivedMessagesHashes.all.add(hashedMessageStr); if (peerIdStr) { + const x = this.receivedMessagesHashes.nodes[peerIdStr]; + if (!x) { + log.warn( + `Peer ${peerIdStr} not initialized in receivedMessagesHashes.nodes, adding it.` + ); + this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); + } this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr); } From bbbeb1ac82c4c1e00eb073eb6c2505377c5ff6d0 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 11 Sep 2024 14:46:59 +0530 Subject: [PATCH 06/16] chore: move ping failures handling inside monitor --- .../protocols/filter/reliability_monitor.ts | 40 ++++++++++++++++++- .../protocols/filter/subscription_manager.ts | 39 +++++------------- 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index a8e3b2dd63..0b5564ca05 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -1,5 +1,10 @@ import type { Peer, PeerId } from "@libp2p/interface"; -import { IProtoMessage, PeerIdStr, PubsubTopic } from "@waku/interfaces"; +import { + CoreProtocolResult, + IProtoMessage, + PeerIdStr, + PubsubTopic +} from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; @@ -13,11 +18,15 @@ const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const log = new Logger("sdk:filter:reliability_monitor"); +const DEFAULT_MAX_PINGS = 3; + export class ReliabilityMonitor { public receivedMessagesHashStr: string[] = []; public receivedMessagesHashes: ReceivedMessageHashes; public missedMessagesByPeer: Map = new Map(); public maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + public peerFailures: Map = new Map(); + private maxPingFailures: number = DEFAULT_MAX_PINGS; public constructor( private getPeers: () => Peer[], @@ -41,6 +50,13 @@ export class ReliabilityMonitor { this.maxMissedMessagesThreshold = value; } + public setMaxPingFailures(value: number | undefined): void { + if (value === undefined) { + return; + } + this.maxPingFailures = value; + } + public get messageHashes(): string[] { return [...this.receivedMessagesHashes.all]; } @@ -107,6 +123,28 @@ export class ReliabilityMonitor { } } + public async handlePingResult( + peerId: PeerId, + result?: CoreProtocolResult + ): Promise { + if (result?.success) { + this.peerFailures.delete(peerId.toString()); + return; + } + + const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; + this.peerFailures.set(peerId.toString(), failures); + + if (failures > this.maxPingFailures) { + try { + await this.renewAndSubscribePeer(peerId); + this.peerFailures.delete(peerId.toString()); + } catch (error) { + log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); + } + } + } + private incrementMissedMessageCount(peerIdStr: string): void { const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index a0ef6cb2b4..4cf51146dd 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -22,16 +22,12 @@ import { groupByContentTopic, Logger } from "@waku/utils"; import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; import { ReliabilityMonitor } from "./reliability_monitor.js"; -const DEFAULT_MAX_PINGS = 3; - const log = new Logger("sdk:filter:subscription_manager"); export class SubscriptionManager implements ISubscriptionSDK { - private keepAliveTimer: number | null = null; - private peerFailures: Map = new Map(); - private maxPingFailures: number = DEFAULT_MAX_PINGS; private reliabilityMonitor: ReliabilityMonitor; + private keepAliveTimer: number | null = null; private subscriptionCallbacks: Map< ContentTopic, SubscriptionCallback @@ -56,11 +52,13 @@ export class SubscriptionManager implements ISubscriptionSDK { callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { - this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; - this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; this.reliabilityMonitor.setMaxMissedMessagesThreshold( + options.pingsBeforePeerRenewed + ); + this.reliabilityMonitor.setMaxPingFailures( options.maxMissedMessagesThreshold ); + this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -236,16 +234,11 @@ export class SubscriptionManager implements ISubscriptionSDK { }; } + let result; try { - const result = await this.protocol.ping(peer); - if (result.failure) { - await this.handlePeerFailure(peerId); - } else { - this.peerFailures.delete(peerId.toString()); - } + result = await this.protocol.ping(peer); return result; } catch (error) { - await this.handlePeerFailure(peerId); return { success: null, failure: { @@ -253,20 +246,8 @@ export class SubscriptionManager implements ISubscriptionSDK { error: ProtocolError.GENERIC_FAIL } }; - } - } - - private async handlePeerFailure(peerId: PeerId): Promise { - const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; - this.peerFailures.set(peerId.toString(), failures); - - if (failures > this.maxPingFailures) { - try { - await this.renewAndSubscribePeer(peerId); - this.peerFailures.delete(peerId.toString()); - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); - } + } finally { + await this.reliabilityMonitor.handlePingResult(peerId, result); } } @@ -289,7 +270,7 @@ export class SubscriptionManager implements ISubscriptionSDK { 0 ); - this.peerFailures.delete(peerId.toString()); + this.reliabilityMonitor.peerFailures.delete(peerId.toString()); this.reliabilityMonitor.missedMessagesByPeer.delete(peerId.toString()); delete this.reliabilityMonitor.receivedMessagesHashes.nodes[ peerId.toString() From e9a1d2d1c2f7f1545b243e90ed6c90b8528bbc6f Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 11 Sep 2024 15:23:44 +0530 Subject: [PATCH 07/16] chore: move renewal logic to monitor --- .../protocols/filter/reliability_monitor.ts | 72 ++++++++++++++++++- .../protocols/filter/subscription_manager.ts | 49 ++++--------- 2 files changed, 82 insertions(+), 39 deletions(-) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index 0b5564ca05..616dacdd49 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -1,5 +1,6 @@ import type { Peer, PeerId } from "@libp2p/interface"; import { + ContentTopic, CoreProtocolResult, IProtoMessage, PeerIdStr, @@ -20,7 +21,42 @@ const log = new Logger("sdk:filter:reliability_monitor"); const DEFAULT_MAX_PINGS = 3; -export class ReliabilityMonitor { +export class ReliabilityMonitorManager { + private static receiverMonitors: Map< + PubsubTopic, + ReceiverReliabilityMonitor + > = new Map(); + + public static createReceiverMonitor( + pubsubTopic: PubsubTopic, + getPeers: () => Peer[], + renewPeer: (peerId: PeerId) => Promise, + getContentTopics: () => ContentTopic[], + protocolSubscribe: ( + pubsubTopic: PubsubTopic, + peer: Peer, + contentTopics: ContentTopic[] + ) => Promise + ): ReceiverReliabilityMonitor { + if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { + return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; + } + + const monitor = new ReceiverReliabilityMonitor( + pubsubTopic, + getPeers, + renewPeer, + getContentTopics, + protocolSubscribe + ); + ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); + return monitor; + } + + private constructor() {} +} + +export class ReceiverReliabilityMonitor { public receivedMessagesHashStr: string[] = []; public receivedMessagesHashes: ReceivedMessageHashes; public missedMessagesByPeer: Map = new Map(); @@ -29,8 +65,15 @@ export class ReliabilityMonitor { private maxPingFailures: number = DEFAULT_MAX_PINGS; public constructor( + private readonly pubsubTopic: PubsubTopic, private getPeers: () => Peer[], - private renewAndSubscribePeer: (peerId: PeerId) => Promise + private renewPeer: (peerId: PeerId) => Promise, + private getContentTopics: () => ContentTopic[], + private protocolSubscribe: ( + pubsubTopic: PubsubTopic, + peer: Peer, + contentTopics: ContentTopic[] + ) => Promise ) { const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); @@ -145,6 +188,31 @@ export class ReliabilityMonitor { } } + private async renewAndSubscribePeer( + peerId: PeerId + ): Promise { + try { + const newPeer = await this.renewPeer(peerId); + await this.protocolSubscribe( + this.pubsubTopic, + newPeer, + this.getContentTopics() + ); + + this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); + this.missedMessagesByPeer.set(newPeer.id.toString(), 0); + + this.peerFailures.delete(peerId.toString()); + this.missedMessagesByPeer.delete(peerId.toString()); + delete this.receivedMessagesHashes.nodes[peerId.toString()]; + + return newPeer; + } catch (error) { + log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + return; + } + } + private incrementMissedMessageCount(peerIdStr: string): void { const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 4cf51146dd..25ec5519f8 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -20,12 +20,15 @@ import { WakuMessage } from "@waku/proto"; import { groupByContentTopic, Logger } from "@waku/utils"; import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; -import { ReliabilityMonitor } from "./reliability_monitor.js"; +import { + ReceiverReliabilityMonitor, + ReliabilityMonitorManager +} from "./reliability_monitor.js"; const log = new Logger("sdk:filter:subscription_manager"); export class SubscriptionManager implements ISubscriptionSDK { - private reliabilityMonitor: ReliabilityMonitor; + private reliabilityMonitor: ReceiverReliabilityMonitor; private keepAliveTimer: number | null = null; private subscriptionCallbacks: Map< @@ -41,9 +44,13 @@ export class SubscriptionManager implements ISubscriptionSDK { ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); - this.reliabilityMonitor = new ReliabilityMonitor( - getPeers.bind(this), - this.renewAndSubscribePeer.bind(this) + + this.reliabilityMonitor = ReliabilityMonitorManager.createReceiverMonitor( + this.pubsubTopic, + this.getPeers.bind(this), + this.renewPeer.bind(this), + () => Array.from(this.subscriptionCallbacks.keys()), + this.protocol.subscribe.bind(this.protocol) ); } @@ -251,38 +258,6 @@ export class SubscriptionManager implements ISubscriptionSDK { } } - private async renewAndSubscribePeer( - peerId: PeerId - ): Promise { - try { - const newPeer = await this.renewPeer(peerId); - await this.protocol.subscribe( - this.pubsubTopic, - newPeer, - Array.from(this.subscriptionCallbacks.keys()) - ); - - this.reliabilityMonitor.receivedMessagesHashes.nodes[ - newPeer.id.toString() - ] = new Set(); - this.reliabilityMonitor.missedMessagesByPeer.set( - newPeer.id.toString(), - 0 - ); - - this.reliabilityMonitor.peerFailures.delete(peerId.toString()); - this.reliabilityMonitor.missedMessagesByPeer.delete(peerId.toString()); - delete this.reliabilityMonitor.receivedMessagesHashes.nodes[ - peerId.toString() - ]; - - return newPeer; - } catch (error) { - log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); - return; - } - } - private startKeepAlivePings(options: SubscribeOptions): void { const { keepAlive } = options; if (this.keepAliveTimer) { From d7933d255d8cdc186ca176abc8e42c7d9b608969 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 11 Sep 2024 15:30:53 +0530 Subject: [PATCH 08/16] chore: improve structuring --- .../protocols/filter/reliability_monitor.ts | 72 +++++++++++-------- .../protocols/filter/subscription_manager.ts | 5 +- 2 files changed, 43 insertions(+), 34 deletions(-) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index 616dacdd49..b8c309ec90 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -57,11 +57,11 @@ export class ReliabilityMonitorManager { } export class ReceiverReliabilityMonitor { - public receivedMessagesHashStr: string[] = []; - public receivedMessagesHashes: ReceivedMessageHashes; - public missedMessagesByPeer: Map = new Map(); - public maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; - public peerFailures: Map = new Map(); + private receivedMessagesHashStr: string[] = []; + private receivedMessagesHashes: ReceivedMessageHashes; + private missedMessagesByPeer: Map = new Map(); + private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + private peerFailures: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; public constructor( @@ -100,11 +100,43 @@ export class ReceiverReliabilityMonitor { this.maxPingFailures = value; } - public get messageHashes(): string[] { - return [...this.receivedMessagesHashes.all]; + public async handlePingResult( + peerId: PeerId, + result?: CoreProtocolResult + ): Promise { + if (result?.success) { + this.peerFailures.delete(peerId.toString()); + return; + } + + const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; + this.peerFailures.set(peerId.toString(), failures); + + if (failures > this.maxPingFailures) { + try { + await this.renewAndSubscribePeer(peerId); + this.peerFailures.delete(peerId.toString()); + } catch (error) { + log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); + } + } + } + + public processIncomingMessage( + message: WakuMessage, + pubsubTopic: PubsubTopic, + peerIdStr?: string + ): boolean { + const alreadyReceived = this.addMessageToCache( + message, + pubsubTopic, + peerIdStr + ); + void this.checkAndRenewPeers(); + return alreadyReceived; } - public addMessage( + private addMessageToCache( message: WakuMessage, pubsubTopic: PubsubTopic, peerIdStr?: string @@ -135,7 +167,7 @@ export class ReceiverReliabilityMonitor { } } - public async validateMessage(): Promise { + private async checkAndRenewPeers(): Promise { for (const hash of this.receivedMessagesHashes.all) { for (const [peerIdStr, hashes] of Object.entries( this.receivedMessagesHashes.nodes @@ -166,28 +198,6 @@ export class ReceiverReliabilityMonitor { } } - public async handlePingResult( - peerId: PeerId, - result?: CoreProtocolResult - ): Promise { - if (result?.success) { - this.peerFailures.delete(peerId.toString()); - return; - } - - const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; - this.peerFailures.set(peerId.toString(), failures); - - if (failures > this.maxPingFailures) { - try { - await this.renewAndSubscribePeer(peerId); - this.peerFailures.delete(peerId.toString()); - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); - } - } - } - private async renewAndSubscribePeer( peerId: PeerId ): Promise { diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 25ec5519f8..6374bf38a4 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -177,14 +177,13 @@ export class SubscriptionManager implements ISubscriptionSDK { message: WakuMessage, peerIdStr: PeerIdStr ): Promise { - const includesMessage = this.reliabilityMonitor.addMessage( + const alreadyReceived = this.reliabilityMonitor.processIncomingMessage( message, this.pubsubTopic, peerIdStr ); - void this.reliabilityMonitor.validateMessage(); - if (includesMessage) { + if (alreadyReceived) { log.info("Message already received, skipping"); return; } From 531d8ee7efe2e0189bc2e4671583257b39b010cf Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 11 Sep 2024 15:36:02 +0530 Subject: [PATCH 09/16] chore: update logger --- packages/sdk/src/protocols/filter/reliability_monitor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index b8c309ec90..bc292017ed 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -17,7 +17,7 @@ type ReceivedMessageHashes = { const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; -const log = new Logger("sdk:filter:reliability_monitor"); +const log = new Logger("sdk:receiver:reliability_monitor"); const DEFAULT_MAX_PINGS = 3; From dd93b90e36464254d1d2f6e5e13d1d93e9654831 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 11 Sep 2024 16:22:58 +0530 Subject: [PATCH 10/16] chore: readd connectionListener() logic from merge --- .../protocols/filter/subscription_manager.ts | 58 +++++++++++++++---- 1 file changed, 48 insertions(+), 10 deletions(-) diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index f809ded10f..fa54ff31f1 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -1,10 +1,11 @@ import type { Peer } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; -import { FilterCore } from "@waku/core"; +import { ConnectionManager, FilterCore } from "@waku/core"; import { type Callback, type ContentTopic, type CoreProtocolResult, + EConnectionStateEvents, type IDecodedMessage, type IDecoder, type IProtoMessage, @@ -39,6 +40,7 @@ export class SubscriptionManager implements ISubscriptionSDK { public constructor( private readonly pubsubTopic: PubsubTopic, private readonly protocol: FilterCore, + private readonly connectionManager: ConnectionManager, private readonly getPeers: () => Peer[], private readonly renewPeer: (peerToDisconnect: PeerId) => Promise ) { @@ -110,9 +112,7 @@ export class SubscriptionManager implements ISubscriptionSDK { this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); }); - if (options.keepAlive) { - this.startKeepAlivePings(options.keepAlive); - } + this.startSubscriptionsMaintenance(this.keepAliveTimer); return finalResult; } @@ -138,9 +138,7 @@ export class SubscriptionManager implements ISubscriptionSDK { const finalResult = this.handleResult(results, "unsubscribe"); if (this.subscriptionCallbacks.size === 0) { - if (this.keepAliveTimer) { - this.stopKeepAlivePings(); - } + this.stopSubscriptionsMaintenance(); } return finalResult; @@ -166,9 +164,7 @@ export class SubscriptionManager implements ISubscriptionSDK { const finalResult = this.handleResult(results, "unsubscribeAll"); - if (this.keepAliveTimer) { - this.stopKeepAlivePings(); - } + this.stopSubscriptionsMaintenance(); return finalResult; } @@ -257,6 +253,48 @@ export class SubscriptionManager implements ISubscriptionSDK { } } + private startSubscriptionsMaintenance(interval: number): void { + this.startKeepAlivePings(interval); + this.startConnectionListener(); + } + + private stopSubscriptionsMaintenance(): void { + this.stopKeepAlivePings(); + this.stopConnectionListener(); + } + + private startConnectionListener(): void { + this.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.connectionListener.bind(this) as (v: CustomEvent) => void + ); + } + + private stopConnectionListener(): void { + this.connectionManager.removeEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.connectionListener.bind(this) as (v: CustomEvent) => void + ); + } + + private async connectionListener({ + detail: isConnected + }: CustomEvent): Promise { + if (!isConnected) { + this.stopKeepAlivePings(); + return; + } + + try { + // we do nothing here, as the renewal process is managed internally by `this.ping()` + await this.ping(); + } catch (err) { + log.error(`networkStateListener failed to recover: ${err}`); + } + + this.startKeepAlivePings(this.keepAliveTimer || DEFAULT_KEEP_ALIVE); + } + private startKeepAlivePings(interval: number): void { if (this.keepAliveTimer) { log.info("Recurring pings already set up."); From e241aef5f4cfe5b7fffbc85dcc1f75c2b900bc36 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 11 Sep 2024 16:35:57 +0530 Subject: [PATCH 11/16] chore: minor fixes --- packages/sdk/src/protocols/base_protocol.ts | 2 +- packages/sdk/src/protocols/filter/index.ts | 1 + packages/sdk/src/protocols/filter/reliability_monitor.ts | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 85ef746e1d..52009aa05c 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -33,7 +33,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { public constructor( protected core: BaseProtocol, - private connectionManager: ConnectionManager, + protected connectionManager: ConnectionManager, options: Options ) { this.log = new Logger(`sdk:${core.multicodec}`); diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index d87cc97468..c8840ea0e4 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -193,6 +193,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { new SubscriptionManager( pubsubTopic, this.protocol, + this.connectionManager, () => this.connectedPeers, this.renewPeer.bind(this) ) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index bc292017ed..b038423449 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -149,8 +149,8 @@ export class ReceiverReliabilityMonitor { this.receivedMessagesHashes.all.add(hashedMessageStr); if (peerIdStr) { - const x = this.receivedMessagesHashes.nodes[peerIdStr]; - if (!x) { + const hashesForPeer = this.receivedMessagesHashes.nodes[peerIdStr]; + if (!hashesForPeer) { log.warn( `Peer ${peerIdStr} not initialized in receivedMessagesHashes.nodes, adding it.` ); From 028c6959e7112c4933f248d1b3baac0a14c44d24 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 12 Sep 2024 12:01:06 +0530 Subject: [PATCH 12/16] chore: improve --- .../sdk/src/protocols/filter/reliability_monitor.ts | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index b038423449..5b11e9df5e 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -57,7 +57,6 @@ export class ReliabilityMonitorManager { } export class ReceiverReliabilityMonitor { - private receivedMessagesHashStr: string[] = []; private receivedMessagesHashes: ReceivedMessageHashes; private missedMessagesByPeer: Map = new Map(); private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; @@ -146,6 +145,8 @@ export class ReceiverReliabilityMonitor { message as IProtoMessage ); + const alreadyReceived = + this.receivedMessagesHashes.all.has(hashedMessageStr); this.receivedMessagesHashes.all.add(hashedMessageStr); if (peerIdStr) { @@ -159,12 +160,7 @@ export class ReceiverReliabilityMonitor { this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr); } - if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { - return true; - } else { - this.receivedMessagesHashStr.push(hashedMessageStr); - return false; - } + return alreadyReceived; } private async checkAndRenewPeers(): Promise { From 5970e658178a73c53f5d2c7abd8b07213e0640c6 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 12 Sep 2024 12:10:43 +0530 Subject: [PATCH 13/16] chore: setup destruction of ReliabilityMonitors --- .../sdk/src/protocols/filter/reliability_monitor.ts | 12 ++++++++++++ packages/sdk/src/waku.ts | 2 ++ 2 files changed, 14 insertions(+) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index 5b11e9df5e..b2a101cc3f 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -54,6 +54,18 @@ export class ReliabilityMonitorManager { } private constructor() {} + + public static destroy(pubsubTopic: PubsubTopic): void { + this.receiverMonitors.delete(pubsubTopic); + } + + public static destroyAll(): void { + for (const [pubsubTopic, monitor] of this.receiverMonitors) { + monitor.setMaxMissedMessagesThreshold(undefined); + monitor.setMaxPingFailures(undefined); + this.receiverMonitors.delete(pubsubTopic); + } + } } export class ReceiverReliabilityMonitor { diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index d63aa8fc34..57c9c16d2a 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -17,6 +17,7 @@ import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { wakuFilter } from "./protocols/filter/index.js"; +import { ReliabilityMonitorManager } from "./protocols/filter/reliability_monitor.js"; import { wakuLightPush } from "./protocols/light_push.js"; import { wakuStore } from "./protocols/store.js"; @@ -195,6 +196,7 @@ export class WakuNode implements Waku { } public async stop(): Promise { + ReliabilityMonitorManager.destroyAll(); this.connectionManager.stop(); await this.libp2p.stop(); } From 2eddaf54af7234953f2bbb45df3d67360d3f72d7 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 12 Sep 2024 17:10:53 +0530 Subject: [PATCH 14/16] fix: condition for ping failure --- packages/sdk/src/protocols/filter/reliability_monitor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index b2a101cc3f..d988527cae 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -123,7 +123,7 @@ export class ReceiverReliabilityMonitor { const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; this.peerFailures.set(peerId.toString(), failures); - if (failures > this.maxPingFailures) { + if (failures >= this.maxPingFailures) { try { await this.renewAndSubscribePeer(peerId); this.peerFailures.delete(peerId.toString()); From 38a5e41a476ba984d9241486cd01a505036a8ddf Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 12 Sep 2024 17:36:25 +0530 Subject: [PATCH 15/16] fix: setters --- packages/sdk/src/protocols/filter/subscription_manager.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index fa54ff31f1..6d3d50f13b 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -62,11 +62,9 @@ export class SubscriptionManager implements ISubscriptionSDK { options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { this.reliabilityMonitor.setMaxMissedMessagesThreshold( - options.pingsBeforePeerRenewed - ); - this.reliabilityMonitor.setMaxPingFailures( options.maxMissedMessagesThreshold ); + this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed); this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; From 8da3abb09b9c256fb1be4b4b06d80c16529211c7 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Fri, 13 Sep 2024 14:05:16 +0530 Subject: [PATCH 16/16] chore: handle race condition & fix test --- .../sdk/src/protocols/filter/reliability_monitor.ts | 10 ++++++++++ packages/tests/tests/filter/peer_management.spec.ts | 5 ++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts index d988527cae..84d386f716 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -74,6 +74,7 @@ export class ReceiverReliabilityMonitor { private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; private peerFailures: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; + private peerRenewalLocks: Set = new Set(); public constructor( private readonly pubsubTopic: PubsubTopic, @@ -210,6 +211,13 @@ export class ReceiverReliabilityMonitor { peerId: PeerId ): Promise { try { + if (this.peerRenewalLocks.has(peerId.toString())) { + log.info(`Peer ${peerId.toString()} is already being renewed.`); + return; + } + + this.peerRenewalLocks.add(peerId.toString()); + const newPeer = await this.renewPeer(peerId); await this.protocolSubscribe( this.pubsubTopic, @@ -228,6 +236,8 @@ export class ReceiverReliabilityMonitor { } catch (error) { log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); return; + } finally { + this.peerRenewalLocks.delete(peerId.toString()); } } diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index dee9aa33d9..13e7baba05 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -187,9 +187,12 @@ describe("Waku Filter: Peer Management: E2E", function () { // One more failure should trigger renewal await subscription.ping(targetPeer.id); + // adds delay as renewal happens as an async operation in the bg + await delay(300); + expect( waku.filter.connectedPeers.some((peer) => peer.id.equals(targetPeer.id)) - ).to.be.false; + ).to.eq(false); expect(waku.filter.connectedPeers.length).to.equal( waku.filter.numPeersToUse );