Skip to content

Commit

Permalink
fix: performance in allocations and retained size (#48)
Browse files Browse the repository at this point in the history
* fix performance

* fix benchmark

* remove extra log
  • Loading branch information
menduz authored May 24, 2022
1 parent 1b22d47 commit 1cc95bc
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 61 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ jobs:
run: make build
- name: test
run: make test
- name: performance
run: make cheap-perf
- name: Publish
uses: menduz/oddish-action@master
with:
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ test/codegen/*_pb.*
**/*.log

src/protocol/index.ts
test/codegen/client.ts
test/codegen/client.ts
test/benchmarks/compilated
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,14 @@ build:
./node_modules/.bin/api-extractor run $(LOCAL_ARG) --typescript-compiler-folder ./node_modules/typescript

cheap-perf:
@time node_modules/.bin/ts-node test/bench.ts
./perf.sh

inspect:
node_modules/.bin/tsc -p test/benchmarks/tsconfig.json
node --inspect-brk test/benchmarks/compilated/test/benchmarks/bench.js

integration-example:
@cd example; ./build.sh
@TS_NODE_PROJECT="example/tsconfig.json" node_modules/.bin/ts-node ./example/integration.ts

.PHONY: build test cheap-perf integration-example
.PHONY: build test cheap-perf integration-example inspect
8 changes: 8 additions & 0 deletions perf.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

node_modules/.bin/tsc -p test/benchmarks/tsconfig.json
rm ./*.log || true
time node --prof test/benchmarks/compilated/test/benchmarks/bench.js
EXIT_CODE=$?
for f in *.log; do node --prof-process "$f"; done
exit $EXIT_CODE
26 changes: 18 additions & 8 deletions src/ack-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,38 @@ export type AckDispatcher = {
}

export function createAckHelper(transport: Transport): AckDispatcher {
const oneTimeCallbacks = new Map<string, (msg: StreamMessage) => void>()
const oneTimeCallbacks = new Map<string, [(msg: StreamMessage) => void, (err: Error) => void]>()

const bb = new Writer()

transport.on("close", () => {
const err = new Error("Transport closed while waiting the ACK")
oneTimeCallbacks.forEach(([_resolve, reject]) => reject(err))
oneTimeCallbacks.clear()
})

return {
receiveAck(data: StreamMessage, messageNumber: number) {
const key = `${messageNumber},${data.sequenceId}`
const fut = oneTimeCallbacks.get(key)
if (fut) {
fut(data)
oneTimeCallbacks.delete(key)
fut[0](data)
}
},
async sendWithAck(data: StreamMessage): Promise<StreamMessage> {
return new Promise<StreamMessage>((ret) => {
const [_, messageNumber] = parseMessageIdentifier(data.messageIdentifier)
oneTimeCallbacks.set(`${messageNumber},${data.sequenceId}`, ret)
const [_, messageNumber] = parseMessageIdentifier(data.messageIdentifier)
const key = `${messageNumber},${data.sequenceId}`

bb.reset()
StreamMessage.encode(data, bb)
transport.sendMessage(bb.finish())
const ret = new Promise<StreamMessage>(function ackPromise(ret, rej) {
oneTimeCallbacks.set(key, [ret, rej])
})

bb.reset()
StreamMessage.encode(data, bb)
transport.sendMessage(bb.finish())

return ret
},
}
}
29 changes: 13 additions & 16 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,34 +121,31 @@ export function streamFromDispatcher(
dispatcher.removeListener(messageNumber)
}

function processMessage(message: StreamMessage, messageNumber: number) {
function sendAck() {
const closed = channel.isClosed()
if (!closed && !isRemoteClosed) {
dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, streamMessage.portId))
}
}

function processMessage(message: StreamMessage) {
lastReceivedSequenceId = message.sequenceId

if (message.closed) {
isRemoteClosed = true
channel.close()
} else {
const payload = message.payload
const portId = message.portId
channel
.push(payload)
.then(() => {
const closed = channel.isClosed()
if (!closed && !isRemoteClosed) {
dispatcher.transport.sendMessage(streamAckMessage(messageNumber, lastReceivedSequenceId, portId))
}
})
.catch(channel.failAndClose)
channel.push(message.payload).then(sendAck).catch(channel.failAndClose)
}
}

dispatcher.addListener(messageNumber, (reader) => {
const ret = parseProtocolMessage(reader)

if (ret) {
const [messageType, message, messageNumber] = ret
const [messageType, message] = ret
if (messageType == RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE) {
processMessage(message, messageNumber)
processMessage(message)
} else if (messageType == RpcMessageTypes.RpcMessageTypes_REMOTE_ERROR_RESPONSE) {
isRemoteClosed = true
channel.failAndClose(
Expand All @@ -162,7 +159,7 @@ export function streamFromDispatcher(
}
})

processMessage(streamMessage, messageNumber)
processMessage(streamMessage)

return channel.iterable
}
Expand Down Expand Up @@ -264,7 +261,7 @@ export async function createRpcClient(transport: Transport): Promise<RpcClient>

const port = await portFuture

transport.on('close', () => {
transport.on("close", () => {
port.close()
})

Expand Down
13 changes: 8 additions & 5 deletions src/protocol/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ import {
DestroyPort,
} from "./index"

const bb = new Writer()
const EMPTY_U8A = Uint8Array.of()

export function closeStreamMessage(messageNumber: number, sequenceId: number, portId: number): Uint8Array {
const bb = new Writer()
bb.reset()
StreamMessage.encode(
{
messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, messageNumber),
sequenceId,
portId,
ack: false,
closed: true,
payload: Uint8Array.of(),
payload: EMPTY_U8A,
},
bb
)
Expand All @@ -35,7 +38,7 @@ export function streamMessage(
portId: number,
payload: Uint8Array
): Uint8Array {
const bb = new Writer()
bb.reset()
StreamMessage.encode(
{
messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_STREAM_MESSAGE, messageNumber),
Expand All @@ -51,15 +54,15 @@ export function streamMessage(
}

export function streamAckMessage(messageNumber: number, sequenceId: number, portId: number): Uint8Array {
const bb = new Writer()
bb.reset()
StreamMessage.encode(
{
messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_STREAM_ACK, messageNumber),
sequenceId,
portId,
ack: true,
closed: false,
payload: Uint8Array.of(),
payload: EMPTY_U8A,
},
bb
)
Expand Down
17 changes: 8 additions & 9 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type RpcServerState = {
portsByTransport: Map<Transport, Map<number, RpcServerPort<any>>>
}

const EMPTY_U8A = Uint8Array.from([])

/**
* @public
*/
Expand Down Expand Up @@ -265,7 +267,7 @@ export async function handleRequest<Context>(
const result = await port.callProcedure(request.procedureId, request.payload, context)
const response = Response.fromJSON({
messageIdentifier: calculateMessageIdentifier(RpcMessageTypes.RpcMessageTypes_RESPONSE, messageNumber),
payload: Uint8Array.from([]),
payload: EMPTY_U8A,
})

if (result instanceof Uint8Array) {
Expand All @@ -282,7 +284,7 @@ export async function handleRequest<Context>(
ack: false,
sequenceId: 0,
messageIdentifier: 0,
payload: Uint8Array.of(),
payload: EMPTY_U8A,
portId: request.portId,
})

Expand All @@ -297,13 +299,10 @@ export async function handleRequest<Context>(
)
reusedStreamMessage.payload = elem
reusedStreamMessage.portId = request.portId
// we use Promise.race to react to the transport close events
const ret = await Promise.race([
ackDispatcher.sendWithAck(reusedStreamMessage),
new Promise<StreamMessage>((_, reject) =>
transport.on("close", () => reject(new Error("Transport closed while sending stream")))
),
])

// sendWithAck may fail if the transport is closed, effectively
// ending this iterator.
const ret = await ackDispatcher.sendWithAck(reusedStreamMessage)

if (ret.ack) {
continue
Expand Down
2 changes: 1 addition & 1 deletion src/transports/Memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export function MemoryTransport() {
return {
...sender,
sendMessage(message) {
receiver.emit("message", new Uint8Array(message))
receiver.emit("message", message)
},
close() {
if (!isClosed) {
Expand Down
73 changes: 57 additions & 16 deletions test/bench.ts → test/benchmarks/bench.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Suite } from "benchmark"
import * as helpers from "./helpers"
import { BookServiceDefinition } from "./codegen/client"
import { loadService, registerService } from "../src/codegen"
import * as helpers from "../helpers"
import { BookServiceDefinition } from "../codegen/client"
import { loadService, registerService } from "../../src/codegen"

const ITER_MULTIPLIER = 400

const books = [
{ author: "mr menduz", isbn: 1234, title: "1001 reasons to write your own OS" },
Expand All @@ -21,12 +23,12 @@ async function test() {
}
},
async *queryBooks(req) {
for (let i = 0; i < 100; i++) {
for (let i = 0; i < ITER_MULTIPLIER; i++) {
yield* books
}
},
async *queryBooksNoAck(req) {
for (let i = 0; i < 100; i++) {
for (let i = 0; i < ITER_MULTIPLIER; i++) {
yield* books
}
},
Expand All @@ -46,7 +48,7 @@ async function test() {
results.push(book)
}

if (results.length != 400) deferred.reject("Invalid number of results, got: " + results.length)
if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length)
else deferred.resolve()
}

Expand All @@ -57,42 +59,81 @@ async function test() {
results.push(book)
}

if (results.length != 400) deferred.reject("Invalid number of results, got: " + results.length)
if (results.length != ITER_MULTIPLIER * 4) throw new Error("Invalid number of results, got: " + results.length)
else deferred.resolve()
}

let memory: ReturnType<typeof process.memoryUsage> = process.memoryUsage()

function printMemory() {
const newMemory = process.memoryUsage()

function toMb(num: number) {
return (num / 1024 / 1024).toFixed(2) + "MB"
}

console.log(`
heapTotal: ${toMb(newMemory.heapTotal - memory.heapTotal)}
heapUsed: ${toMb(newMemory.heapUsed - memory.heapUsed)}
rss: ${toMb(newMemory.rss - memory.rss)}
arrayBuffers: ${toMb((newMemory as any).arrayBuffers - (memory as any).arrayBuffers)}
`)

memory = newMemory
}

suite
.add("GetBook", {
.add("PREWARM GetBook", {
defer: true,
async fn(deferred) {
for (let i = 0; i < 400; i++) {
for (let i = 0; i < ITER_MULTIPLIER; i++) {
const ret = await service.getBook({ isbn: 1234 })
if (ret.isbn != 1234) deferred.reject(new Error("invalid number"))
if (ret.isbn != 1234) throw new Error("invalid number")
}

deferred.resolve()
},
})
.add("QueryBooks", {
.add("PREWARM QueryBooks", {
defer: true,
fn: benchBooks,
})
.add("QueryBooksNoAck", {
.add("QPREWARM ueryBooksNoAck", {
defer: true,
fn: benchBooksNoAck,
})
.add("QueryBooks 2", {
.add("QueryBooks", {
defer: true,
fn: benchBooks,
})
.add("QueryBooksNoAck 2", {
.add("GetBook", {
defer: true,
async fn(deferred) {
for (let i = 0; i < ITER_MULTIPLIER; i++) {
const ret = await service.getBook({ isbn: 1234 })
if (ret.isbn != 1234) throw new Error("invalid number")
}

deferred.resolve()
},
})
.add("QueryBooksNoAck", {
defer: true,
fn: benchBooksNoAck,
})
.on("cycle", function (event) {
console.log(String(event.target))

console.log("Relative mean error: ±" + event.target.stats.rme.toFixed(2) + "%")
if (event.target.stats.rme > 5 && !event.target.name.includes("PREWARM")) {
console.log("❌ FAILED, should be less than 5%")
process.exitCode = 1
}

printMemory()
})
.on("complete", function () {
console.log("Fastest is " + this.filter("fastest").map("name"))
.on("complete", function (event) {
printMemory()
})
.run({ async: true })
}
Expand Down
Loading

0 comments on commit 1cc95bc

Please sign in to comment.