Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: support peer queries (#88)
Browse files Browse the repository at this point in the history
Support filtering and ordering peers in the datastore.
  • Loading branch information
achingbrain committed Jun 11, 2023
1 parent c9ca1dc commit 6b780fe
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 32 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,18 @@
"docs": "aegir docs"
},
"dependencies": {
"@libp2p/crypto": "^1.0.15",
"@libp2p/interface-libp2p": "^3.1.0",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/interface-peer-store": "^2.0.1",
"@libp2p/interface-peer-store": "^2.0.4",
"@libp2p/interfaces": "^3.2.0",
"@libp2p/logger": "^2.0.7",
"@libp2p/peer-collections": "^3.0.1",
"@libp2p/peer-id": "^2.0.0",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/peer-record": "^5.0.3",
"@multiformats/multiaddr": "^12.0.0",
"interface-datastore": "^8.0.0",
"it-all": "^3.0.2",
"mortice": "^3.0.1",
"multiformats": "^11.0.0",
"protons-runtime": "^5.0.0",
Expand Down
17 changes: 6 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { logger } from '@libp2p/logger'
import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record'
import all from 'it-all'
import { PersistentStore, type PeerUpdate } from './store.js'
import type { Libp2pEvents } from '@libp2p/interface-libp2p'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerStore, Peer, PeerData } from '@libp2p/interface-peer-store'
import type { PeerStore, Peer, PeerData, PeerQuery } from '@libp2p/interface-peer-store'
import type { EventEmitter } from '@libp2p/interfaces/events'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Datastore } from 'interface-datastore'
Expand Down Expand Up @@ -41,13 +42,13 @@ export class PersistentPeerStore implements PeerStore {
this.store = new PersistentStore(components, init)
}

async forEach (fn: (peer: Peer) => void): Promise<void> {
async forEach (fn: (peer: Peer,) => void, query?: PeerQuery): Promise<void> {
log.trace('forEach await read lock')
const release = await this.store.lock.readLock()
log.trace('forEach got read lock')

try {
for await (const peer of this.store.all()) {
for await (const peer of this.store.all(query)) {
fn(peer)
}
} finally {
Expand All @@ -56,19 +57,13 @@ export class PersistentPeerStore implements PeerStore {
}
}

async all (): Promise<Peer[]> {
async all (query?: PeerQuery): Promise<Peer[]> {
log.trace('all await read lock')
const release = await this.store.lock.readLock()
log.trace('all got read lock')

try {
const output: Peer[] = []

for await (const peer of this.store.all()) {
output.push(peer)
}

return output
return await all(this.store.all(query))
} finally {
log.trace('all release read lock')
release()
Expand Down
63 changes: 48 additions & 15 deletions src/store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { CodeError } from '@libp2p/interfaces/errors'
import { PeerMap } from '@libp2p/peer-collections'
import { peerIdFromBytes } from '@libp2p/peer-id'
import mortice, { type Mortice } from 'mortice'
import { base32 } from 'multiformats/bases/base32'
Expand All @@ -11,8 +12,8 @@ import { toPeerPB } from './utils/to-peer-pb.js'
import type { AddressFilter, PersistentPeerStoreComponents, PersistentPeerStoreInit } from './index.js'
import type { PeerUpdate as PeerUpdateExternal } from '@libp2p/interface-libp2p'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Peer, PeerData } from '@libp2p/interface-peer-store'
import type { Datastore } from 'interface-datastore'
import type { Peer, PeerData, PeerQuery } from '@libp2p/interface-peer-store'
import type { Datastore, Key, Query } from 'interface-datastore'

/**
* Event detail emitted when peer data changes
Expand All @@ -21,6 +22,41 @@ export interface PeerUpdate extends PeerUpdateExternal {
updated: boolean
}

function decodePeer (key: Key, value: Uint8Array, cache: PeerMap<Peer>): Peer {
// /peers/${peer-id-as-libp2p-key-cid-string-in-base-32}
const base32Str = key.toString().split('/')[2]
const buf = base32.decode(base32Str)
const peerId = peerIdFromBytes(buf)

const cached = cache.get(peerId)

if (cached != null) {
return cached
}

const peer = bytesToPeer(peerId, value)

cache.set(peerId, peer)

return peer
}

function mapQuery (query: PeerQuery, cache: PeerMap<Peer>): Query {
if (query == null) {
return {}
}

return {
prefix: NAMESPACE_COMMON,
filters: (query.filters ?? []).map(fn => ({ key, value }) => {
return fn(decodePeer(key, value, cache))
}),
orders: (query.orders ?? []).map(fn => (a, b) => {
return fn(decodePeer(a.key, a.value, cache), decodePeer(b.key, b.value, cache))
})
}
}

export class PersistentStore {
private readonly peerId: PeerId
private readonly datastore: Datastore
Expand Down Expand Up @@ -96,28 +132,25 @@ export class PersistentStore {
return this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer)
}

async * all (): AsyncGenerator<Peer, void, unknown> {
for await (const { key, value } of this.datastore.query({
prefix: NAMESPACE_COMMON
})) {
// /peers/${peer-id-as-libp2p-key-cid-string-in-base-32}
const base32Str = key.toString().split('/')[2]
const buf = base32.decode(base32Str)
const peerId = peerIdFromBytes(buf)
async * all (query?: PeerQuery): AsyncGenerator<Peer, void, unknown> {
const peerCache = new PeerMap<Peer>()

for await (const { key, value } of this.datastore.query(mapQuery(query ?? {}, peerCache))) {
const peer = decodePeer(key, value, peerCache)

if (peerId.equals(this.peerId)) {
if (peer.id.equals(this.peerId)) {
// Skip self peer if present
continue
}

yield bytesToPeer(peerId, value)
yield peer
}
}

async #findExistingPeer (peerId: PeerId): Promise<{ existingBuf?: Uint8Array, existingPeer?: Peer }> {
try {
const existingBuf = await this.datastore.get(peerIdToDatastoreKey(peerId))
const existingPeer = await bytesToPeer(peerId, existingBuf)
const existingPeer = bytesToPeer(peerId, existingBuf)

return {
existingBuf,
Expand All @@ -137,7 +170,7 @@ export class PersistentStore {

if (existingBuf != null && uint8ArrayEquals(buf, existingBuf)) {
return {
peer: await bytesToPeer(peerId, buf),
peer: bytesToPeer(peerId, buf),
previous: existingPeer,
updated: false
}
Expand All @@ -146,7 +179,7 @@ export class PersistentStore {
await this.datastore.put(peerIdToDatastoreKey(peerId), buf)

return {
peer: await bytesToPeer(peerId, buf),
peer: bytesToPeer(peerId, buf),
previous: existingPeer,
updated: true
}
Expand Down
10 changes: 6 additions & 4 deletions src/utils/bytes-to-peer.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { unmarshalPublicKey } from '@libp2p/crypto/keys'
import { createFromPubKey } from '@libp2p/peer-id-factory'
import { peerIdFromPeerId } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import { Peer as PeerPB } from '../pb/peer.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Peer, Tag } from '@libp2p/interface-peer-store'

export async function bytesToPeer (peerId: PeerId, buf: Uint8Array): Promise<Peer> {
export function bytesToPeer (peerId: PeerId, buf: Uint8Array): Peer {
const peer = PeerPB.decode(buf)

if (peer.publicKey != null && peerId.publicKey == null) {
peerId = await createFromPubKey(unmarshalPublicKey(peer.publicKey))
peerId = peerIdFromPeerId({
...peerId,
publicKey: peerId.publicKey
})
}

const tags = new Map<string, Tag>()
Expand Down
24 changes: 24 additions & 0 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,5 +259,29 @@ describe('PersistentPeerStore', () => {
await expect(peerStore.consumePeerRecord(signedPeerRecord.marshal(), otherPeerId)).to.eventually.equal(false)
await expect(peerStore.has(peerId)).to.eventually.be.false()
})

it('allows queries', async () => {
await peerStore.save(otherPeerId, {
multiaddrs: [
addr1
]
})

const allPeers = await peerStore.all({
filters: [
() => true
]
})

expect(allPeers).to.not.be.empty()

const noPeers = await peerStore.all({
filters: [
() => false
]
})

expect(noPeers).to.be.empty()
})
})
})

0 comments on commit 6b780fe

Please sign in to comment.