Skip to content

Commit

Permalink
fix(xrpc): support BodyInit as request body
Browse files Browse the repository at this point in the history
fix(xrpc-server): allow upload of empty files
  • Loading branch information
matthieusieben committed May 2, 2024
1 parent c148fd5 commit a966ee5
Show file tree
Hide file tree
Showing 14 changed files with 456 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CID } from 'multiformats/cid'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface CallOptions {
headers?: Headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CID } from 'multiformats/cid'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface OutputSchema {
blob: BlobRef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface HandlerInput {
encoding: 'application/vnd.ipld.car'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface OutputSchema {
blob: BlobRef
Expand Down
4 changes: 2 additions & 2 deletions packages/lex-cli/src/codegen/lex-gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,11 @@ export function genXrpcInput(
)
}
} else if (def.type === 'procedure' && def.input?.encoding) {
//= export type InputSchema = string | Uint8Array
//= export type InputSchema = string | Uint8Array | Blob
file.addTypeAlias({
isExported: true,
name: 'InputSchema',
type: 'string | Uint8Array',
type: 'string | Uint8Array | Blob',
})
} else {
//= export type InputSchema = undefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface HandlerInput {
encoding: 'application/vnd.ipld.car'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface OutputSchema {
blob: BlobRef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface HandlerInput {
encoding: 'application/vnd.ipld.car'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server'

export interface QueryParams {}

export type InputSchema = string | Uint8Array
export type InputSchema = string | Uint8Array | Blob

export interface OutputSchema {
blob: BlobRef
Expand Down
2 changes: 1 addition & 1 deletion packages/pds/tests/crud.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ describe('crud operations', () => {
const result = await defaultFetchHandler(
aliceAgent.service.origin + `/xrpc/com.atproto.repo.createRecord`,
'post',
{ ...aliceAgent.api.xrpc.headers, 'Content-Type': 'application/json' },
{ ...aliceAgent.api.xrpc.headers, 'content-type': 'application/json' },
passthroughBody({
repo: alice.did,
collection: 'app.bsky.feed.post',
Expand Down
21 changes: 17 additions & 4 deletions packages/xrpc-server/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,23 @@ function isValidEncoding(possibleStr: string, value: string) {
return possible.includes(normalized)
}

export function hasBody(req: express.Request) {
const contentLength = req.headers['content-length']
const transferEncoding = req.headers['transfer-encoding']
return (contentLength && parseInt(contentLength, 10) > 0) || transferEncoding
function parseContentLength(value: string): number {
if (/^\s*\d+\s*$/.test(value)) return Number(value)
throw new InvalidRequestError('invalid content-length header')
}

function hasBody(req: express.Request): boolean {
if (req.headers['transfer-encoding']) return true

if (req.headers['content-length']) {
const contentLength = parseContentLength(req.headers['content-length'])
if (contentLength > 0) return true
// A content-length of 0 is still a body if there is a content-type (e.g.
// an empty text file)
if (req.headers['content-type']) return true
}

return false
}

export function processBodyAsBytes(req: express.Request): Promise<Uint8Array> {
Expand Down
172 changes: 165 additions & 7 deletions packages/xrpc-server/tests/bodies.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Readable } from 'stream'
import { gzipSync } from 'zlib'
import getPort from 'get-port'
import { LexiconDoc } from '@atproto/lexicon'
import { ReadableStream } from 'stream/web'
import xrpc, { ServiceClient } from '@atproto/xrpc'
import { bytesToStream, cidForCbor } from '@atproto/common'
import { randomBytes } from '@atproto/crypto'
Expand Down Expand Up @@ -88,6 +89,20 @@ const LEXICONS: LexiconDoc[] = [

const BLOB_LIMIT = 5000

async function consumeInput(
input: Readable | string | object,
): Promise<Buffer> {
if (typeof input === 'string') return Buffer.from(input)
if (input instanceof Readable) {
const buffers: Buffer[] = []
for await (const data of input) {
buffers.push(data)
}
return Buffer.concat(buffers)
}
throw new Error('Invalid input')
}

describe('Bodies', () => {
let s: http.Server
const server = xrpcServer.createServer(LEXICONS, {
Expand All @@ -109,13 +124,8 @@ describe('Bodies', () => {
server.method(
'io.example.blobTest',
async (ctx: { input?: xrpcServer.HandlerInput }) => {
if (!(ctx.input?.body instanceof Readable))
throw new Error('Input not readable')
const buffers: Buffer[] = []
for await (const data of ctx.input.body) {
buffers.push(data)
}
const cid = await cidForCbor(Buffer.concat(buffers))
const buffer = await consumeInput(ctx.input?.body)
const cid = await cidForCbor(buffer)
return {
encoding: 'json',
body: { cid: cid.toString() },
Expand Down Expand Up @@ -165,7 +175,65 @@ describe('Bodies', () => {
{ foo: 'hello', bar: 123 },
{ encoding: 'image/jpeg' },
),
).rejects.toThrow(`Unable to encode object as image/jpeg data`)
await expect(
client.call(
'io.example.validationTest',
{},
// Does not need to be a valid jpeg
new Blob([randomBytes(123)], { type: 'image/jpeg' }),
),
).rejects.toThrow(`Wrong request encoding (Content-Type): image/jpeg`)
await expect(
client.call(
'io.example.validationTest',
{},
(() => {
const formData = new FormData()
formData.append('foo', 'bar')
return formData
})(),
),
).rejects.toThrow(
`Wrong request encoding (Content-Type): multipart/form-data`,
)
await expect(
client.call(
'io.example.validationTest',
{},
new URLSearchParams([['foo', 'bar']]),
),
).rejects.toThrow(
`Wrong request encoding (Content-Type): application/x-www-form-urlencoded`,
)
await expect(
client.call(
'io.example.validationTest',
{},
new Blob([new Uint8Array([1])]),
),
).rejects.toThrow(
`Wrong request encoding (Content-Type): application/octet-stream`,
)
await expect(
client.call(
'io.example.validationTest',
{},
new ReadableStream({
pull(ctrl) {
ctrl.enqueue(new Uint8Array([1]))
ctrl.close()
},
}),
),
).rejects.toThrow(
`Wrong request encoding (Content-Type): application/octet-stream`,
)
await expect(
client.call('io.example.validationTest', {}, new Uint8Array([1])),
).rejects.toThrow(
`Wrong request encoding (Content-Type): application/octet-stream`,
)

// 500 responses don't include details, so we nab details from the logger.
let error: string | undefined
Expand All @@ -182,6 +250,96 @@ describe('Bodies', () => {
expect(error).toEqual(`Output must have the property "foo"`)
})

it('supports ArrayBuffers', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const bytesResponse = await client.call('io.example.blobTest', {}, bytes, {
encoding: 'application/octet-stream',
})
expect(bytesResponse.data.cid).toEqual(expectedCid.toString())
})

it('supports empty payload on procedues with encoding', async () => {
const bytes = new Uint8Array(0)
const expectedCid = await cidForCbor(bytes)
const bytesResponse = await client.call('io.example.blobTest', {}, bytes)
expect(bytesResponse.data.cid).toEqual(expectedCid.toString())
})

it('supports upload of empty txt file', async () => {
const txtFile = new Blob([], { type: 'text/plain' })
const expectedCid = await cidForCbor(await txtFile.arrayBuffer())
const fileResponse = await client.call('io.example.blobTest', {}, txtFile)
expect(fileResponse.data.cid).toEqual(expectedCid.toString())
})

// This does not work because the xrpc-server will add a json middleware
// regardless of the "input" definition. This is probably a behavior that
// should be fixed in the xrpc-server.
it.skip('supports upload of json data', async () => {
const jsonFile = new Blob([Buffer.from(`{"foo":"bar","baz":[3, null]}`)], {
type: 'application/json',
})
const expectedCid = await cidForCbor(await jsonFile.arrayBuffer())
const fileResponse = await client.call('io.example.blobTest', {}, jsonFile)
expect(fileResponse.data.cid).toEqual(expectedCid.toString())
})

it('supports ArrayBufferView', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const bufferResponse = await client.call(
'io.example.blobTest',
{},
Buffer.from(bytes),
)
expect(bufferResponse.data.cid).toEqual(expectedCid.toString())
})

it('supports Blob', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const blobResponse = await client.call(
'io.example.blobTest',
{},
new Blob([bytes], { type: 'application/octet-stream' }),
)
expect(blobResponse.data.cid).toEqual(expectedCid.toString())
})

it('supports Blob without explicit type', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const blobResponse = await client.call(
'io.example.blobTest',
{},
new Blob([bytes]),
)
expect(blobResponse.data.cid).toEqual(expectedCid.toString())
})

it('supports ReadableStream', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const streamResponse = await client.call(
'io.example.blobTest',
{},
// ReadableStream.from not available in node < 20
new ReadableStream({
pull(ctrl) {
ctrl.enqueue(bytes)
ctrl.close()
},
}),
)
expect(streamResponse.data.cid).toEqual(expectedCid.toString())
})

it('supports blobs and compression', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)
Expand Down
29 changes: 15 additions & 14 deletions packages/xrpc/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
encodeMethodCallBody,
httpResponseCodeToEnum,
httpResponseBodyParse,
normalizeHeaders,
} from './util'
import {
FetchHandler,
Expand Down Expand Up @@ -72,11 +71,11 @@ export class ServiceClient {
}

setHeader(key: string, value: string): void {
this.headers[key] = value
this.headers[key.toLowerCase()] = value
}

unsetHeader(key: string): void {
delete this.headers[key]
delete this.headers[key.toLowerCase()]
}

async call(
Expand All @@ -94,13 +93,13 @@ export class ServiceClient {

const httpMethod = getMethodSchemaHTTPMethod(def)
const httpUri = constructMethodCallUri(methodNsid, def, this.uri, params)
const httpHeaders = constructMethodCallHeaders(def, data, {
headers: {
...this.headers,
...opts?.headers,
},
encoding: opts?.encoding,
})
const httpHeaders = constructMethodCallHeaders(def, data, opts)

for (const [k, v] of Object.entries(this.headers)) {
if (v != null && !Object.hasOwn(httpHeaders, k)) {
httpHeaders[k] = v
}
}

const res = await this.baseClient.fetch(
httpUri,
Expand Down Expand Up @@ -145,11 +144,10 @@ export async function defaultFetchHandler(
try {
// The duplex field is now required for streaming bodies, but not yet reflected
// anywhere in docs or types. See whatwg/fetch#1438, nodejs/node#46221.
const headers = normalizeHeaders(httpHeaders)
const reqInit: RequestInit & { duplex: string } = {
method: httpMethod,
headers,
body: encodeMethodCallBody(headers, httpReqBody),
headers: httpHeaders,
body: encodeMethodCallBody(httpHeaders, httpReqBody),
duplex: 'half',
}
const res = await fetch(httpUri, reqInit)
Expand All @@ -160,7 +158,10 @@ export async function defaultFetchHandler(
body: httpResponseBodyParse(res.headers.get('content-type'), resBody),
}
} catch (e) {
throw new XRPCError(ResponseType.Unknown, String(e))
if (e instanceof XRPCError) throw e
const err = new XRPCError(ResponseType.Unknown, String(e))
err.cause = e
throw err
}
}

Expand Down
Loading

0 comments on commit a966ee5

Please sign in to comment.