Skip to content

Commit

Permalink
feat!: versioned wire transport (#274)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gozala authored Apr 11, 2023
1 parent ce95504 commit 25abb67
Show file tree
Hide file tree
Showing 39 changed files with 1,324 additions and 973 deletions.
2 changes: 1 addition & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"scripts": {
"test:web": "playwright-test test/**/*.spec.js --cov && nyc report",
"test:node": "c8 --check-coverage --branches 100 --functions 100 --lines 100 mocha test/**/*.spec.js",
"test": "npm run test:node",
"test": "c8 --check-coverage --branches 100 --functions 100 --lines 100 mocha --bail test/**/*.spec.js",
"coverage": "c8 --reporter=html mocha test/test-*.js && npm_config_yes=true npx st -d coverage -p 8080",
"check": "tsc --build",
"build": "tsc --build"
Expand Down
25 changes: 14 additions & 11 deletions packages/client/src/connection.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as API from '@ucanto/interface'
import { Receipt, Signature, sha256 } from '@ucanto/core'
import { Signature, Message, Receipt, sha256 } from '@ucanto/core'

/**
* Creates a connection to a service.
Expand Down Expand Up @@ -29,8 +29,9 @@ class Connection {
* @template {API.Capability} C
* @template {API.Tuple<API.ServiceInvocation<C, T>>} I
* @param {I} invocations
* @returns {Promise<API.InferReceipts<I, T>>}
*/
execute(...invocations) {
async execute(...invocations) {
return execute(invocations, this)
}
}
Expand All @@ -40,28 +41,30 @@ class Connection {
* @template {Record<string, any>} T
* @template {API.Tuple<API.ServiceInvocation<C, T>>} I
* @param {API.Connection<T>} connection
* @param {I} workflow
* @returns {Promise<API.InferWorkflowReceipts<I, T>>}
* @param {I} invocations
* @returns {Promise<API.InferReceipts<I, T>>}
*/
export const execute = async (workflow, connection) => {
const request = await connection.codec.encode(workflow, connection)
export const execute = async (invocations, connection) => {
const input = await Message.build({ invocations })
const request = await connection.codec.encode(input, connection)
const response = await connection.channel.request(request)
// We may fail to decode the response if content type is not supported
// or if data was corrupted. We do not want to throw in such case however,
// because client will get an Error object as opposed to a receipt, to retain
// consistent client API with two kinds of errors we encode caught error as
// a receipts per workflow invocation.
try {
return await connection.codec.decode(response)
const output = await connection.codec.decode(response)
const receipts = input.invocationLinks.map(link => output.get(link))
return /** @type {API.InferReceipts<I, T>} */ (receipts)
} catch (error) {
// No third party code is run during decode and we know
// we only throw an Error
const { message, ...cause } = /** @type {Error} */ (error)
const receipts = []
for await (const invocation of workflow) {
const { cid } = await invocation.delegate()
for await (const ran of input.invocationLinks) {
const receipt = await Receipt.issue({
ran: cid,
ran,
result: { error: { ...cause, message } },
// @ts-expect-error - we can not really sign a receipt without having
// an access to a signer which client does not have. In the future
Expand All @@ -80,6 +83,6 @@ export const execute = async (workflow, connection) => {
receipts.push(receipt)
}

return /** @type {any} */ (receipts)
return /** @type {API.InferReceipts<I, T>} */ (receipts)
}
}
50 changes: 28 additions & 22 deletions packages/client/test/client.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Client from '../src/lib.js'
import * as HTTP from '@ucanto/transport/http'
import { CAR, Codec } from '@ucanto/transport'
import * as Service from './service.js'
import { Receipt, CBOR } from '@ucanto/core'
import { Receipt, Message, CBOR } from '@ucanto/core'
import { alice, bob, mallory, service as w3 } from './fixtures.js'
import fetch from '@web-std/fetch'

Expand All @@ -30,18 +30,19 @@ test('encode invocation', async () => {
proofs: [],
})

const payload = await connection.codec.encode([add])
const message = await Message.build({ invocations: [add] })
const payload = await connection.codec.encode(message)

assert.deepEqual(payload.headers, {
'content-type': 'application/car',
accept: 'application/car',
'content-type': 'application/vnd.ipld.car',
accept: 'application/vnd.ipld.car',
})
assert.ok(payload.body instanceof Uint8Array)

const request = await CAR.decode(payload)
const request = await CAR.request.decode(payload)

const [invocation] = request
assert.equal(request.length, 1)
const [invocation] = request.invocations
assert.equal(request.invocations.length, 1)
assert.equal(invocation.issuer.did(), alice.did())
assert.equal(invocation.audience.did(), w3.did())
assert.deepEqual(invocation.proofs, [])
Expand Down Expand Up @@ -98,11 +99,12 @@ test('encode delegated invocation', async () => {
},
})

const payload = await connection.codec.encode([add, remove])
const request = await CAR.decode(payload)
const message = await Message.build({ invocations: [add, remove] })
const payload = await connection.codec.encode(message)
const request = await CAR.request.decode(payload)
{
const [add, remove] = request
assert.equal(request.length, 2)
const [add, remove] = request.invocations
assert.equal(request.invocations.length, 2)

assert.equal(add.issuer.did(), bob.did())
assert.equal(add.audience.did(), w3.did())
Expand All @@ -125,13 +127,16 @@ test('encode delegated invocation', async () => {
assert.equal(remove.issuer.did(), alice.did())
assert.equal(remove.audience.did(), w3.did())
assert.deepEqual(remove.proofs, [])
assert.deepEqual(remove.capabilities, [
{
can: 'store/remove',
with: alice.did(),
link: car.cid,
},
])
assert.deepEqual(
[
Object({
can: 'store/remove',
with: alice.did(),
link: car.cid,
}),
],
remove.capabilities
)
}
})

Expand All @@ -140,8 +145,7 @@ const service = Service.create()
const channel = HTTP.open({
url: new URL('about:blank'),
fetch: async (url, input) => {
/** @type {Client.Tuple<Client.Invocation>} */
const invocations = await CAR.request.decode(input)
const { invocations } = await CAR.request.decode(input)
const promises = invocations.map(async invocation => {
const [capability] = invocation.capabilities
switch (capability.can) {
Expand Down Expand Up @@ -172,7 +176,9 @@ const channel = HTTP.open({
await Promise.all(promises)
)

const { headers, body } = await CAR.response.encode(receipts)
const message = await Message.build({ receipts })

const { headers, body } = await CAR.response.encode(message)

return {
ok: true,
Expand Down Expand Up @@ -317,7 +323,7 @@ test('decode error', async () => {
error: {
error: true,
message:
"Can not decode response with content-type 'application/car' because no matching transport decoder is configured.",
"Can not decode response with content-type 'application/vnd.ipld.car' because no matching transport decoder is configured.",
},
})
})
18 changes: 10 additions & 8 deletions packages/core/src/car.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@ import { base32 } from 'multiformats/bases/base32'
import { create as createLink } from './link.js'
import { sha256 } from 'multiformats/hashes/sha2'

// @see https://www.iana.org/assignments/media-types/application/vnd.ipld.car
export const contentType = 'application/vnd.ipld.car'
export const name = 'CAR'

/** @type {API.MulticodecCode<0x0202, 'CAR'>} */
export const code = 0x0202

/**
* @typedef {API.Block<unknown, number, number, 0|1>} Block
* @typedef {{
* roots: Block[]
* blocks: Map<string, Block>
* roots: API.IPLDBlock[]
* blocks: Map<string, API.IPLDBlock>
* }} Model
*/

class Writer {
/**
* @param {Block[]} blocks
* @param {API.IPLDBlock[]} blocks
* @param {number} byteLength
*/
constructor(blocks = [], byteLength = 0) {
Expand All @@ -29,23 +30,23 @@ class Writer {
this.byteLength = byteLength
}
/**
* @param {Block[]} blocks
* @param {API.IPLDBlock[]} blocks
*/
write(...blocks) {
for (const block of blocks) {
const id = block.cid.toString(base32)
if (!this.written.has(id)) {
this.blocks.push(block)
this.byteLength += CarBufferWriter.blockLength(
/** @type {CarBufferWriter.Block} */ (block)
/** @type {any} */ (block)
)
this.written.add(id)
}
}
return this
}
/**
* @param {Block[]} rootBlocks
* @param {API.IPLDBlock[]} rootBlocks
*/
flush(...rootBlocks) {
const roots = []
Expand Down Expand Up @@ -99,11 +100,12 @@ export const encode = ({ roots = [], blocks }) => {
*/
export const decode = bytes => {
const reader = CarBufferReader.fromBytes(bytes)
/** @type {API.IPLDBlock[]} */
const roots = []
const blocks = new Map()

for (const root of reader.getRoots()) {
const block = reader.get(root)
const block = /** @type {API.IPLDBlock} */ (reader.get(root))
if (block) {
roots.push(block)
}
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/cbor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ export { code, name, decode } from '@ipld/dag-cbor'
import { sha256 } from 'multiformats/hashes/sha2'
import { create as createLink, isLink } from 'multiformats/link'

// @see https://www.iana.org/assignments/media-types/application/vnd.ipld.dag-cbor
export const contentType = 'application/vnd.ipld.dag-cbor'

/**
* @param {unknown} data
* @param {Set<unknown>} seen
Expand Down
28 changes: 20 additions & 8 deletions packages/core/src/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import * as MF from 'multiformats/interface'
import * as CBOR from './cbor.js'
import { identity } from 'multiformats/hashes/identity'

export { CBOR, sha256, identity }

/**
* Function takes arbitrary value and if it happens to be an `IPLDView`
* it will iterate over it's blocks. It is just a convenience for traversing
Expand All @@ -27,15 +29,20 @@ export const iterate = function* (value) {
}

/**
* @template T
* @typedef {Map<API.ToString<API.Link>, API.Block<T>>} BlockStore
* @template [T=unknown]
* @typedef {Map<API.ToString<API.Link>, API.Block<T, number, number, 0>|API.Block<T, number, number, 1>>} BlockStore
*/

/**
* @template [T=unknown]
* @param {API.Block<T>[]} blocks
* @returns {BlockStore<T>}
*/
export const createStore = () => new Map()
export const createStore = (blocks = []) => {
const store = new Map()
addEveryInto(blocks, store)
return store
}

/** @type {API.MulticodecCode<typeof identity.code, typeof identity.name>} */
const EMBED_CODE = identity.code
Expand All @@ -45,21 +52,26 @@ const EMBED_CODE = identity.code
* contain the block, `fallback` is returned. If `fallback` is not provided, it
* will throw an error.
*
* @template {0|1} V
* @template {T} U
* @template T
* @template {API.MulticodecCode} Format
* @template {API.MulticodecCode} Alg
* @template [E=never]
* @param {API.Link<U>} cid
* @param {API.Link<U, Format, Alg, V>} cid
* @param {BlockStore<T>} store
* @param {E} [fallback]
* @returns {API.Block<U>|E}
* @returns {API.Block<U, Format, Alg, V>|E}
*/
export const get = (cid, store, fallback) => {
// If CID uses identity hash, we can return the block data directly
if (cid.multihash.code === EMBED_CODE) {
return { cid, bytes: cid.multihash.digest }
}

const block = /** @type {API.Block<U>|undefined} */ (store.get(`${cid}`))
const block = /** @type {API.Block<U, Format, Alg, V>|undefined} */ (
store.get(`${cid}`)
)
return block ? block : fallback === undefined ? notFound(cid) : fallback
}

Expand All @@ -84,10 +96,10 @@ export const embed = (source, { codec } = {}) => {
}

/**
* @param {API.Link} link
* @param {API.Link<*, *, *, *>} link
* @returns {never}
*/
const notFound = link => {
export const notFound = link => {
throw new Error(`Block for the ${link} is not found`)
}

Expand Down
10 changes: 5 additions & 5 deletions packages/core/src/delegation.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ const matchAbility = (provided, claimed) => {
export class Delegation {
/**
* @param {API.UCANBlock<C>} root
* @param {Map<string, API.Block>} [blocks]
* @param {DAG.BlockStore} [blocks]
*/
constructor(root, blocks = new Map()) {
this.root = root
Expand Down Expand Up @@ -364,14 +364,14 @@ export const delegate = async (
/**
* @template {API.Capabilities} C
* @param {API.UCANBlock<C>} root
* @param {Map<string, API.Block>} blocks
* @param {DAG.BlockStore} blocks
* @returns {IterableIterator<API.Block>}
*/

export const exportDAG = function* (root, blocks) {
for (const link of decode(root).proofs) {
// Check if block is included in this delegation
const root = /** @type {UCAN.Block} */ (blocks.get(link.toString()))
const root = /** @type {UCAN.Block} */ (blocks.get(`${link}`))
if (root) {
yield* exportDAG(root, blocks)
}
Expand Down Expand Up @@ -409,7 +409,7 @@ export const importDAG = dag => {
* @template {API.Capabilities} C
* @param {object} dag
* @param {API.UCANBlock<C>} dag.root
* @param {Map<string, API.Block<unknown>>} [dag.blocks]
* @param {DAG.BlockStore} [dag.blocks]
* @returns {API.Delegation<C>}
*/
export const create = ({ root, blocks }) => new Delegation(root, blocks)
Expand All @@ -419,7 +419,7 @@ export const create = ({ root, blocks }) => new Delegation(root, blocks)
* @template [T=undefined]
* @param {object} dag
* @param {API.UCANLink<C>} dag.root
* @param {Map<string, API.Block>} dag.blocks
* @param {DAG.BlockStore} dag.blocks
* @param {T} [fallback]
* @returns {API.Delegation<C>|T}
*/
Expand Down
Loading

0 comments on commit 25abb67

Please sign in to comment.