Skip to content

Commit

Permalink
TSK-742: Use partial binary protocol with ability on/off (#3153)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
  • Loading branch information
haiodo authored May 10, 2023
1 parent 4e70eae commit 84b7df4
Show file tree
Hide file tree
Showing 58 changed files with 837 additions and 692 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,4 @@ pods/front/dist
*.cpuprofile
*.pyc
metrics.txt
dev/tool/report.csv
dev/tool/report*.csv
2 changes: 1 addition & 1 deletion common/config/rush/build-cache.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* See https://rushjs.io/pages/maintainer/build_cache/ for details about this experimental feature.
*/
"buildCacheEnabled": true,
"buildCacheEnabled": false,

/**
* (Required) Choose where project build outputs will be cached.
Expand Down
381 changes: 205 additions & 176 deletions common/config/rush/pnpm-lock.yaml

Large diffs are not rendered by default.

12 changes: 3 additions & 9 deletions dev/account/src/account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,12 @@
// limitations under the License.
//

import type { Request, Response } from '@hcengineering/platform'
import platform, { Severity, Status } from '@hcengineering/platform'

import { getWorkspaceId } from '@hcengineering/core'
import { generateToken } from '@hcengineering/server-token'

interface LoginInfo {
token: string
endpoint: string
}

function login (endpoint: string, email: string, password: string, workspace: string): Response<LoginInfo> {
function login (endpoint: string, email: string, password: string, workspace: string): any {
if (email !== 'rosamund@hc.engineering' && email !== 'elon@hc.engineering') {
return { error: new Status(Severity.ERROR, platform.status.Unauthorized, {}) }
}
Expand All @@ -42,9 +36,9 @@ function login (endpoint: string, email: string, password: string, workspace: st
return { result: { token, endpoint } }
}

export function handleRequest (req: Request<any[]>, serverEndpoint: string): Response<any> {
export function handleRequest (req: any, serverEndpoint: string): any {
if (req.method === 'login') {
return login(serverEndpoint, ...(req as Request<[string, string, string]>).params)
return login(serverEndpoint, req.params[0], req.params[1], req.params[2])
}

return { error: new Status(Severity.ERROR, platform.status.BadRequest, {}) }
Expand Down
3 changes: 2 additions & 1 deletion dev/client-resources/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"@hcengineering/dev-storage": "^0.6.6",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/model-all": "^0.6.0",
"@hcengineering/devmodel": "^0.6.0"
"@hcengineering/devmodel": "^0.6.0",
"@hcengineering/rpc": "^0.6.0"
}
}
17 changes: 13 additions & 4 deletions dev/client-resources/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.
//

import {
import core, {
Class,
ClientConnection,
Doc,
Expand All @@ -27,13 +27,15 @@ import {
MeasureMetricsContext,
Ref,
ServerStorage,
Timestamp,
Tx,
TxHandler,
TxResult
} from '@hcengineering/core'
import { createInMemoryTxAdapter } from '@hcengineering/dev-storage'
import devmodel from '@hcengineering/devmodel'
import { protoDeserialize, protoSerialize, setMetadata } from '@hcengineering/platform'
import { setMetadata } from '@hcengineering/platform'
import { protoDeserialize, protoSerialize } from '@hcengineering/rpc'
import {
ContentTextAdapter,
createInMemoryAdapter,
Expand All @@ -52,12 +54,19 @@ class ServerStorageWrapper implements ClientConnection {
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
const [c, q, o] = protoDeserialize(protoSerialize([_class, query, options]))
const [c, q, o] = protoDeserialize(protoSerialize([_class, query, options], false), false)
return this.storage.findAll(this.measureCtx, c, q, o)
}

async loadModel (lastModelTx: Timestamp): Promise<Tx[]> {
return await this.storage.findAll(this.measureCtx, core.class.Tx, {
space: core.space.Model,
modifiedOn: { $gt: lastModelTx }
})
}

async tx (tx: Tx): Promise<TxResult> {
const _tx = protoDeserialize(protoSerialize(tx))
const _tx = protoDeserialize(protoSerialize(tx, false), false)
const [result, derived] = await this.storage.tx(this.measureCtx, _tx)
for (const tx of derived) {
this.handler(tx)
Expand Down
5 changes: 5 additions & 0 deletions dev/prod/src/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,10 @@ export async function configurePlatform() {
setMetadata(client.metadata.FilterModel, true)
setMetadata(client.metadata.ExtraPlugins, ['preference' as Plugin])

// Use binary response transfer for faster performance and small transfer sizes.
setMetadata(client.metadata.UseBinaryProtocol, true)
// Disable for now, since it causes performance issues on linux/docker/kubernetes boxes for now.
setMetadata(client.metadata.UseProtocolCompression, false)

setMetadata(workbench.metadata.PlatformTitle, 'Platform')
}
3 changes: 1 addition & 2 deletions dev/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ export async function start (port: number, host?: string): Promise<void> {
sessionFactory: (token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
port,
productId: '',
serverFactory: startHttpServer,
chunking: -1
serverFactory: startHttpServer
})
}
2 changes: 1 addition & 1 deletion dev/tool/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"docker:build": "docker build -t hardcoreeng/tool .",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/tool staging",
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/tool",
"run-local": "cross-env SERVER_SECRET=secret MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TRANSACTOR_URL=ws:/localhost:3333 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 node -r ts-node/register --max-old-space-size=8000 ./src/__start.ts",
"run-local": "cross-env SERVER_SECRET=secret MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TRANSACTOR_URL=ws:/localhost:3333 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 node -r ts-node/register --max-old-space-size=18000 ./src/__start.ts",
"run": "cross-env node -r ts-node/register --max-old-space-size=8000 ./src/__start.ts",
"upgrade": "rushx run-local upgrade",
"lint": "eslint src",
Expand Down
123 changes: 98 additions & 25 deletions dev/tool/src/benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@

import contact from '@hcengineering/contact'
import core, {
Account,
BackupClient,
ClassifierKind,
Client,
DOMAIN_TX,
Doc,
DocumentUpdate,
MeasureMetricsContext,
Ref,
TxOperations,
WorkspaceId,
generateId,
metricsToString,
newMetrics,
systemAccountEmail
} from '@hcengineering/core'
import { RateLimitter } from '@hcengineering/server-core'
import { generateToken } from '@hcengineering/server-token'
import { connect } from '@hcengineering/server-tool'

import client from '@hcengineering/client'
import { setMetadata } from '@hcengineering/platform'
import os from 'os'
import { Worker, isMainThread, parentPort } from 'worker_threads'
import { CSVWriter } from './csv'
Expand All @@ -45,8 +53,12 @@ interface StartMessage {
min: number
rand: number
}
// If enabled, will perform write tx for same values and Derived format.
write: boolean
sleep: number
}
binary: boolean
compression: boolean
}
interface Msg {
type: 'complete' | 'operate'
Expand All @@ -62,16 +74,21 @@ interface CompleteMsg extends Msg {
// workId: string
// }

const benchAccount = (core.account.System + '_benchmark') as Ref<Account>

export async function benchmark (
workspaceId: WorkspaceId[],
transactorUrl: string,
cmd: { from: number, steps: number, sleep: number }
cmd: { from: number, steps: number, sleep: number, binary: boolean, write: boolean, compression: boolean }
): Promise<void> {
let operating = 0
const workers: Worker[] = []

const works = new Map<string, () => void>()

setMetadata(client.metadata.UseBinaryProtocol, cmd.binary)
setMetadata(client.metadata.UseProtocolCompression, cmd.compression)

os.cpus().forEach(() => {
/* Spawn a new thread running this source file */
const worker = new Worker(__filename)
Expand Down Expand Up @@ -134,29 +151,55 @@ export async function benchmark (
const token = generateToken(systemAccountEmail, workspaceId[0])

const monitorConnection = isMainThread
? await ctx.with('connect', {}, async () => await connect(transactorUrl, workspaceId[0], undefined))
? ((await ctx.with(
'connect',
{},
async () => await connect(transactorUrl, workspaceId[0], undefined, { mode: 'backup' })
)) as BackupClient & Client)
: undefined

if (monitorConnection !== undefined) {
// We need to cleanup all transactions from our benchmark account.
const txes = await monitorConnection.findAll(
core.class.Tx,
{ modifiedBy: benchAccount },
{ projection: { _id: 1 } }
)
await monitorConnection.clean(DOMAIN_TX, Array.from(txes.map((it) => it._id)))
}

let running = false

let timer: any
if (isMainThread) {
timer = setInterval(() => {
const st = Date.now()

void fetch(transactorUrl.replace('ws:/', 'http:/') + '/' + token).then((res) => {
void res.json().then((json) => {
memUsed = json.statistics.memoryUsed
memTotal = json.statistics.memoryTotal
cpu = json.statistics.cpuUsage
const r = json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.call
operations = r?.operations ?? 0
requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1)
transfer =
json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.send?.measurements?.[
'#send-data'
]?.value ?? 0
})
})
try {
void fetch(transactorUrl.replace('ws:/', 'http:/') + '/' + token)
.then((res) => {
void res
.json()
.then((json) => {
memUsed = json.statistics.memoryUsed
memTotal = json.statistics.memoryTotal
cpu = json.statistics.cpuUsage
const r =
json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.call?.measurements?.[
'find-all'
]
operations = r?.operations ?? 0
requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1)
transfer =
json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.['#send-data']
?.value ?? 0
})
.catch((err) => console.log(err))
})
.catch((err) => console.log(err))
} catch (err) {
console.log(err)
}

if (!running) {
running = true
Expand Down Expand Up @@ -189,7 +232,7 @@ export async function benchmark (
},
true
)
void csvWriter.write('report.csv')
void csvWriter.write(`report${cmd.binary ? '-bin' : ''}${cmd.write ? '-wr' : ''}.csv`)
}, 1000)
}
for (let i = cmd.from; i < cmd.from + cmd.steps; i++) {
Expand All @@ -213,8 +256,11 @@ export async function benchmark (
min: 10,
rand: 1000
},
sleep: cmd.sleep
}
sleep: cmd.sleep,
write: cmd.write
},
binary: cmd.binary,
compression: cmd.compression
}
workers[i % workers.length].postMessage(msg)

Expand Down Expand Up @@ -247,15 +293,15 @@ if (!isMainThread) {
})
}

const connectLimitter = new RateLimitter(() => ({ rate: 50 }))

async function perform (msg: StartMessage): Promise<void> {
let connection: Client | undefined
try {
setMetadata(client.metadata.UseBinaryProtocol, msg.binary)
setMetadata(client.metadata.UseProtocolCompression, msg.compression)
console.log('connecting to', msg.workspaceId)

connection = await connectLimitter.exec(async () => await connect(msg.transactorUrl, msg.workspaceId, undefined))

connection = await connect(msg.transactorUrl, msg.workspaceId, undefined)
const opt = new TxOperations(connection, (core.account.System + '_benchmark') as Ref<Account>)
parentPort?.postMessage({
type: 'operate',
workId: msg.workId
Expand All @@ -269,19 +315,46 @@ async function perform (msg: StartMessage): Promise<void> {
await connection?.findAll(core.class.Tx, {}, { sort: { _id: -1 } })
msg.options.modelRequests--
}
let doc: Doc | undefined
if (msg.options.readRequests > 0) {
const cl = classes[randNum(classes.length - 1)]
if (cl !== undefined) {
await connection?.findAll(
const docs = await connection?.findAll(
cl._id,
{},
{
sort: { _id: -1 },
limit: msg.options.limit.min + randNum(msg.options.limit.rand)
}
)
if (docs.length > 0) {
doc = docs[randNum(docs.length - 1)]
}
msg.options.readRequests--
}
if (msg.options.write && doc !== undefined) {
const attrs = connection.getHierarchy().getAllAttributes(doc._class)
const upd: DocumentUpdate<Doc> = {}
for (const [key, value] of attrs.entries()) {
if (value.type._class === core.class.TypeString || value.type._class === core.class.TypeBoolean) {
if (
key !== '_id' &&
key !== '_class' &&
key !== 'space' &&
key !== 'attachedTo' &&
key !== 'attachedToClass'
) {
const v = (doc as any)[key]
if (v != null) {
;(upd as any)[key] = v
}
}
}
}
if (Object.keys(upd).length > 0) {
await opt.update(doc, upd)
}
}
}
if (msg.options.sleep > 0) {
await new Promise((resolve) => setTimeout(resolve, randNum(msg.options.sleep)))
Expand Down
Loading

0 comments on commit 84b7df4

Please sign in to comment.