diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f328b19..3cd330a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,6 +23,8 @@ jobs: run: make build - name: test run: make test + - name: performance + run: make cheap-perf - name: Publish uses: menduz/oddish-action@master with: diff --git a/.gitignore b/.gitignore index de42666..1354cf3 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,5 @@ test/codegen/*_pb.* **/*.log src/protocol/index.ts -test/codegen/client.ts \ No newline at end of file +test/codegen/client.ts +test/benchmarks/compilated \ No newline at end of file diff --git a/Makefile b/Makefile index 902b3e0..51131a6 100644 --- a/Makefile +++ b/Makefile @@ -62,10 +62,14 @@ build: ./node_modules/.bin/api-extractor run $(LOCAL_ARG) --typescript-compiler-folder ./node_modules/typescript cheap-perf: - @time node_modules/.bin/ts-node test/bench.ts + ./perf.sh + +inspect: + node_modules/.bin/tsc -p test/benchmarks/tsconfig.json + node --inspect-brk test/benchmarks/compilated/test/benchmarks/bench.js integration-example: @cd example; ./build.sh @TS_NODE_PROJECT="example/tsconfig.json" node_modules/.bin/ts-node ./example/integration.ts -.PHONY: build test cheap-perf integration-example +.PHONY: build test cheap-perf integration-example inspect diff --git a/perf.sh b/perf.sh new file mode 100755 index 0000000..af59cc1 --- /dev/null +++ b/perf.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +node_modules/.bin/tsc -p test/benchmarks/tsconfig.json +rm ./*.log || true +time node --prof test/benchmarks/compilated/test/benchmarks/bench.js +EXIT_CODE=$? +for f in *.log; do node --prof-process "$f"; done +exit $EXIT_CODE \ No newline at end of file diff --git a/src/ack-helper.ts b/src/ack-helper.ts index ed95e78..11e14b5 100644 --- a/src/ack-helper.ts +++ b/src/ack-helper.ts @@ -9,28 +9,38 @@ export type AckDispatcher = { } export function createAckHelper(transport: Transport): AckDispatcher { - const oneTimeCallbacks = new Map void>() + const oneTimeCallbacks = new Map void, (err: Error) => void]>() const bb = new Writer() + transport.on("close", () => { + const err = new Error("Transport closed while waiting the ACK") + oneTimeCallbacks.forEach(([_resolve, reject]) => reject(err)) + oneTimeCallbacks.clear() + }) + return { receiveAck(data: StreamMessage, messageNumber: number) { const key = `${messageNumber},${data.sequenceId}` const fut = oneTimeCallbacks.get(key) if (fut) { - fut(data) oneTimeCallbacks.delete(key) + fut[0](data) } }, async sendWithAck(data: StreamMessage): Promise { - return new Promise((ret) => { - const [_, messageNumber] = parseMessageIdentifier(data.messageIdentifier) - oneTimeCallbacks.set(`${messageNumber},${data.sequenceId}`, ret) + const [_, messageNumber] = parseMessageIdentifier(data.messageIdentifier) + const key = `${messageNumber},${data.sequenceId}` - bb.reset() - StreamMessage.encode(data, bb) - transport.sendMessage(bb.finish()) + const ret = new Promise(function ackPromise(ret, rej) { + oneTimeCallbacks.set(key, [ret, rej]) }) + + bb.reset() + StreamMessage.encode(data, bb) + transport.sendMessage(bb.finish()) + + return ret }, } } diff --git a/src/client.ts b/src/client.ts index 74c6ecf..d9fa55b 100644 --- a/src/client.ts +++ b/src/client.ts @@ -121,24 +121,21 @@ export function streamFromDispatcher( dispatcher.removeListener(messageNumber) } - function processMessage(message: StreamMessage, messageNumber: number) { + function sendAck() { + const closed = channel.isClosed() + if (!closed && !isRemoteClosed) { + dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId)) + } + } + + function processMessage(message: StreamMessage) { lastReceivedSequenceId = message.sequenceId if (message.closed) { isRemoteClosed = true channel.close() } else { - const payload = message.payload - const portId = message.portId - channel - .push(payload) - .then(() => { - const closed = channel.isClosed() - if (!closed && !isRemoteClosed) { - dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, portId)) - } - }) - .catch(channel.failAndClose) + channel.push(message.payload).then(sendAck).catch(channel.failAndClose) } } @@ -146,9 +143,9 @@ export function streamFromDispatcher( const ret = parseProtocolMessage(reader) if (ret) { - const [messageType, message, messageNumber] = ret + const [messageType, message] = ret if (messageType == RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE) { - processMessage(message, messageNumber) + processMessage(message) } else if (messageType == RpcMessageTypes.RpcMessageTypes_REMOTE_ERROR_RESPONSE) { isRemoteClosed = true channel.failAndClose( @@ -162,7 +159,7 @@ export function streamFromDispatcher( } }) - processMessage(streamMessage, messageNumber) + processMessage(streamMessage) return channel.iterable } @@ -264,7 +261,7 @@ export async function createRpcClient(transport: Transport): Promise const port = await portFuture - transport.on('close', () => { + transport.on("close", () => { port.close() }) diff --git a/src/protocol/helpers.ts b/src/protocol/helpers.ts index d3a1548..a2fb5d0 100644 --- a/src/protocol/helpers.ts +++ b/src/protocol/helpers.ts @@ -13,8 +13,11 @@ import { DestroyPort, } from "./index" +const bb = new Writer() +const EMPTY_U8A = Uint8Array.of() + export function closeStreamMessage(messageNumber: number, sequenceId: number, portId: number): Uint8Array { - const bb = new Writer() + bb.reset() StreamMessage.encode( { messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, messageNumber), @@ -22,7 +25,7 @@ export function closeStreamMessage(messageNumber: number, sequenceId: number, po portId, ack: false, closed: true, - payload: Uint8Array.of(), + payload: EMPTY_U8A, }, bb ) @@ -35,7 +38,7 @@ export function streamMessage( portId: number, payload: Uint8Array ): Uint8Array { - const bb = new Writer() + bb.reset() StreamMessage.encode( { messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, messageNumber), @@ -51,7 +54,7 @@ export function streamMessage( } export function streamAckMessage(messageNumber: number, sequenceId: number, portId: number): Uint8Array { - const bb = new Writer() + bb.reset() StreamMessage.encode( { messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_STREAM_ACK, messageNumber), @@ -59,7 +62,7 @@ export function streamAckMessage(messageNumber: number, sequenceId: number, port portId, ack: true, closed: false, - payload: Uint8Array.of(), + payload: EMPTY_U8A, }, bb ) diff --git a/src/server.ts b/src/server.ts index 790787a..ef10a53 100644 --- a/src/server.ts +++ b/src/server.ts @@ -36,6 +36,8 @@ type RpcServerState = { portsByTransport: Map>> } +const EMPTY_U8A = Uint8Array.from([]) + /** * @public */ @@ -265,7 +267,7 @@ export async function handleRequest( const result = await port.callProcedure(request.procedureId, request.payload, context) const response = Response.fromJSON({ messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_RESPONSE, messageNumber), - payload: Uint8Array.from([]), + payload: EMPTY_U8A, }) if (result instanceof Uint8Array) { @@ -282,7 +284,7 @@ export async function handleRequest( ack: false, sequenceId: 0, messageIdentifier: 0, - payload: Uint8Array.of(), + payload: EMPTY_U8A, portId: request.portId, }) @@ -297,13 +299,10 @@ export async function handleRequest( ) reusedStreamMessage.payload = elem reusedStreamMessage.portId = request.portId - // we use Promise.race to react to the transport close events - const ret = await Promise.race([ - ackDispatcher.sendWithAck(reusedStreamMessage), - new Promise((_, reject) => - transport.on("close", () => reject(new Error("Transport closed while sending stream"))) - ), - ]) + + // sendWithAck may fail if the transport is closed, effectively + // ending this iterator. + const ret = await ackDispatcher.sendWithAck(reusedStreamMessage) if (ret.ack) { continue diff --git a/src/transports/Memory.ts b/src/transports/Memory.ts index 5e16ab5..03d3193 100644 --- a/src/transports/Memory.ts +++ b/src/transports/Memory.ts @@ -10,7 +10,7 @@ export function MemoryTransport() { return { ...sender, sendMessage(message) { - receiver.emit("message", new Uint8Array(message)) + receiver.emit("message", message) }, close() { if (!isClosed) { diff --git a/test/bench.ts b/test/benchmarks/bench.ts similarity index 51% rename from test/bench.ts rename to test/benchmarks/bench.ts index 340c67f..d4526ef 100644 --- a/test/bench.ts +++ b/test/benchmarks/bench.ts @@ -1,7 +1,9 @@ import { Suite } from "benchmark" -import * as helpers from "./helpers" -import { BookServiceDefinition } from "./codegen/client" -import { loadService, registerService } from "../src/codegen" +import * as helpers from "../helpers" +import { BookServiceDefinition } from "../codegen/client" +import { loadService, registerService } from "../../src/codegen" + +const ITER_MULTIPLIER = 400 const books = [ { author: "mr menduz", isbn: 1234, title: "1001 reasons to write your own OS" }, @@ -21,12 +23,12 @@ async function test() { } }, async *queryBooks(req) { - for (let i = 0; i < 100; i++) { + for (let i = 0; i < ITER_MULTIPLIER; i++) { yield* books } }, async *queryBooksNoAck(req) { - for (let i = 0; i < 100; i++) { + for (let i = 0; i < ITER_MULTIPLIER; i++) { yield* books } }, @@ -46,7 +48,7 @@ async function test() { results.push(book) } - if (results.length != 400) deferred.reject("Invalid number of results, got: " + results.length) + if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length) else deferred.resolve() } @@ -57,42 +59,81 @@ async function test() { results.push(book) } - if (results.length != 400) deferred.reject("Invalid number of results, got: " + results.length) + if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length) else deferred.resolve() } + let memory: ReturnType = process.memoryUsage() + + function printMemory() { + const newMemory = process.memoryUsage() + + function toMb(num: number) { + return (num / 1024 / 1024).toFixed(2) + "MB" + } + + console.log(` + heapTotal: ${toMb(newMemory.heapTotal - memory.heapTotal)} + heapUsed: ${toMb(newMemory.heapUsed - memory.heapUsed)} + rss: ${toMb(newMemory.rss - memory.rss)} + arrayBuffers: ${toMb((newMemory as any).arrayBuffers - (memory as any).arrayBuffers)} + `) + + memory = newMemory + } + suite - .add("GetBook", { + .add("PREWARM GetBook", { defer: true, async fn(deferred) { - for (let i = 0; i < 400; i++) { + for (let i = 0; i < ITER_MULTIPLIER; i++) { const ret = await service.getBook({ isbn: 1234 }) - if (ret.isbn != 1234) deferred.reject(new Error("invalid number")) + if (ret.isbn != 1234) throw new Error("invalid number") } + deferred.resolve() }, }) - .add("QueryBooks", { + .add("PREWARM QueryBooks", { defer: true, fn: benchBooks, }) - .add("QueryBooksNoAck", { + .add("QPREWARM ueryBooksNoAck", { defer: true, fn: benchBooksNoAck, }) - .add("QueryBooks 2", { + .add("QueryBooks", { defer: true, fn: benchBooks, }) - .add("QueryBooksNoAck 2", { + .add("GetBook", { + defer: true, + async fn(deferred) { + for (let i = 0; i < ITER_MULTIPLIER; i++) { + const ret = await service.getBook({ isbn: 1234 }) + if (ret.isbn != 1234) throw new Error("invalid number") + } + + deferred.resolve() + }, + }) + .add("QueryBooksNoAck", { defer: true, fn: benchBooksNoAck, }) .on("cycle", function (event) { console.log(String(event.target)) + + console.log("Relative mean error: ±" + event.target.stats.rme.toFixed(2) + "%") + if (event.target.stats.rme > 5 && !event.target.name.includes("PREWARM")) { + console.log("❌ FAILED, should be less than 5%") + process.exitCode = 1 + } + + printMemory() }) - .on("complete", function () { - console.log("Fastest is " + this.filter("fastest").map("name")) + .on("complete", function (event) { + printMemory() }) .run({ async: true }) } diff --git a/test/benchmarks/tsconfig.json b/test/benchmarks/tsconfig.json new file mode 100644 index 0000000..11da6fc --- /dev/null +++ b/test/benchmarks/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ES2020", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', or 'ESNEXT'. */ + "module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */ + "declaration": true, /* Generates corresponding '.d.ts' file. */ + "declarationMap": true, /* Generates a sourcemap for each corresponding '.d.ts' file. */ + "sourceMap": true, /* Generates corresponding '.map' file. */ + "types": [ + "node", + "jest" + ], + "outDir": "./compilated", + "esModuleInterop": true, /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */ + "skipLibCheck": true, /* Skip type checking of declaration files. */ + "forceConsistentCasingInFileNames": true, /* Disallow inconsistently-cased references to the same file. */ + } +} \ No newline at end of file diff --git a/test/helpers.ts b/test/helpers.ts index 076c37d..33e9113 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -83,3 +83,7 @@ export function createSimpleTestEnvironment(handler: RpcServerHa start } } + +export function delay(ms: number) { + return new Promise((ret) => setTimeout(ret, ms)) +} diff --git a/test/push-channel.spec.ts b/test/push-channel.spec.ts index 116fcf2..9594e4d 100644 --- a/test/push-channel.spec.ts +++ b/test/push-channel.spec.ts @@ -1,3 +1,4 @@ +import mitt from "mitt" import { pushableChannel } from "../src/push-channel" import { takeAsync } from "./helpers" @@ -29,7 +30,10 @@ describe.only("push channel", () => { }) it("break in the iterator closes the channel", async () => { - const chan = pushableChannel(() => void 0) + let closedCalled = false + const chan = pushableChannel(() => { + closedCalled = true + }) expect(chan.isClosed()).toEqual(false) void chan.push(0) @@ -42,6 +46,37 @@ describe.only("push channel", () => { } expect(chan.isClosed()).toEqual(true) + expect(closedCalled).toEqual(true) + }) + + it("breaking the channel as generator should finish execution", async () => { + let closedCalled = false + let didCompleteTestFunction = false + const events = mitt() + + async function* test() { + const chan = pushableChannel(() => { + closedCalled = true + events.off("*", chan.push) + }) + events.on("*", chan.push) + for await (const num of chan) { + yield num + } + } + + const ret = takeAsync(test(), 3) + + await new Promise((ret) => setTimeout(ret, 100)) + + void events.emit("a", 0) + void events.emit("a", 0) + void events.emit("a", 0) + + await new Promise((ret) => setTimeout(ret, 100)) + + expect(await ret).toEqual(["a", "a", "a"]) + expect(closedCalled).toEqual(true) }) it("it works as a job queue", async () => { @@ -155,7 +190,7 @@ describe.only("push channel", () => { }) it("generator yield basic case", async () => { - const chan = pushableChannel(() => void 0) + let chan = pushableChannel(() => void 0) let values: number[] = [] async function* generator() { @@ -172,6 +207,8 @@ describe.only("push channel", () => { await chan.push(3) }, 10) + expect(chan.isClosed()).toEqual(false) + await expect(async () => { for await (const val of generator()) { values.push(val) diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 5830857..939897a 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -1,6 +1,8 @@ import { RpcClient } from "../src" import { log } from "./logger" -import { createSimpleTestEnvironment } from "./helpers" +import { createSimpleTestEnvironment, delay } from "./helpers" +import { pushableChannel } from "../src/push-channel" +import mitt from "mitt" async function testPort(rpcClient: RpcClient, portName: string) { log(`> Creating Port ${portName}`) @@ -31,6 +33,8 @@ async function testPort(rpcClient: RpcClient, portName: string) { describe("Helpers simple req/res", () => { let remoteCallCounter = 0 + const events = mitt<{ a: Uint8Array }>() + let channel: ReturnType const testEnv = createSimpleTestEnvironment(async function (port) { log(`! Initializing port ${port.portId} ${port.portName}`) port.registerModule("echo", async (port) => ({ @@ -56,6 +60,22 @@ describe("Helpers simple req/res", () => { yield new Uint8Array([counter % 0xff]) } }, + async *manualHackWithPushableChannel() { + channel = pushableChannel(() => deferCloseChannel) + // subscribe to room message + events.on("a", channel.push) + // forward all messages + for await (const message of channel) { + yield message as Uint8Array + } + + // then close the channel + channel.close() + + function deferCloseChannel() { + events.off("a", channel.push) + } + }, async *parameterCounter(data) { let total = data[0] while (total > 0) { @@ -158,4 +178,55 @@ describe("Helpers simple req/res", () => { expect(remoteCallCounter).toEqual(localCallCounter) }) + + it("a remote infiniteCounter is halted IF the transport is forcefully closed", async () => { + const { rpcClient, transportServer } = await testEnv.start() + const port = await rpcClient.createPort("test1") + const module = (await port.loadModule("echo")) as { + infiniteCounter(): Promise> + } + const values: Uint8Array[] = [] + const FINAL_RESULT = new Uint8Array([1, 2, 3]) + let localCallCounter = 0 + remoteCallCounter = 0 + + await expect(async () => { + for await (const u8a of await module.infiniteCounter()) { + values.push(u8a) + localCallCounter++ + if (localCallCounter == FINAL_RESULT.length) { + transportServer.close() + } + } + }).rejects.toThrow("RPC Transport closed") + + expect(new Uint8Array(Buffer.concat(values))).toEqual(FINAL_RESULT) + + expect(remoteCallCounter).toEqual(localCallCounter) + }) + + it("a remote manualHackWithPushableChannel is gracefully stopped from client side on third iteration", async () => { + const { rpcClient } = await testEnv.start() + const port = await rpcClient.createPort("test1") + const module = (await port.loadModule("echo")) as { + manualHackWithPushableChannel(): Promise> + } + + async function test() { + for await (const u8a of await module.manualHackWithPushableChannel()) { + expect(channel.isClosed()).toEqual(false) + return u8a + } + } + + const ret = test() + + await delay(100) + + events.emit("a", new Uint8Array([1])) + expect(await ret).toEqual(new Uint8Array([1])) + + await delay(100) + expect(channel.isClosed()).toEqual(true) + }) })