Skip to content

Commit

Permalink
feat: add msgIdToStrFn option (#270)
Browse files Browse the repository at this point in the history
* chore: add msgIdToStrFn option

* chore: correct comments

* chore: msgIdToStrFn as the last param of mcache constructor
  • Loading branch information
twoeths committed Jun 13, 2022
1 parent 840883d commit 7f475c5
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 22 deletions.
26 changes: 17 additions & 9 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ import {
AddrInfo,
DataTransform,
TopicValidatorFn,
rejectReasonFromAcceptance
rejectReasonFromAcceptance,
MsgIdToStrFn
} from './types.js'
import { buildRawMessage, validateToRawMessage } from './utils/buildRawMessage.js'
import { msgIdFnStrictNoSign, msgIdFnStrictSign } from './utils/msgIdFn.js'
Expand Down Expand Up @@ -113,10 +114,12 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
/** For a single RPC, await processing each message before processing the next */
awaitRpcMessageHandler: boolean

// Extra modules, config
/** message id function */
msgIdFn: MsgIdFn
/** fast message id function */
fastMsgIdFn: FastMsgIdFn
/** Uint8Array message id to string function */
msgIdToStrFn: MsgIdToStrFn
/** override the default MessageCache */
messageCache: MessageCache
/** peer score parameters */
Expand Down Expand Up @@ -274,6 +277,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
*/
private readonly fastMsgIdFn: FastMsgIdFn | undefined

private readonly msgIdToStrFn: MsgIdToStrFn

/** Maps fast message-id to canonical message-id */
private readonly fastMsgIdCache: SimpleTimeCache<MsgIdStr> | undefined

Expand Down Expand Up @@ -370,8 +375,6 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
this.seenCache = new SimpleTimeCache<void>({ validityMs: opts.seenTTL })
this.publishedMessageIds = new SimpleTimeCache<void>({ validityMs: opts.seenTTL })

this.mcache = options.messageCache || new MessageCache(opts.mcacheGossip, opts.mcacheLength)

if (options.msgIdFn) {
// Use custom function
this.msgIdFn = options.msgIdFn
Expand All @@ -391,6 +394,11 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
this.fastMsgIdCache = new SimpleTimeCache<string>({ validityMs: opts.seenTTL })
}

// By default, gossipsub only provide a browser friendly function to convert Uint8Array message id to string.
this.msgIdToStrFn = options.msgIdToStrFn ?? messageIdToString

this.mcache = options.messageCache || new MessageCache(opts.mcacheGossip, opts.mcacheLength, this.msgIdToStrFn)

if (options.dataTransform) {
this.dataTransform = options.dataTransform
}
Expand Down Expand Up @@ -424,7 +432,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
this.metrics = null
}

this.gossipTracer = new IWantTracer(this.opts.gossipsubIWantFollowupMs, this.metrics)
this.gossipTracer = new IWantTracer(this.opts.gossipsubIWantFollowupMs, this.msgIdToStrFn, this.metrics)

/**
* libp2p
Expand Down Expand Up @@ -1004,7 +1012,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
// - reject messages claiming to be from ourselves but not locally published

// Calculate the message id on the transformed data.
const msgIdStr = msgIdCached ?? messageIdToString(await this.msgIdFn(msg))
const msgIdStr = msgIdCached ?? this.msgIdToStrFn(await this.msgIdFn(msg))

// Add the message to the duplicate caches
if (fastMsgIdStr) this.fastMsgIdCache?.put(fastMsgIdStr, msgIdStr)
Expand Down Expand Up @@ -1154,7 +1162,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
let idonthave = 0

messageIDs.forEach((msgId) => {
const msgIdStr = messageIdToString(msgId)
const msgIdStr = this.msgIdToStrFn(msgId)
if (!this.seenCache.has(msgIdStr)) {
iwant.set(msgIdStr, msgId)
idonthave++
Expand Down Expand Up @@ -1214,7 +1222,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

iwant.forEach(({ messageIDs }) => {
messageIDs.forEach((msgId) => {
const msgIdStr = messageIdToString(msgId)
const msgIdStr = this.msgIdToStrFn(msgId)
const entry = this.mcache.getWithIWantCount(msgIdStr, id)
if (entry == null) {
iwantDonthave++
Expand Down Expand Up @@ -1851,7 +1859,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
key: rawMsg.key
}
const msgId = await this.msgIdFn(msg)
const msgIdStr = messageIdToString(msgId)
const msgIdStr = this.msgIdToStrFn(msgId)

if (this.seenCache.has(msgIdStr)) {
// This message has already been seen. We don't re-publish messages that have already
Expand Down
16 changes: 10 additions & 6 deletions src/message-cache.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { RPC } from './message/rpc.js'
import type { MsgIdStr, PeerIdStr, TopicStr } from './types.js'
import { messageIdFromString, messageIdToString } from './utils/index.js'
import type { MsgIdStr, PeerIdStr, TopicStr, MsgIdToStrFn } from './types.js'
import { messageIdFromString } from './utils/messageIdToString.js'

export interface CacheEntry {
msgId: Uint8Array
Expand All @@ -26,20 +26,24 @@ interface MessageCacheEntry {
export class MessageCache {
msgs = new Map<MsgIdStr, MessageCacheEntry>()

msgIdToStrFn: MsgIdToStrFn

history: CacheEntry[][] = []

/**
* Holds history of messages in timebounded history arrays
*/
constructor(
/**
* he number of indices in the cache history used for gossiping. That means that a message
* The number of indices in the cache history used for gossiping. That means that a message
* won't get gossiped anymore when shift got called `gossip` many times after inserting the
* message in the cache.
*/
private readonly gossip: number,
historyCapacity: number
historyCapacity: number,
msgIdToStrFn: MsgIdToStrFn
) {
this.msgIdToStrFn = msgIdToStrFn
for (let i = 0; i < historyCapacity; i++) {
this.history[i] = []
}
Expand Down Expand Up @@ -89,7 +93,7 @@ export class MessageCache {
* Retrieves a message from the cache by its ID, if it is still present
*/
get(msgId: Uint8Array): RPC.Message | undefined {
return this.msgs.get(messageIdToString(msgId))?.message
return this.msgs.get(this.msgIdToStrFn(msgId))?.message
}

/**
Expand Down Expand Up @@ -149,7 +153,7 @@ export class MessageCache {
shift(): void {
const last = this.history[this.history.length - 1]
last.forEach((entry) => {
const msgIdStr = messageIdToString(entry.msgId)
const msgIdStr = this.msgIdToStrFn(entry.msgId)
this.msgs.delete(msgIdStr)
})

Expand Down
11 changes: 7 additions & 4 deletions src/tracer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { messageIdToString } from './utils/index.js'
import { MsgIdStr, PeerIdStr, RejectReason } from './types.js'
import { MsgIdStr, MsgIdToStrFn, PeerIdStr, RejectReason } from './types.js'
import type { Metrics } from './metrics.js'

/**
Expand All @@ -23,7 +22,11 @@ export class IWantTracer {
private readonly requestMsByMsg = new Map<MsgIdStr, number>()
private readonly requestMsByMsgExpire: number

constructor(private readonly gossipsubIWantFollowupMs: number, private readonly metrics: Metrics | null) {
constructor(
private readonly gossipsubIWantFollowupMs: number,
private readonly msgIdToStrFn: MsgIdToStrFn,
private readonly metrics: Metrics | null
) {
this.requestMsByMsgExpire = 10 * gossipsubIWantFollowupMs
}

Expand All @@ -42,7 +45,7 @@ export class IWantTracer {
// pick msgId randomly from the list
const ix = Math.floor(Math.random() * msgIds.length)
const msgId = msgIds[ix]
const msgIdStr = messageIdToString(msgId)
const msgIdStr = this.msgIdToStrFn(msgId)

let expireByPeer = this.promises.get(msgIdStr)
if (!expireByPeer) {
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ export interface AddrInfo {
*/
export type FastMsgIdFn = (msg: RPC.Message) => string

/**
* By default, gossipsub only provide a browser friendly function to convert Uint8Array message id to string.
* Application could use this option to provide a more efficient function.
*/
export type MsgIdToStrFn = (msgId: Uint8Array) => string

/**
* Compute spec'ed msg-id. Used for IHAVE / IWANT messages
*/
Expand Down
6 changes: 6 additions & 0 deletions src/utils/messageIdToString.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { fromString } from 'uint8arrays/from-string'
import { toString } from 'uint8arrays/to-string'

/**
* Browser friendly function to convert Uint8Array message id to base64 string.
*/
export function messageIdToString(msgId: Uint8Array): string {
return toString(msgId, 'base64')
}

/**
* Browser friendly function to convert base64 message id string to Uint8Array
*/
export function messageIdFromString(msgId: string): Uint8Array {
return fromString(msgId, 'base64')
}
2 changes: 1 addition & 1 deletion test/message-cache.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { getMsgId } from './utils/index.js'
import type { RPC } from '../src/message/rpc.js'

describe('Testing Message Cache Operations', () => {
const messageCache = new MessageCache(3, 5)
const messageCache = new MessageCache(3, 5, messageIdToString)
const testMessages: RPC.Message[] = []

before(async () => {
Expand Down
5 changes: 3 additions & 2 deletions test/tracer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import { IWantTracer } from '../src/tracer.js'
import * as constants from '../src/constants.js'
import { makeTestMessage, getMsgId, getMsgIdStr } from './utils/index.js'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { messageIdToString } from '../src/utils/messageIdToString.js'

describe('IWantTracer', () => {
it('should track broken promises', async function () {
// tests that unfulfilled promises are tracked correctly
this.timeout(6000)
const t = new IWantTracer(constants.GossipsubIWantFollowupTime, null)
const t = new IWantTracer(constants.GossipsubIWantFollowupTime, messageIdToString, null)
const peerA = (await createEd25519PeerId()).toString()
const peerB = (await createEd25519PeerId()).toString()

Expand Down Expand Up @@ -37,7 +38,7 @@ describe('IWantTracer', () => {
it('should track unbroken promises', async function () {
// like above, but this time we deliver messages to fullfil the promises
this.timeout(6000)
const t = new IWantTracer(constants.GossipsubIWantFollowupTime, null)
const t = new IWantTracer(constants.GossipsubIWantFollowupTime, messageIdToString, null)
const peerA = (await createEd25519PeerId()).toString()
const peerB = (await createEd25519PeerId()).toString()

Expand Down

0 comments on commit 7f475c5

Please sign in to comment.