Skip to content

Commit

Permalink
fix(connector-iroha): fix review comments and smaller issues
Browse files Browse the repository at this point in the history
- Fix issues spotted by petermetz in review of PR#2048
- Fix iroha functional tests.
- Close socketio connections when they are not needed anymore.
- Update verifier-client README.

Signed-off-by: Michal Bajer <michal.bajer@fujitsu.com>
  • Loading branch information
outSH authored and petermetz committed Nov 19, 2022
1 parent 47da608 commit b2742e8
Show file tree
Hide file tree
Showing 14 changed files with 846 additions and 509 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import type { Observable } from "rxjs";
*/
export interface ISocketApiClient<BlockType> {
sendAsyncRequest?(
args: any,
method: Record<string, unknown>,
baseConfig?: any,
contract?: Record<string, unknown>,
method?: Record<string, unknown>,
args?: any,
baseConfig?: any,
): void;

sendSyncRequest?(
args: any,
method: Record<string, unknown>,
baseConfig?: any,
contract?: Record<string, unknown>,
method?: Record<string, unknown>,
args?: any,
baseConfig?: any,
): Promise<any>;

watchBlocksV1?(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -983,16 +983,14 @@
}
}
},
"IrohaSocketSessionEvent": {
"WatchBlocksV1": {
"type": "string",
"enum": [
"org.hyperledger.cactus.api.async.iroha.SocketSession.Subscribe",
"org.hyperledger.cactus.api.async.iroha.SocketSession.Next",
"org.hyperledger.cactus.api.async.iroha.SocketSession.Unsubscribe",
"org.hyperledger.cactus.api.async.iroha.SocketSession.Error",
"org.hyperledger.cactus.api.async.iroha.SocketSession.Complete",
"org.hyperledger.cactus.api.async.iroha.SocketSession.SendAsyncRequest",
"org.hyperledger.cactus.api.async.iroha.SocketSession.SendSyncRequest"
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Subscribe",
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Next",
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Unsubscribe",
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Error",
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Complete"
],
"x-enum-varnames": [
"Subscribe",
Expand All @@ -1004,6 +1002,17 @@
"SendSyncRequest"
]
},
"IrohaSocketIOTransactV1": {
"type": "string",
"enum": [
"org.hyperledger.cactus.api.async.iroha.IrohaSocketIOTransactV1.SendAsyncRequest",
"org.hyperledger.cactus.api.async.iroha.IrohaSocketIOTransactV1.SendSyncRequest"
],
"x-enum-varnames": [
"SendAsyncRequest",
"SendSyncRequest"
]
},
"IrohaBlockResponse": {
"type": "object",
"required": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,37 @@ import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common";
import { Constants, ISocketApiClient } from "@hyperledger/cactus-core-api";
import {
DefaultApi,
IrohaSocketSessionEvent,
WatchBlocksV1,
IrohaSocketIOTransactV1,
IrohaBlockProgress,
IrohaBaseConfig,
} from "../generated/openapi/typescript-axios";
import { Configuration } from "../generated/openapi/typescript-axios/configuration";
import {
Configuration,
ConfigurationParameters,
} from "../generated/openapi/typescript-axios/configuration";
import { RuntimeError } from "run-time-error";

export interface IrohaApiClientParameters extends ConfigurationParameters {
logLevel?: LogLevelDesc;
wsApiHost?: string;
wsApiPath?: string;
timeoutLimit?: number;
}

export class IrohaApiClientOptions extends Configuration {
readonly logLevel?: LogLevelDesc;
readonly wsApiHost?: string;
readonly wsApiPath?: string;
readonly timeoutLimit?: number;

constructor(param: IrohaApiClientParameters = {}) {
super(param);
this.logLevel = param.logLevel;
this.wsApiHost = param.wsApiHost;
this.wsApiPath = param.wsApiPath;
this.timeoutLimit = param.timeoutLimit;
}
}

export class IrohaApiClient
Expand Down Expand Up @@ -45,6 +65,7 @@ export class IrohaApiClient
this.log.debug(`wsApiHost=${this.wsApiHost}`);
this.log.debug(`wsApiPath=${this.wsApiPath}`);
this.log.debug(`basePath=${this.options.basePath}`);
this.log.debug(`timeoutLimit=${this.options.timeoutLimit}`);

Checks.nonBlankString(
this.wsApiHost,
Expand All @@ -67,13 +88,13 @@ export class IrohaApiClient
const socket: Socket = io(this.wsApiHost, { path: this.wsApiPath });
const subject = new ReplaySubject<IrohaBlockProgress>(0);
this.log.debug(monitorOptions);
socket.on(IrohaSocketSessionEvent.Next, (data: IrohaBlockProgress) => {
socket.on(WatchBlocksV1.Next, (data: IrohaBlockProgress) => {
subject.next(data);
});

socket.on("connect", () => {
this.log.debug("connected OK...");
socket.emit(IrohaSocketSessionEvent.Subscribe, monitorOptions);
socket.emit(WatchBlocksV1.Subscribe, monitorOptions);
});

socket.connect();
Expand All @@ -90,10 +111,16 @@ export class IrohaApiClient
subject.error(err);
});

socket.on("error", (err: unknown) => {
this.log.error("Error: ", err);
socket.disconnect();
subject.error(err);
});

return subject.pipe(
finalize(() => {
this.log.info("FINALIZE - unsubscribing from the stream...");
socket.emit(IrohaSocketSessionEvent.Unsubscribe);
socket.emit(WatchBlocksV1.Unsubscribe);
socket.disconnect();
}),
share(),
Expand All @@ -105,36 +132,31 @@ export class IrohaApiClient
* @param args - arguments.
* @param method - function / method to be executed by validator.
* @param baseConfig - baseConfig needed to properly connect to ledger
*/
public sendAsyncRequest(
args: any,
method: Record<string, unknown>,
args: any,
baseConfig?: IrohaBaseConfig,
): void {
this.log.debug(`inside: sendAsyncRequest()`);
this.log.debug(`baseConfig=${baseConfig}`);
this.log.debug(`methodName=${method.methodName}`);
this.log.debug(`args=${args}`);

if (baseConfig === undefined || baseConfig === {}) {
if (!baseConfig) {
throw new RuntimeError("baseConfig object must exist and not be empty");
}

if (
baseConfig.privKey === undefined ||
baseConfig.creatorAccountId === undefined ||
baseConfig.irohaHost === undefined ||
baseConfig.irohaPort === undefined ||
baseConfig.quorum === undefined ||
baseConfig.timeoutLimit === undefined
) {
throw new RuntimeError("Some fields in baseConfig are undefined");
}

if (method.methodName === undefined || method.methodName === "") {
throw new RuntimeError("methodName parameter must be specified");
}
Checks.truthy(baseConfig.privKey, "privKey in baseConfig");
Checks.truthy(
baseConfig.creatorAccountId,
"creatorAccountId in baseConfig",
);
Checks.truthy(baseConfig.irohaHost, "irohaHost in baseConfig");
Checks.truthy(baseConfig.irohaPort, "irohaPort in baseConfig");
Checks.truthy(baseConfig.quorum, "quorum in baseConfig");
Checks.nonBlankString(method.methodName, "methodName");

const socket: Socket = io(this.wsApiHost, { path: this.wsApiPath });
const asyncRequestData = {
Expand All @@ -146,7 +168,15 @@ export class IrohaApiClient
this.log.debug("requestData:", asyncRequestData);

try {
socket.emit(IrohaSocketSessionEvent.SendAsyncRequest, asyncRequestData);
socket.emit(IrohaSocketIOTransactV1.SendAsyncRequest, asyncRequestData);

// Connector should disconnect us after receiving this request.
// If he doesn't, disconnect after specified amount of time.
setTimeout(() => {
if (socket.connected) {
socket.disconnect();
}
}, this.options.timeoutLimit ?? 10 * 1000);
} catch (err) {
this.log.error("Exception in: sendAsyncRequest(): ", err);
throw err;
Expand All @@ -161,33 +191,28 @@ export class IrohaApiClient
* @returns Promise that will resolve with response from the ledger, or reject when error occurred.
*/
public sendSyncRequest(
args: any,
method: Record<string, unknown>,
args: any,
baseConfig?: IrohaBaseConfig,
): Promise<any> {
this.log.debug(`inside: sendSyncRequest()`);
this.log.debug(`baseConfig=${baseConfig}`);
this.log.debug(`method=${method}`);
this.log.debug(`args=${args}`);

if (baseConfig === undefined || baseConfig === {}) {
if (!baseConfig) {
throw new RuntimeError("baseConfig object must exist and not be empty");
}

if (
baseConfig.privKey === undefined ||
baseConfig.creatorAccountId === undefined ||
baseConfig.irohaHost === undefined ||
baseConfig.irohaPort === undefined ||
baseConfig.quorum === undefined ||
baseConfig.timeoutLimit === undefined
) {
throw new RuntimeError("Some fields in baseConfig are undefined");
}

if (method.methodName === undefined || method.methodName === "") {
throw new RuntimeError("methodName parameter must be specified");
}
Checks.truthy(baseConfig.privKey, "privKey in baseConfig");
Checks.truthy(
baseConfig.creatorAccountId,
"creatorAccountId in baseConfig",
);
Checks.truthy(baseConfig.irohaHost, "irohaHost in baseConfig");
Checks.truthy(baseConfig.irohaPort, "irohaPort in baseConfig");
Checks.truthy(baseConfig.quorum, "quorum in baseConfig");
Checks.nonBlankString(method.methodName, "methodName");

const socket: Socket = io(this.wsApiHost, { path: this.wsApiPath });

Expand All @@ -207,14 +232,20 @@ export class IrohaApiClient
reject(err);
});

socket.on("error", (err: unknown) => {
socket.disconnect();
reject(err);
});

socket.on("response", (result: any) => {
this.log.debug("#[recv]response, res:", result);
responseFlag = true;
this.log.debug("#[recv]response, res:", result);
const resultObj = {
status: result.status,
data: result.txHash,
};
this.log.debug("resultObj =", resultObj);
socket.disconnect();
resolve(resultObj);
});

Expand All @@ -227,17 +258,18 @@ export class IrohaApiClient
this.log.debug("requestData:", syncRequestData);

try {
socket.emit(IrohaSocketSessionEvent.SendSyncRequest, syncRequestData);
socket.emit(IrohaSocketIOTransactV1.SendSyncRequest, syncRequestData);
} catch (err) {
this.log.error("Exception in: sendAsyncRequest(): ", err);
throw err;
}

setTimeout(() => {
if (responseFlag === false) {
socket.disconnect();
resolve({ status: 504 });
}
}, baseConfig.timeoutLimit);
}, this.options.timeoutLimit);
} catch (err) {
this.log.error("Exception in: sendSyncRequest(): ", err);
reject(err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,14 +950,9 @@ export enum IrohaQuery {
* @enum {string}
*/

export enum IrohaSocketSessionEvent {
Subscribe = 'org.hyperledger.cactus.api.async.iroha.SocketSession.Subscribe',
Next = 'org.hyperledger.cactus.api.async.iroha.SocketSession.Next',
Unsubscribe = 'org.hyperledger.cactus.api.async.iroha.SocketSession.Unsubscribe',
Error = 'org.hyperledger.cactus.api.async.iroha.SocketSession.Error',
Complete = 'org.hyperledger.cactus.api.async.iroha.SocketSession.Complete',
SendAsyncRequest = 'org.hyperledger.cactus.api.async.iroha.SocketSession.SendAsyncRequest',
SendSyncRequest = 'org.hyperledger.cactus.api.async.iroha.SocketSession.SendSyncRequest'
export enum IrohaSocketIOTransactV1 {
SendAsyncRequest = 'org.hyperledger.cactus.api.async.iroha.IrohaSocketIOTransactV1.SendAsyncRequest',
SendSyncRequest = 'org.hyperledger.cactus.api.async.iroha.IrohaSocketIOTransactV1.SendSyncRequest'
}

/**
Expand Down Expand Up @@ -1187,6 +1182,20 @@ export interface TransferAssetRequestParameters {
*/
amount: number;
}
/**
*
* @export
* @enum {string}
*/

export enum WatchBlocksV1 {
Subscribe = 'org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Subscribe',
Next = 'org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Next',
Unsubscribe = 'org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Unsubscribe',
Error = 'org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Error',
Complete = 'org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Complete'
}


/**
* DefaultApi - axios parameter creator
Expand Down
Loading

0 comments on commit b2742e8

Please sign in to comment.