Skip to content

Commit

Permalink
fix!: accept Uint8Arrays as keys (#2909)
Browse files Browse the repository at this point in the history
To allow use cases like fetching IPNS records in a way compatible with go-libp2p we need to send binary as fetch identifiers.

JavaScript strings are UTF-16 so we can't round-trip binary reliably since some byte sequences are interpreted as multi-byte or otherwise non-printable characters.

Instead we need to accept Uint8Arrays and send them over the wire as-is.

This is a backwards compatible change as far as interop goes since protobuf `bytes` and `string` types are identical on the wire, but it's breaking for API consumers in that the lookup function now needs to accept a `Uint8Array` identifier instead of a `string`.

Refs: libp2p/specs#656

BREAKING CHANGE: registered lookup functions now receive a Uint8Array identifier instead of a string
  • Loading branch information
achingbrain authored Jan 9, 2025
1 parent 60ccf1a commit b56d918
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 56 deletions.
5 changes: 3 additions & 2 deletions packages/integration-tests/test/fetch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { type Fetch, fetch } from '@libp2p/fetch'
import { expect } from 'aegir/chai'
import { createLibp2p } from 'libp2p'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { isWebWorker } from 'wherearewe'
import { createBaseOptions } from './fixtures/base-options.js'
import type { Libp2p } from '@libp2p/interface'
Expand Down Expand Up @@ -31,9 +32,9 @@ describe('fetch', () => {
const DATA_B = { foobar: 'goodnight moon' }

const generateLookupFunction = function (prefix: string, data: Record<string, string>) {
return async function (key: string): Promise<Uint8Array | undefined> {
return async function (key: Uint8Array): Promise<Uint8Array | undefined> {
key = key.slice(prefix.length) // strip prefix from key
const val = data[key]
const val = data[uint8ArrayToString(key)]
if (val != null) {
return (new TextEncoder()).encode(val)
}
Expand Down
7 changes: 4 additions & 3 deletions packages/protocol-fetch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ const libp2p = await createLibp2p({
}
})

// Given a key (as a string) returns a value (as a Uint8Array), or undefined
// if the key isn't found.
// Given a key (as a Uint8Array) returns a value (as a Uint8Array), or
// undefined if the key isn't found.
//
// All keys must be prefixed by the same prefix, which will be used to find
// the appropriate key lookup function.
async function my_subsystem_key_lookup (key: string): Promise<Uint8Array | undefined> {
async function my_subsystem_key_lookup (key: Uint8Array): Promise<Uint8Array | undefined> {
// app specific callback to lookup key-value pairs.
return Uint8Array.from([0, 1, 2, 3, 4])
}
Expand Down
74 changes: 48 additions & 26 deletions packages/protocol-fetch/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class Fetch implements Startable, FetchInterface {
await data.stream.close()
})
.catch(err => {
this.log.error(err)
this.log.error('error handling message - %e', err)
})
}, {
maxInboundStreams: this.init.maxInboundStreams,
Expand All @@ -64,8 +64,12 @@ export class Fetch implements Startable, FetchInterface {
/**
* Sends a request to fetch the value associated with the given key from the given peer
*/
async fetch (peer: PeerId, key: string, options: AbortOptions = {}): Promise<Uint8Array | undefined> {
this.log('dialing %s to %p', this.protocol, peer)
async fetch (peer: PeerId, key: string | Uint8Array, options: AbortOptions = {}): Promise<Uint8Array | undefined> {
if (typeof key === 'string') {
key = uint8arrayFromString(key)
}

this.log.trace('dialing %s to %p', this.protocol, peer)

const connection = await this.components.connectionManager.openConnection(peer, options)
let signal = options.signal
Expand All @@ -75,7 +79,7 @@ export class Fetch implements Startable, FetchInterface {
// create a timeout if no abort signal passed
if (signal == null) {
const timeout = this.init.timeout ?? DEFAULT_TIMEOUT
this.log('using default timeout of %d ms', timeout)
this.log.trace('using default timeout of %d ms', timeout)
signal = AbortSignal.timeout(timeout)

setMaxListeners(Infinity, signal)
Expand All @@ -93,7 +97,7 @@ export class Fetch implements Startable, FetchInterface {
// make stream abortable
signal.addEventListener('abort', onAbort, { once: true })

this.log('fetch %s', key)
this.log.trace('fetch %m', key)

const pb = pbStream(stream)
await pb.write({
Expand All @@ -105,20 +109,20 @@ export class Fetch implements Startable, FetchInterface {

switch (response.status) {
case (FetchResponse.StatusCode.OK): {
this.log('received status for %s ok', key)
this.log.trace('received status OK for %m', key)
return response.data
}
case (FetchResponse.StatusCode.NOT_FOUND): {
this.log('received status for %s not found', key)
this.log('received status NOT_FOUND for %m', key)
return
}
case (FetchResponse.StatusCode.ERROR): {
this.log('received status for %s error', key)
this.log('received status ERROR for %m', key)
const errmsg = uint8arrayToString(response.data)
throw new ProtocolError('Error in fetch protocol response: ' + errmsg)
}
default: {
this.log('received status for %s unknown', key)
this.log('received status unknown for %m', key)
throw new InvalidMessageError('Unknown response status')
}
}
Expand Down Expand Up @@ -149,21 +153,32 @@ export class Fetch implements Startable, FetchInterface {
})

let response: FetchResponse
const lookup = this._getLookupFunction(request.identifier)
if (lookup != null) {
this.log('look up data with identifier %s', request.identifier)
const data = await lookup(request.identifier)
if (data != null) {
this.log('sending status for %s ok', request.identifier)
response = { status: FetchResponse.StatusCode.OK, data }
} else {
this.log('sending status for %s not found', request.identifier)
response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) }
}
} else {
this.log('sending status for %s error', request.identifier)
const errmsg = uint8arrayFromString(`No lookup function registered for key: ${request.identifier}`)
const key = uint8arrayToString(request.identifier)

const lookup = this._getLookupFunction(key)

if (lookup == null) {
this.log.trace('sending status ERROR for %m', request.identifier)
const errmsg = uint8arrayFromString('No lookup function registered for key')
response = { status: FetchResponse.StatusCode.ERROR, data: errmsg }
} else {
this.log.trace('lookup data with identifier %s', lookup.prefix)

try {
const data = await lookup.fn(request.identifier)

if (data == null) {
this.log.trace('sending status NOT_FOUND for %m', request.identifier)
response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) }
} else {
this.log.trace('sending status OK for %m', request.identifier)
response = { status: FetchResponse.StatusCode.OK, data }
}
} catch (err: any) {
this.log.error('error during lookup of %m - %e', request.identifier, err)
const errmsg = uint8arrayFromString(err.message)
response = { status: FetchResponse.StatusCode.ERROR, data: errmsg }
}
}

await pb.write(response, FetchResponse, {
Expand All @@ -174,7 +189,7 @@ export class Fetch implements Startable, FetchInterface {
signal
})
} catch (err: any) {
this.log('error answering fetch request', err)
this.log.error('error answering fetch request - %e', err)
stream.abort(err)
}
}
Expand All @@ -183,10 +198,17 @@ export class Fetch implements Startable, FetchInterface {
* Given a key, finds the appropriate function for looking up its corresponding value, based on
* the key's prefix.
*/
_getLookupFunction (key: string): LookupFunction | undefined {
_getLookupFunction (key: string): { fn: LookupFunction, prefix: string } | undefined {
for (const prefix of this.lookupFunctions.keys()) {
if (key.startsWith(prefix)) {
return this.lookupFunctions.get(prefix)
const fn = this.lookupFunctions.get(prefix)

if (fn != null) {
return {
fn,
prefix
}
}
}
}
}
Expand Down
18 changes: 13 additions & 5 deletions packages/protocol-fetch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
* }
* })
*
* // Given a key (as a string) returns a value (as a Uint8Array), or undefined
* // if the key isn't found.
* // Given a key (as a Uint8Array) returns a value (as a Uint8Array), or
* // undefined if the key isn't found.
* //
* // All keys must be prefixed by the same prefix, which will be used to find
* // the appropriate key lookup function.
* async function my_subsystem_key_lookup (key: string): Promise<Uint8Array | undefined> {
* async function my_subsystem_key_lookup (key: Uint8Array): Promise<Uint8Array | undefined> {
* // app specific callback to lookup key-value pairs.
* return Uint8Array.from([0, 1, 2, 3, 4])
* }
Expand Down Expand Up @@ -56,8 +57,15 @@ export interface FetchInit {
timeout?: number
}

/**
* A lookup function is registered against a specific identifier prefix and is
* invoked when a remote peer requests a value with that prefix
*/
export interface LookupFunction {
(key: string): Promise<Uint8Array | undefined>
/**
* The key is the identifier requested by the remote peer
*/
(key: Uint8Array): Promise<Uint8Array | undefined>
}

export interface FetchComponents {
Expand All @@ -70,7 +78,7 @@ export interface Fetch {
/**
* Sends a request to fetch the value associated with the given key from the given peer
*/
fetch(peer: PeerId, key: string, options?: AbortOptions): Promise<Uint8Array | undefined>
fetch(peer: PeerId, key: string | Uint8Array, options?: AbortOptions): Promise<Uint8Array | undefined>

/**
* Registers a new lookup callback that can map keys to values, for a given set of keys that
Expand Down
17 changes: 9 additions & 8 deletions packages/protocol-fetch/src/pb/proto.proto
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
syntax = "proto3";

message FetchRequest {
string identifier = 1;
bytes identifier = 1;
}

message FetchResponse {
StatusCode status = 1;
enum StatusCode {
OK = 0;
NOT_FOUND = 1;
ERROR = 2;
}
bytes data = 2;
enum StatusCode {
OK = 0;
NOT_FOUND = 1;
ERROR = 2;
}

StatusCode status = 1;
bytes data = 2;
}
10 changes: 5 additions & 5 deletions packages/protocol-fetch/src/pb/proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { alloc as uint8ArrayAlloc } from 'uint8arrays/alloc'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface FetchRequest {
identifier: string
identifier: Uint8Array
}

export namespace FetchRequest {
Expand All @@ -22,17 +22,17 @@ export namespace FetchRequest {
w.fork()
}

if ((obj.identifier != null && obj.identifier !== '')) {
if ((obj.identifier != null && obj.identifier.byteLength > 0)) {
w.uint32(10)
w.string(obj.identifier)
w.bytes(obj.identifier)
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
identifier: ''
identifier: uint8ArrayAlloc(0)
}

const end = length == null ? reader.len : reader.pos + length
Expand All @@ -42,7 +42,7 @@ export namespace FetchRequest {

switch (tag >>> 3) {
case 1: {
obj.identifier = reader.string()
obj.identifier = reader.bytes()
break
}
default: {
Expand Down
16 changes: 9 additions & 7 deletions packages/protocol-fetch/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { duplexPair } from 'it-pair/duplex'
import { pbStream } from 'it-protobuf-stream'
import sinon from 'sinon'
import { stubInterface, type StubbedInstance } from 'sinon-ts'
import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'
import { toString as uint8arrayToString } from 'uint8arrays/to-string'
import { Fetch } from '../src/fetch.js'
import { FetchRequest, FetchResponse } from '../src/pb/proto.js'
import type { ComponentLogger, Connection, Stream, PeerId } from '@libp2p/interface'
Expand Down Expand Up @@ -89,7 +91,7 @@ describe('fetch', () => {
const pb = pbStream(incomingStream)
const request = await pb.read(FetchRequest)

expect(request.identifier).to.equal(key)
expect(uint8arrayToString(request.identifier)).to.equal(key)

await pb.write({
status: FetchResponse.StatusCode.OK,
Expand All @@ -112,7 +114,7 @@ describe('fetch', () => {
const pb = pbStream(incomingStream)
const request = await pb.read(FetchRequest)

expect(request.identifier).to.equal(key)
expect(uint8arrayToString(request.identifier)).to.equal(key)

await pb.write({
status: FetchResponse.StatusCode.NOT_FOUND
Expand All @@ -134,7 +136,7 @@ describe('fetch', () => {
const pb = pbStream(incomingStream)
const request = await pb.read(FetchRequest)

expect(request.identifier).to.equal(key)
expect(uint8arrayToString(request.identifier)).to.equal(key)

await pb.write({
status: FetchResponse.StatusCode.ERROR
Expand Down Expand Up @@ -177,7 +179,7 @@ describe('fetch', () => {
} = createStreams(components)

fetch.registerLookupFunction('/test', async (k) => {
expect(k).to.equal(key)
expect(k).to.equalBytes(uint8arrayFromString(key))
return value
})

Expand All @@ -189,7 +191,7 @@ describe('fetch', () => {
const pb = pbStream(outgoingStream)

await pb.write({
identifier: key
identifier: uint8arrayFromString(key)
}, FetchRequest)

const response = await pb.read(FetchResponse)
Expand Down Expand Up @@ -218,7 +220,7 @@ describe('fetch', () => {
const pb = pbStream(outgoingStream)

await pb.write({
identifier: key
identifier: uint8arrayFromString(key)
}, FetchRequest)

const response = await pb.read(FetchResponse)
Expand All @@ -242,7 +244,7 @@ describe('fetch', () => {
const pb = pbStream(outgoingStream)

await pb.write({
identifier: key
identifier: uint8arrayFromString(key)
}, FetchRequest)

const response = await pb.read(FetchResponse)
Expand Down

0 comments on commit b56d918

Please sign in to comment.