Skip to content

Commit

Permalink
test(verifier-client): add stress test and fix a memory leak
Browse files Browse the repository at this point in the history
- Add a new package `@hyperledger/cactus-test-verifier-client` for verifier-client stress and
  functional tests that involve multiple packages.
- Add stress test for verifier-client that reports memory usage of repeated operation
  on `cactus-plugin-ledger-connector-go-ethereum-socketio` connector plugin.
- Fix a memory leak in `SocketIOApiClient` - free socket listeners when they are no longer needed.
- Fix `cactus-plugin-ledger-connector-go-ethereum-socketio` - use single a web3 connection
  (with keep-alive/reconnect) instead of spawning new one for each request.
  Previous solution was causing connection issues in stress testing.

Depends on: #2089
Closes: #2189

Signed-off-by: Michal Bajer <michal.bajer@fujitsu.com>
  • Loading branch information
outSH authored and petermetz committed Jan 17, 2023
1 parent 25f2f54 commit b0eb921
Show file tree
Hide file tree
Showing 11 changed files with 517 additions and 44 deletions.
118 changes: 84 additions & 34 deletions packages/cactus-api-client/src/main/typescript/socketio-api-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
method: Record<string, unknown>,
args: any,
): Promise<any> {
let timeout: ReturnType<typeof setTimeout> | undefined;
// `Function` is used by socketio `socket.off()` method
// eslint-disable-next-line @typescript-eslint/ban-types
const freeableListeners = new Map<string, Function>();

return new Promise((resolve, reject) => {
this.log.debug("call : sendSyncRequest");

Expand All @@ -213,22 +218,31 @@ export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
const reqID = this.genarateReqID();
this.log.debug(`##sendSyncRequest, reqID = ${reqID}`);

this.socket.on("connect_error", (err: Error) => {
const connectErrorHandler = (err: Error) => {
this.log.error("##connect_error:", err);
this.socket.disconnect();
reject(err);
});
this.socket.on("connect_timeout", (err: Record<string, unknown>) => {
};
this.socket.on("connect_error", connectErrorHandler);
freeableListeners.set("connect_error", connectErrorHandler);

const connectTimeoutHandler = (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
reject(err);
});
this.socket.on("error", (err: Record<string, unknown>) => {
};
this.socket.on("connect_timeout", connectTimeoutHandler);
freeableListeners.set("connect_timeout", connectTimeoutHandler);

const errorHandler = (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
reject(err);
});
this.socket.on("response", (result: any) => {
};
this.socket.on("error", errorHandler);
freeableListeners.set("error", errorHandler);

const responseHandler = (result: any) => {
this.log.debug("#[recv]response, res:", result);
if (reqID === result.id) {
responseFlag = true;
Expand All @@ -253,12 +267,17 @@ export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
})
.catch((err) => {
responseFlag = false;
this.log.debug("checkValidator error:", err);
this.log.error(err);
this.log.error("checkValidator error:", err);
reject({
status: 504,
error: err,
});
});
}
}
});
};
this.socket.on("response", responseHandler);
freeableListeners.set("response", responseHandler);

// Call Validator
const requestData = {
Expand All @@ -275,7 +294,7 @@ export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
const timeoutMilliseconds =
this.options.syncFunctionTimeoutMillisecond ||
defaultSyncFunctionTimeoutMillisecond;
setTimeout(() => {
timeout = setTimeout(() => {
if (responseFlag === false) {
this.log.debug("requestTimeout reqID:", reqID);
resolve({ status: 504 });
Expand All @@ -285,6 +304,13 @@ export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
this.log.error("##Error: sendSyncRequest:", err);
reject(err);
}
}).finally(() => {
freeableListeners.forEach((listener, eventName) =>
this.socket.off(eventName, listener),
);
if (timeout) {
clearTimeout(timeout);
}
});
}

Expand All @@ -307,6 +333,14 @@ export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
} else {
this.log.debug("Create new observable subject...");

// `Function` is used by socketio `socket.off()` method
// eslint-disable-next-line @typescript-eslint/ban-types
const freeableListeners = new Map<string, Function>();
const freeListeners = () =>
freeableListeners.forEach((listener, eventName) =>
this.socket.off(eventName, listener),
);

this.monitorSubject = new ReplaySubject<SocketLedgerEvent>(0);

this.log.debug("call : startMonitor");
Expand All @@ -315,38 +349,46 @@ export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
`##in startMonitor, validatorUrl = ${this.options.validatorURL}`,
);

this.socket.on("connect_error", (err: Error) => {
const connectErrorHandler = (err: Error) => {
this.log.error("##connect_error:", err);
this.socket.disconnect();
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});
};
this.socket.on("connect_error", connectErrorHandler);
freeableListeners.set("connect_error", connectErrorHandler);

this.socket.on("connect_timeout", (err: Record<string, unknown>) => {
const connectTimeoutHandler = (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});
};
this.socket.on("connect_timeout", connectTimeoutHandler);
freeableListeners.set("connect_timeout", connectTimeoutHandler);

this.socket.on("error", (err: Record<string, unknown>) => {
const errorHandler = (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});
};
this.socket.on("error", errorHandler);
freeableListeners.set("error", errorHandler);

this.socket.on("monitor_error", (err: Record<string, unknown>) => {
const monitorErrorHandler = (err: Record<string, unknown>) => {
this.log.error("#### Monitor Error:", err);
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});
};
this.socket.on("monitor_error", monitorErrorHandler);
freeableListeners.set("monitor_error", monitorErrorHandler);

this.socket.on("eventReceived", (res: any) => {
const eventReceivedHandler = (res: any) => {
// output the data received from the client
this.log.debug("#[recv]eventReceived, res:", res);

Expand All @@ -364,7 +406,9 @@ export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
.catch((err) => {
this.log.error(err);
});
});
};
this.socket.on("eventReceived", eventReceivedHandler);
freeableListeners.set("eventReceived", eventReceivedHandler);

const emitStartMonitor = () => {
this.log.debug("##emit: startMonitor");
Expand All @@ -378,27 +422,33 @@ export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
if (this.socket.connected) {
emitStartMonitor();
} else {
this.socket.on("connect", () => {
const connectHandler = () => {
this.log.debug("#connect");
emitStartMonitor();
});
};
this.socket.on("connect", connectHandler);
freeableListeners.set("connect", connectHandler);
}

return this.monitorSubject.pipe(
finalize(() => {
if (this.monitorSubject && !this.monitorSubject.observed) {
// Last observer finished
this.log.debug("##emit: stopMonitor");
this.socket.emit("stopMonitor");
freeListeners();
this.monitorSubject = undefined;
}
}),
);
} catch (err) {
this.log.error(`##Error: startMonitor, ${err}`);
freeListeners();
this.monitorSubject.error(err);
}

return this.monitorSubject.pipe(
finalize(() => {
if (this.monitorSubject && !this.monitorSubject.observed) {
// Last observer finished
this.log.debug("##emit: stopMonitor");
this.socket.emit("stopMonitor");
this.monitorSubject = undefined;
}
}),
);
}

return this.monitorSubject;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,13 @@ describe("SocketIOApiClient Tests", function () {
});

const verifyMock = jest.fn();
verifyMock.mockRejectedValue({ message: "mock verify error" });
const verifyError = { message: "mock verify error" };
verifyMock.mockRejectedValue(verifyError);
sut.checkValidator = verifyMock;

return expect(
sut.sendSyncRequest(reqContract, reqMethod, reqArgs),
).resolves.toEqual({ status: 504 }); // timeout
).rejects.toEqual({ status: 504, error: verifyError });
});

test("Process only requests with matching ID", () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,41 @@ import Web3 from "web3";
import { AbiItem } from "web3-utils";
import { safeStringifyException } from "@hyperledger/cactus-common";

var WEB3_HTTP_PROVIDER_OPTIONS = {
keepAlive: true,
};

const WEB3_WS_PROVIDER_OPTIONS = {
// Enable auto reconnection
reconnect: {
auto: true,
delay: 3000, // ms
maxAttempts: 30,
onTimeout: false,
},
};

function getWeb3Provider(host: string) {
const hostUrl = new URL(host);

switch (hostUrl.protocol) {
case "http:":
case "https:":
return new Web3.providers.HttpProvider(host, WEB3_HTTP_PROVIDER_OPTIONS);
case "ws:":
return new Web3.providers.WebsocketProvider(
host,
WEB3_WS_PROVIDER_OPTIONS,
);
default:
throw new Error(
`Unknown host protocol ${hostUrl.protocol} in URL ${host}`,
);
}
}

const web3 = new Web3(getWeb3Provider(configRead("ledgerUrl")));

/*
* ServerPlugin
* Class definition for server plugins
Expand Down Expand Up @@ -95,7 +130,6 @@ export class ServerPlugin {

// Handling exceptions to absorb the difference of interest.
try {
const web3 = new Web3(configRead("ledgerUrl"));
const balance = await web3.eth.getBalance(ethargs);
const amountVal = parseInt(balance, 10);
const retObj = {
Expand Down Expand Up @@ -178,7 +212,6 @@ export class ServerPlugin {

// Handle the exception once to absorb the difference of interest.
try {
const web3 = new Web3(configRead("ledgerUrl"));
const res = web3.eth[sendFunction](sendArgs);

retObj = {
Expand Down Expand Up @@ -252,7 +285,6 @@ export class ServerPlugin {

// Handling exceptions to absorb the difference of interest.
try {
const web3 = new Web3(configRead("ledgerUrl"));
const txnCount = await web3.eth.getTransactionCount(ethargs);
logger.info(`getNonce(): txnCount: ${txnCount}`);
const hexStr = web3.utils.toHex(txnCount);
Expand Down Expand Up @@ -324,7 +356,6 @@ export class ServerPlugin {

// Handling exceptions to absorb the difference of interest.
try {
const web3 = new Web3(configRead("ledgerUrl"));
const hexStr = web3.utils.toHex(targetValue);
logger.info(`toHex(): hexStr: ${hexStr}`);
const result = {
Expand Down Expand Up @@ -396,7 +427,6 @@ export class ServerPlugin {
const serializedTx = funcParam["serializedTx"];
logger.info("serializedTx :" + serializedTx);

const web3 = new Web3(configRead("ledgerUrl"));
const res = await web3.eth.sendSignedTransaction(serializedTx);
const result = {
txid: res.transactionHash,
Expand Down Expand Up @@ -453,7 +483,6 @@ export class ServerPlugin {

// Handle the exception once to absorb the difference of interest.
try {
const web3 = new Web3(configRead("ledgerUrl"));
const looseWeb3Eth = web3.eth as any;

const isSafeToCall =
Expand Down Expand Up @@ -521,12 +550,11 @@ export class ServerPlugin {

// Handle the exception once to absorb the difference of interest.
try {
const web3 = new Web3(configRead("ledgerUrl"));

const contract = new web3.eth.Contract(
args.contract.abi as AbiItem[],
args.contract.address,
);
(contract as any).setProvider(web3.currentProvider);

const isSafeToCall =
Object.prototype.hasOwnProperty.call(contract.methods, sendCommand) &&
Expand Down
35 changes: 35 additions & 0 deletions packages/cactus-test-verifier-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# `@hyperledger/cactus-test-verifier-client`

## Usage

### Stress test
- Used for manual leak analysis.
- The test will execute with exposed GC and disabled optimizations.
- Results will be written to `./integration-with-verifier-client-stress.log` for further analysis (in the `cwd` where you execute the command). Columns (in order):
- `rss`
- `heapTotal`
- `heapUsed`
- `external`
- `arrayBuffers`
- Report lines:
- Initial memory usage (before running the tests).
- Usage after running the stress test.
- Usage after freeing the verifier client.
- You can uncomment the `checkMemory` call in test file for more step-by-step report, but it will make the test run longer.
- To investigate with the node debugger use `stress-test-inspect` script.

``` bash
# Make sure the build was successful before executing these commands.

# Execute stress test
yarn run stress-test

# Execute stress test with debugger inspect break
yarn run stress-test-inspect
```

## FAQ

### **What is a dedicated test package for?**

This is a dedicated test package meaning that it verifies the integration between two packages that are somehow dependent on each other and therefore these tests cannot be added properly in the child package due to circular dependency issues and it would not be fitting to add it in the parent because the child package's tests should not be held by the parent as a matter of principle.
Loading

0 comments on commit b0eb921

Please sign in to comment.