Skip to content

Commit

Permalink
✅ fix(xrpc-server): properly parse & process content-encoding (#2464)
Browse files Browse the repository at this point in the history
* fix(xrpc-server): properly parse & process content-encoding

* Minor optimization

* code style
  • Loading branch information
matthieusieben committed Sep 11, 2024
1 parent bcefbdb commit 98711a1
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 57 deletions.
5 changes: 5 additions & 0 deletions .changeset/new-mice-cheer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/common": patch
---

Minor optimization
5 changes: 5 additions & 0 deletions .changeset/perfect-radios-cross.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/xrpc-server": patch
---

Properly decode request body encoding
5 changes: 5 additions & 0 deletions .changeset/strange-dragons-eat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/xrpc": patch
---

Add UnsupportedMediaType response type
11 changes: 5 additions & 6 deletions packages/common/src/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import {
} from 'stream'

export const forwardStreamErrors = (...streams: Stream[]) => {
for (let i = 0; i < streams.length; ++i) {
const stream = streams[i]
const next = streams[i + 1]
if (next) {
stream.once('error', (err) => next.emit('error', err))
}
for (let i = 1; i < streams.length; ++i) {
const prev = streams[i - 1]
const next = streams[i]

prev.once('error', (err) => next.emit('error', err))
}
}

Expand Down
8 changes: 7 additions & 1 deletion packages/xrpc-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,10 @@ export * from './stream'
export * from './rate-limiter'

export type { ServerTiming } from './util'
export { parseReqNsid, serverTimingHeader, ServerTimer } from './util'
export {
createDecoders,
createDecoder,
parseReqNsid,
serverTimingHeader,
ServerTimer,
} from './util'
5 changes: 0 additions & 5 deletions packages/xrpc-server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,6 @@ export class Server {
}
const input = validateReqInput(req)

if (input?.body instanceof Readable) {
// If the body stream errors at any time, abort the request
input.body.once('error', next)
}

const locals: RequestLocals = req[kRequestLocals]

const reqCtx: XRPCReqContext = {
Expand Down
100 changes: 76 additions & 24 deletions packages/xrpc-server/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import assert from 'assert'
import { Readable, Transform } from 'stream'
import { IncomingMessage } from 'http'
import { createDeflate, createGunzip } from 'zlib'
import assert from 'node:assert'
import { Duplex, PassThrough, pipeline, Readable } from 'node:stream'
import { IncomingMessage } from 'node:http'
import * as zlib from 'node:zlib'
import express from 'express'
import mime from 'mime-types'
import {
Expand All @@ -11,7 +11,9 @@ import {
LexXrpcQuery,
LexXrpcSubscription,
} from '@atproto/lexicon'
import { forwardStreamErrors, MaxSizeChecker } from '@atproto/common'
import { MaxSizeChecker } from '@atproto/common'
import { ResponseType } from '@atproto/xrpc'

import {
UndecodedParams,
Params,
Expand Down Expand Up @@ -237,40 +239,90 @@ function decodeBodyStream(
req: express.Request,
maxSize: number | undefined,
): Readable {
let stream: Readable = req
const contentEncoding = req.headers['content-encoding']
const contentLength = req.headers['content-length']

const contentLengthParsed = contentLength
? parseInt(contentLength, 10)
: undefined

if (Number.isNaN(contentLengthParsed)) {
throw new XRPCError(ResponseType.InvalidRequest, 'invalid content-length')
}

if (
maxSize !== undefined &&
contentLength &&
parseInt(contentLength, 10) > maxSize
contentLengthParsed !== undefined &&
contentLengthParsed > maxSize
) {
throw new XRPCError(413, 'request entity too large')
}

let decoder: Transform | undefined
if (contentEncoding === 'gzip') {
decoder = createGunzip()
} else if (contentEncoding === 'deflate') {
decoder = createDeflate()
throw new XRPCError(
ResponseType.PayloadTooLarge,
'request entity too large',
)
}

if (decoder) {
forwardStreamErrors(stream, decoder)
stream = stream.pipe(decoder)
}
const transforms: Duplex[] = createDecoders(contentEncoding)

if (maxSize !== undefined) {
const maxSizeChecker = new MaxSizeChecker(
maxSize,
() => new XRPCError(413, 'request entity too large'),
() =>
new XRPCError(ResponseType.PayloadTooLarge, 'request entity too large'),
)
forwardStreamErrors(stream, maxSizeChecker)
stream = stream.pipe(maxSizeChecker)
transforms.push(maxSizeChecker)
}

return stream
return transforms.length > 0
? (pipeline([req, ...transforms], () => {}) as Duplex)
: req
}

/**
 * Create the chain of decoding streams for a `content-encoding` header
 * value. Codings are applied by the sender in listed order, so they are
 * decoded in reverse (last-applied first).
 */
export function createDecoders(contentEncoding?: string | string[]): Duplex[] {
  const encodings = parseContentEncoding(contentEncoding)
  const decoders: Duplex[] = []
  for (let i = encodings.length - 1; i >= 0; i--) {
    decoders.push(createDecoder(encodings[i]))
  }
  return decoders
}

/**
 * Parse a `content-encoding` header value into the list of content-codings
 * that actually require decoding, normalized to lowercase. `identity` and
 * empty entries are dropped; missing/empty headers yield an empty list.
 */
export function parseContentEncoding(
  contentEncoding?: string | string[],
): string[] {
  // Covers undefined, '' and [] in one check
  if (!contentEncoding?.length) return []

  if (Array.isArray(contentEncoding)) {
    // content-encoding should never arrive as an array, but parse each
    // entry anyway rather than failing
    return contentEncoding.flatMap(parseContentEncoding)
  }

  const codings: string[] = []
  for (const part of contentEncoding.split(',')) {
    const coding = part.trim().toLowerCase()
    if (coding && coding !== 'identity') codings.push(coding)
  }
  return codings
}

/**
 * Create a decompression stream for a single content-coding value.
 *
 * @throws XRPCError UnsupportedMediaType for unrecognized codings
 */
export function createDecoder(encoding: string): Duplex {
  // https://www.rfc-editor.org/rfc/rfc7231#section-3.1.2.1
  // > All content-coding values are case-insensitive...
  const coding = encoding.trim().toLowerCase()

  // https://www.rfc-editor.org/rfc/rfc9112.html#section-7.2
  if (coding === 'gzip' || coding === 'x-gzip') {
    return zlib.createGunzip({
      // using Z_SYNC_FLUSH (cURL default) to be less strict when decoding
      flush: zlib.constants.Z_SYNC_FLUSH,
      finishFlush: zlib.constants.Z_SYNC_FLUSH,
    })
  }

  if (coding === 'deflate') {
    return zlib.createInflate()
  }

  if (coding === 'br') {
    return zlib.createBrotliDecompress()
  }

  if (coding === 'identity') {
    return new PassThrough()
  }

  throw new XRPCError(
    ResponseType.UnsupportedMediaType,
    'unsupported content-encoding',
  )
}

export function serverTimingHeader(timings: ServerTiming[]) {
Expand Down
141 changes: 120 additions & 21 deletions packages/xrpc-server/tests/bodies.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import * as http from 'http'
import { Readable } from 'stream'
import { gzipSync } from 'zlib'
import { brotliCompressSync, deflateSync, gzipSync } from 'zlib'
import getPort from 'get-port'
import { LexiconDoc } from '@atproto/lexicon'
import { XrpcClient } from '@atproto/xrpc'
import { ResponseType, XrpcClient } from '@atproto/xrpc'
import { cidForCbor } from '@atproto/common'
import { randomBytes } from '@atproto/crypto'
import { createServer, closeServer } from './_util'
Expand Down Expand Up @@ -91,13 +91,25 @@ const BLOB_LIMIT = 5000
async function consumeInput(
input: Readable | string | object,
): Promise<Buffer> {
if (typeof input === 'string') return Buffer.from(input)
if (Buffer.isBuffer(input)) {
return input
}
if (typeof input === 'string') {
return Buffer.from(input)
}
if (input instanceof Readable) {
const buffers: Buffer[] = []
for await (const data of input) {
buffers.push(data)
try {
return Buffer.concat(await input.toArray())
} catch (err) {
if (err instanceof xrpcServer.XRPCError) {
throw err
} else {
throw new xrpcServer.XRPCError(
ResponseType.InvalidRequest,
'unable to read input',
)
}
}
return Buffer.concat(buffers)
}
throw new Error('Invalid input')
}
Expand All @@ -111,10 +123,16 @@ describe('Bodies', () => {
})
server.method(
'io.example.validationTest',
(ctx: { params: xrpcServer.Params; input?: xrpcServer.HandlerInput }) => ({
encoding: 'json',
body: ctx.input?.body,
}),
(ctx: { params: xrpcServer.Params; input?: xrpcServer.HandlerInput }) => {
if (ctx.input?.body instanceof Readable) {
throw new Error('Input is readable')
}

return {
encoding: 'json',
body: ctx.input?.body ?? null,
}
},
)
server.method('io.example.validationTestTwo', () => ({
encoding: 'json',
Expand Down Expand Up @@ -338,32 +356,115 @@ describe('Bodies', () => {
expect(streamResponse.data.cid).toEqual(expectedCid.toString())
})

it('supports blobs and compression', async () => {
it('supports blob uploads', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const { data } = await client.call('io.example.blobTest', {}, bytes, {
encoding: 'application/octet-stream',
})
expect(data.cid).toEqual(expectedCid.toString())
})

it(`supports identity encoding`, async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const { data } = await client.call('io.example.blobTest', {}, bytes, {
encoding: 'application/octet-stream',
headers: { 'content-encoding': 'identity' },
})
expect(data.cid).toEqual(expectedCid.toString())
})

it('supports gzip encoding', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const { data: uncompressed } = await client.call(
const { data } = await client.call(
'io.example.blobTest',
{},
bytes,
gzipSync(bytes),
{
encoding: 'application/octet-stream',
headers: {
'content-encoding': 'gzip',
},
},
)
expect(uncompressed.cid).toEqual(expectedCid.toString())
expect(data.cid).toEqual(expectedCid.toString())
})

const { data: compressed } = await client.call(
it('supports deflate encoding', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const { data } = await client.call(
'io.example.blobTest',
{},
gzipSync(bytes),
deflateSync(bytes),
{
encoding: 'application/octet-stream',
headers: {
'content-encoding': 'deflate',
},
},
)
expect(data.cid).toEqual(expectedCid.toString())
})

it('supports br encoding', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const { data } = await client.call(
'io.example.blobTest',
{},
brotliCompressSync(bytes),
{
encoding: 'application/octet-stream',
headers: {
'content-encoding': 'br',
},
},
)
expect(data.cid).toEqual(expectedCid.toString())
})

it('supports multiple encodings', async () => {
const bytes = randomBytes(1024)
const expectedCid = await cidForCbor(bytes)

const { data } = await client.call(
'io.example.blobTest',
{},
brotliCompressSync(deflateSync(gzipSync(bytes))),
{
encoding: 'application/octet-stream',
headers: {
'content-encoding': 'gzip, identity, deflate, identity, br, identity',
},
},
)
expect(data.cid).toEqual(expectedCid.toString())
})

it('fails gracefully on invalid encodings', async () => {
const bytes = randomBytes(1024)

const promise = client.call(
'io.example.blobTest',
{},
brotliCompressSync(bytes),
{
encoding: 'application/octet-stream',
headers: {
'content-encoding': 'gzip',
},
},
)
expect(compressed.cid).toEqual(expectedCid.toString())

await expect(promise).rejects.toThrow('unable to read input')
})

it('supports empty payload', async () => {
Expand Down Expand Up @@ -428,9 +529,7 @@ describe('Bodies', () => {
})
})

// @TODO: figure out why this is failing dependent on the prev test being run
// https://github.com/bluesky-social/atproto/pull/550/files#r1106400413
it.skip('errors on an empty Content-type on blob upload', async () => {
it('errors on an empty Content-type on blob upload', async () => {
// empty mimetype, but correct syntax
const res = await fetch(`${url}/xrpc/io.example.blobTest`, {
method: 'post',
Expand Down
Loading

0 comments on commit 98711a1

Please sign in to comment.