Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: performance in allocations and retained size #48

Merged
merged 3 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ jobs:
run: make build
- name: test
run: make test
- name: performance
run: make cheap-perf
- name: Publish
uses: menduz/oddish-action@master
with:
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ test/codegen/*_pb.*
**/*.log

src/protocol/index.ts
test/codegen/client.ts
test/codegen/client.ts
test/benchmarks/compilated
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,14 @@ build:
./node_modules/.bin/api-extractor run $(LOCAL_ARG) --typescript-compiler-folder ./node_modules/typescript

cheap-perf:
@time node_modules/.bin/ts-node test/bench.ts
./perf.sh

inspect:
node_modules/.bin/tsc -p test/benchmarks/tsconfig.json
node --inspect-brk test/benchmarks/compilated/test/benchmarks/bench.js

integration-example:
@cd example; ./build.sh
@TS_NODE_PROJECT="example/tsconfig.json" node_modules/.bin/ts-node ./example/integration.ts

.PHONY: build test cheap-perf integration-example
.PHONY: build test cheap-perf integration-example inspect
8 changes: 8 additions & 0 deletions perf.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

node_modules/.bin/tsc -p test/benchmarks/tsconfig.json
rm ./*.log || true
time node --prof test/benchmarks/compilated/test/benchmarks/bench.js
EXIT_CODE=$?
for f in *.log; do node --prof-process "$f"; done
exit $EXIT_CODE
26 changes: 18 additions & 8 deletions src/ack-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,38 @@ export type AckDispatcher = {
}

export function createAckHelper(transport: Transport): AckDispatcher {
const oneTimeCallbacks = new Map<string, (msg: StreamMessage) => void>()
const oneTimeCallbacks = new Map<string, [(msg: StreamMessage) => void, (err: Error) => void]>()

const bb = new Writer()

transport.on("close", () => {
const err = new Error("Transport closed while waiting the ACK")
oneTimeCallbacks.forEach(([_resolve, reject]) => reject(err))
oneTimeCallbacks.clear()
})

return {
receiveAck(data: StreamMessage, messageNumber: number) {
const key = `${messageNumber},${data.sequenceId}`
const fut = oneTimeCallbacks.get(key)
if (fut) {
fut(data)
oneTimeCallbacks.delete(key)
fut[0](data)
}
},
async sendWithAck(data: StreamMessage): Promise<StreamMessage> {
return new Promise<StreamMessage>((ret) => {
const [_, messageNumber] = parseMessageIdentifier(data.messageIdentifier)
oneTimeCallbacks.set(`${messageNumber},${data.sequenceId}`, ret)
const [_, messageNumber] = parseMessageIdentifier(data.messageIdentifier)
const key = `${messageNumber},${data.sequenceId}`

bb.reset()
StreamMessage.encode(data, bb)
transport.sendMessage(bb.finish())
const ret = new Promise<StreamMessage>(function ackPromise(ret, rej) {
oneTimeCallbacks.set(key, [ret, rej])
})

bb.reset()
StreamMessage.encode(data, bb)
transport.sendMessage(bb.finish())

return ret
},
}
}
29 changes: 13 additions & 16 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,34 +121,31 @@ export function streamFromDispatcher(
dispatcher.removeListener(messageNumber)
}

function processMessage(message: StreamMessage, messageNumber: number) {
function sendAck() {
const closed = channel.isClosed()
if (!closed && !isRemoteClosed) {
dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId))
}
}

function processMessage(message: StreamMessage) {
lastReceivedSequenceId = message.sequenceId

if (message.closed) {
isRemoteClosed = true
channel.close()
} else {
const payload = message.payload
const portId = message.portId
channel
.push(payload)
.then(() => {
const closed = channel.isClosed()
if (!closed && !isRemoteClosed) {
dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, portId))
}
})
.catch(channel.failAndClose)
channel.push(message.payload).then(sendAck).catch(channel.failAndClose)
}
}

dispatcher.addListener(messageNumber, (reader) => {
const ret = parseProtocolMessage(reader)

if (ret) {
const [messageType, message, messageNumber] = ret
const [messageType, message] = ret
if (messageType == RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE) {
processMessage(message, messageNumber)
processMessage(message)
} else if (messageType == RpcMessageTypes.RpcMessageTypes_REMOTE_ERROR_RESPONSE) {
isRemoteClosed = true
channel.failAndClose(
Expand All @@ -162,7 +159,7 @@ export function streamFromDispatcher(
}
})

processMessage(streamMessage, messageNumber)
processMessage(streamMessage)

return channel.iterable
}
Expand Down Expand Up @@ -264,7 +261,7 @@ export async function createRpcClient(transport: Transport): Promise<RpcClient>

const port = await portFuture

transport.on('close', () => {
transport.on("close", () => {
port.close()
})

Expand Down
13 changes: 8 additions & 5 deletions src/protocol/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ import {
DestroyPort,
} from "./index"

const bb = new Writer()
const EMPTY_U8A = Uint8Array.of()

export function closeStreamMessage(messageNumber: number, sequenceId: number, portId: number): Uint8Array {
const bb = new Writer()
bb.reset()
StreamMessage.encode(
{
messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, messageNumber),
sequenceId,
portId,
ack: false,
closed: true,
payload: Uint8Array.of(),
payload: EMPTY_U8A,
},
bb
)
Expand All @@ -35,7 +38,7 @@ export function streamMessage(
portId: number,
payload: Uint8Array
): Uint8Array {
const bb = new Writer()
bb.reset()
StreamMessage.encode(
{
messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, messageNumber),
Expand All @@ -51,15 +54,15 @@ export function streamMessage(
}

export function streamAckMessage(messageNumber: number, sequenceId: number, portId: number): Uint8Array {
const bb = new Writer()
bb.reset()
StreamMessage.encode(
{
messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_STREAM_ACK, messageNumber),
sequenceId,
portId,
ack: true,
closed: false,
payload: Uint8Array.of(),
payload: EMPTY_U8A,
},
bb
)
Expand Down
17 changes: 8 additions & 9 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type RpcServerState = {
portsByTransport: Map<Transport, Map<number, RpcServerPort<any>>>
}

const EMPTY_U8A = Uint8Array.from([])

/**
* @public
*/
Expand Down Expand Up @@ -265,7 +267,7 @@ export async function handleRequest<Context>(
const result = await port.callProcedure(request.procedureId, request.payload, context)
const response = Response.fromJSON({
messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_RESPONSE, messageNumber),
payload: Uint8Array.from([]),
payload: EMPTY_U8A,
})

if (result instanceof Uint8Array) {
Expand All @@ -282,7 +284,7 @@ export async function handleRequest<Context>(
ack: false,
sequenceId: 0,
messageIdentifier: 0,
payload: Uint8Array.of(),
payload: EMPTY_U8A,
portId: request.portId,
})

Expand All @@ -297,13 +299,10 @@ export async function handleRequest<Context>(
)
reusedStreamMessage.payload = elem
reusedStreamMessage.portId = request.portId
// we use Promise.race to react to the transport close events
const ret = await Promise.race([
ackDispatcher.sendWithAck(reusedStreamMessage),
new Promise<StreamMessage>((_, reject) =>
transport.on("close", () => reject(new Error("Transport closed while sending stream")))
),
])

// sendWithAck may fail if the transport is closed, effectively
// ending this iterator.
const ret = await ackDispatcher.sendWithAck(reusedStreamMessage)

if (ret.ack) {
continue
Expand Down
2 changes: 1 addition & 1 deletion src/transports/Memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export function MemoryTransport() {
return {
...sender,
sendMessage(message) {
receiver.emit("message", new Uint8Array(message))
receiver.emit("message", message)
},
close() {
if (!isClosed) {
Expand Down
73 changes: 57 additions & 16 deletions test/bench.ts → test/benchmarks/bench.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Suite } from "benchmark"
import * as helpers from "./helpers"
import { BookServiceDefinition } from "./codegen/client"
import { loadService, registerService } from "../src/codegen"
import * as helpers from "../helpers"
import { BookServiceDefinition } from "../codegen/client"
import { loadService, registerService } from "../../src/codegen"

const ITER_MULTIPLIER = 400

const books = [
{ author: "mr menduz", isbn: 1234, title: "1001 reasons to write your own OS" },
Expand All @@ -21,12 +23,12 @@ async function test() {
}
},
async *queryBooks(req) {
for (let i = 0; i < 100; i++) {
for (let i = 0; i < ITER_MULTIPLIER; i++) {
yield* books
}
},
async *queryBooksNoAck(req) {
for (let i = 0; i < 100; i++) {
for (let i = 0; i < ITER_MULTIPLIER; i++) {
yield* books
}
},
Expand All @@ -46,7 +48,7 @@ async function test() {
results.push(book)
}

if (results.length != 400) deferred.reject("Invalid number of results, got: " + results.length)
if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length)
else deferred.resolve()
}

Expand All @@ -57,42 +59,81 @@ async function test() {
results.push(book)
}

if (results.length != 400) deferred.reject("Invalid number of results, got: " + results.length)
if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length)
else deferred.resolve()
}

let memory: ReturnType<typeof process.memoryUsage> = process.memoryUsage()

function printMemory() {
const newMemory = process.memoryUsage()

function toMb(num: number) {
return (num / 1024 / 1024).toFixed(2) + "MB"
}

console.log(`
heapTotal: ${toMb(newMemory.heapTotal - memory.heapTotal)}
heapUsed: ${toMb(newMemory.heapUsed - memory.heapUsed)}
rss: ${toMb(newMemory.rss - memory.rss)}
arrayBuffers: ${toMb((newMemory as any).arrayBuffers - (memory as any).arrayBuffers)}
`)

memory = newMemory
}

suite
.add("GetBook", {
.add("PREWARM GetBook", {
defer: true,
async fn(deferred) {
for (let i = 0; i < 400; i++) {
for (let i = 0; i < ITER_MULTIPLIER; i++) {
const ret = await service.getBook({ isbn: 1234 })
if (ret.isbn != 1234) deferred.reject(new Error("invalid number"))
if (ret.isbn != 1234) throw new Error("invalid number")
}

deferred.resolve()
},
})
.add("QueryBooks", {
.add("PREWARM QueryBooks", {
defer: true,
fn: benchBooks,
})
.add("QueryBooksNoAck", {
.add("QPREWARM ueryBooksNoAck", {
defer: true,
fn: benchBooksNoAck,
})
.add("QueryBooks 2", {
.add("QueryBooks", {
defer: true,
fn: benchBooks,
})
.add("QueryBooksNoAck 2", {
.add("GetBook", {
defer: true,
async fn(deferred) {
for (let i = 0; i < ITER_MULTIPLIER; i++) {
const ret = await service.getBook({ isbn: 1234 })
if (ret.isbn != 1234) throw new Error("invalid number")
}

deferred.resolve()
},
})
.add("QueryBooksNoAck", {
defer: true,
fn: benchBooksNoAck,
})
.on("cycle", function (event) {
console.log(String(event.target))

console.log("Relative mean error: ±" + event.target.stats.rme.toFixed(2) + "%")
if (event.target.stats.rme > 5 && !event.target.name.includes("PREWARM")) {
console.log("❌ FAILED, should be less than 5%")
process.exitCode = 1
}

printMemory()
})
.on("complete", function () {
console.log("Fastest is " + this.filter("fastest").map("name"))
.on("complete", function (event) {
printMemory()
})
.run({ async: true })
}
Expand Down
Loading