Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: add peer tagging (#12)
Browse files Browse the repository at this point in the history
Allows tagging peers to mark some important or ones we should keep
connections open to, etc.

Refs: libp2p/js-libp2p#369
  • Loading branch information
achingbrain committed Jun 24, 2022
1 parent a50f109 commit c360e41
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 4 deletions.
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"generate": "protons src/pb/peer.proto",
"generate": "protons src/pb/peer.proto src/pb/tags.proto",
"build": "aegir build",
"test": "aegir test",
"test:chrome": "aegir test -t browser --cov",
Expand All @@ -142,7 +142,7 @@
"@libp2p/components": "^2.0.0",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-peer-info": "^1.0.1",
"@libp2p/interface-peer-store": "^1.0.0",
"@libp2p/interface-peer-store": "^1.1.0",
"@libp2p/interface-record": "^1.0.1",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/logger": "^2.0.0",
Expand All @@ -165,8 +165,9 @@
"@libp2p/interface-compliance-tests": "^3.0.1",
"@libp2p/peer-id-factory": "^1.0.0",
"@libp2p/utils": "^2.0.0",
"aegir": "^37.3.0",
"aegir": "^37.4.0",
"datastore-core": "^7.0.1",
"delay": "^5.0.0",
"p-defer": "^4.0.0",
"p-wait-for": "^4.1.0",
"protons": "^3.0.4",
Expand Down
70 changes: 69 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { PeerStoreKeyBook } from './key-book.js'
import { PeerStoreMetadataBook } from './metadata-book.js'
import { PeerStoreProtoBook } from './proto-book.js'
import { PersistentStore, Store } from './store.js'
import type { PeerStore, AddressBook, KeyBook, MetadataBook, ProtoBook, PeerStoreEvents, PeerStoreInit, Peer } from '@libp2p/interface-peer-store'
import type { PeerStore, AddressBook, KeyBook, MetadataBook, ProtoBook, PeerStoreEvents, PeerStoreInit, Peer, TagOptions } from '@libp2p/interface-peer-store'
import type { PeerId } from '@libp2p/interface-peer-id'
import { Components, Initializable } from '@libp2p/components'
import errCode from 'err-code'
import { Tag, Tags } from './pb/tags.js'

const log = logger('libp2p:peer-store')

Expand Down Expand Up @@ -115,4 +117,70 @@ export class PersistentPeerStore extends EventEmitter<PeerStoreEvents> implement
release()
}
}

async tagPeer (peerId: PeerId, tag: string, options: TagOptions = {}) {
const providedValue = options.value ?? 0
const value = Math.round(providedValue)
const ttl = options.ttl ?? undefined

if (value !== providedValue || value < 0 || value > 100) {
throw errCode(new Error('Tag value must be between 0-100'), 'ERR_TAG_VALUE_OUT_OF_BOUNDS')
}

const buf = await this.metadataBook.getValue(peerId, 'tags')
let tags: Tag[] = []

if (buf != null) {
tags = Tags.decode(buf).tags
}

for (const t of tags) {
if (t.name === tag) {
throw errCode(new Error('Peer already tagged'), 'ERR_DUPLICATE_TAG')
}
}

tags.push({
name: tag,
value,
expiry: ttl == null ? undefined : BigInt(Date.now() + ttl)
})

await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags }))
}

async unTagPeer (peerId: PeerId, tag: string) {
const buf = await this.metadataBook.getValue(peerId, 'tags')
let tags: Tag[] = []

if (buf != null) {
tags = Tags.decode(buf).tags
}

tags = tags.filter(t => t.name !== tag)

await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags }))
}

async getTags (peerId: PeerId) {
const buf = await this.metadataBook.getValue(peerId, 'tags')
let tags: Tag[] = []

if (buf != null) {
tags = Tags.decode(buf).tags
}

const now = BigInt(Date.now())
const unexpiredTags = tags.filter(tag => tag.expiry == null || tag.expiry > now)

if (unexpiredTags.length !== tags.length) {
// remove any expired tags
await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags: unexpiredTags }))
}

return unexpiredTags.map(t => ({
name: t.name,
value: t.value ?? 0
}))
}
}
11 changes: 11 additions & 0 deletions src/pb/tags.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";

message Tags {
repeated Tag tags = 1;
}

message Tag {
string name = 1; // e.g. 'priority'
optional uint32 value = 2; // tag value 0-100
optional uint64 expiry = 3; // ms timestamp after which the tag is no longer valid
}
49 changes: 49 additions & 0 deletions src/pb/tags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */

import { encodeMessage, decodeMessage, message, string, uint32, uint64 } from 'protons-runtime'
import type { Codec } from 'protons-runtime'

export interface Tags {
tags: Tag[]
}

export namespace Tags {
export const codec = (): Codec<Tags> => {
return message<Tags>({
1: { name: 'tags', codec: Tag.codec(), repeats: true }
})
}

export const encode = (obj: Tags): Uint8Array => {
return encodeMessage(obj, Tags.codec())
}

export const decode = (buf: Uint8Array): Tags => {
return decodeMessage(buf, Tags.codec())
}
}

export interface Tag {
name: string
value?: number
expiry?: bigint
}

export namespace Tag {
export const codec = (): Codec<Tag> => {
return message<Tag>({
1: { name: 'name', codec: string },
2: { name: 'value', codec: uint32, optional: true },
3: { name: 'expiry', codec: uint64, optional: true }
})
}

export const encode = (obj: Tag): Uint8Array => {
return encodeMessage(obj, Tag.codec())
}

export const decode = (buf: Uint8Array): Tag => {
return decodeMessage(buf, Tag.codec())
}
}
104 changes: 104 additions & 0 deletions test/peer-store.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { MemoryDatastore } from 'datastore-core/memory'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import type { PeerId } from '@libp2p/interface-peer-id'
import { Components } from '@libp2p/components'
import delay from 'delay'

const addr1 = new Multiaddr('/ip4/127.0.0.1/tcp/8000')
const addr2 = new Multiaddr('/ip4/127.0.0.1/tcp/8001')
Expand Down Expand Up @@ -214,4 +215,107 @@ describe('peer-store', () => {
expect(peerData.metadata.get(metadataKey)).to.equalBytes(metadataValue)
})
})

describe('tags', () => {
let peerStore: PersistentPeerStore

beforeEach(() => {
peerStore = new PersistentPeerStore()
peerStore.init(new Components({ peerId: peerIds[4], datastore: new MemoryDatastore() }))
})

it('tags a peer', async () => {
const name = 'a-tag'
await peerStore.tagPeer(peerIds[0], name)

await expect(peerStore.getTags(peerIds[0]), 'PeerStore did not contain tag for peer')
.to.eventually.deep.include.members([{
name,
value: 0
}])
})

it('tags a peer with a value', async () => {
const name = 'a-tag'
const value = 50
await peerStore.tagPeer(peerIds[0], name, {
value
})

await expect(peerStore.getTags(peerIds[0]), 'PeerStore did not contain tag for peer with a value')
.to.eventually.deep.include.members([{
name,
value
}])
})

it('tags a peer with a valid value', async () => {
const name = 'a-tag'

await expect(peerStore.tagPeer(peerIds[0], name, {
value: -1
}), 'PeerStore contain tag for peer where value was too small')
.to.eventually.be.rejected().with.property('code', 'ERR_TAG_VALUE_OUT_OF_BOUNDS')

await expect(peerStore.tagPeer(peerIds[0], name, {
value: 101
}), 'PeerStore contain tag for peer where value was too large')
.to.eventually.be.rejected().with.property('code', 'ERR_TAG_VALUE_OUT_OF_BOUNDS')

await expect(peerStore.tagPeer(peerIds[0], name, {
value: 5.5
}), 'PeerStore contain tag for peer where value was not an integer')
.to.eventually.be.rejected().with.property('code', 'ERR_TAG_VALUE_OUT_OF_BOUNDS')
})

it('tags a peer with an expiring value', async () => {
const name = 'a-tag'
const value = 50
await peerStore.tagPeer(peerIds[0], name, {
value,
ttl: 50
})

await expect(peerStore.getTags(peerIds[0]))
.to.eventually.deep.include.members([{
name,
value
}], 'PeerStore did not contain expiring value')

await delay(100)

await expect(peerStore.getTags(peerIds[0]))
.to.eventually.not.deep.include.members([{
name,
value
}], 'PeerStore contained expired value')
})

it('does not tag a peer twice', async () => {
const name = 'a-tag'
await peerStore.tagPeer(peerIds[0], name)

await expect(peerStore.tagPeer(peerIds[0], name), 'PeerStore allowed duplicate tags')
.to.eventually.be.rejected().with.property('code', 'ERR_DUPLICATE_TAG')
})

it('untags a peer', async () => {
const name = 'a-tag'
await peerStore.tagPeer(peerIds[0], name)

await expect(peerStore.getTags(peerIds[0]), 'PeerStore did not contain tag')
.to.eventually.deep.include.members([{
name,
value: 0
}])

await peerStore.unTagPeer(peerIds[0], name)

await expect(peerStore.getTags(peerIds[0]), 'PeerStore contained untagged tag')
.to.eventually.not.deep.include.members([{
name,
value: 0
}])
})
})
})

0 comments on commit c360e41

Please sign in to comment.