Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix nodesGetAll handler #611

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions src/client/handlers/NodesGetAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,18 @@ class NodesGetAll extends ServerHandler<
): AsyncGenerator<ClientRPCResponseResult<NodesGetMessage>> {
if (ctx.signal.aborted) throw ctx.signal.reason;
const { nodeGraph, keyRing } = this.container;
for await (const bucket of nodeGraph.getBuckets()) {
let index;
for (const id of Object.keys(bucket)) {
const encodedId = nodesUtils.encodeNodeId(
IdInternal.fromString<NodeId>(id),
);
for await (const [index, bucket] of nodeGraph.getBuckets()) {
for (const [id, info] of bucket) {
const encodedId = nodesUtils.encodeNodeId(id);
// For every node in every bucket, add it to our message
if (!index) {
index = nodesUtils.bucketIndex(
keyRing.getNodeId(),
IdInternal.fromString<NodeId>(id),
);
if (ctx.signal.aborted) {
throw ctx.signal.reason;
}
if (ctx.signal.aborted) throw ctx.signal.reason;
yield {
bucketIndex: index,
nodeIdEncoded: encodedId,
host: bucket[id].address.host,
port: bucket[id].address.port,
host: info.address.host,
port: info.address.port,
};
}
}
Expand Down
314 changes: 312 additions & 2 deletions tests/client/handlers/nodes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type GestaltGraph from '@/gestalts/GestaltGraph';
import type { NodeIdEncoded } from '@/ids/types';
import type { TLSConfig, Host, Port } from '@/network/types';
import type { Notification } from '@/notifications/types';
import type { NodeAddress } from '@/nodes/types';
import fs from 'fs';
import path from 'path';
import os from 'os';
Expand All @@ -18,12 +19,27 @@ import Sigchain from '@/sigchain/Sigchain';
import NotificationsManager from '@/notifications/NotificationsManager';
import NodeConnectionManager from '@/nodes/NodeConnectionManager';
import ClientService from '@/client/ClientService';
import { NodesAdd, NodesClaim, NodesFind, NodesPing } from '@/client/handlers';
import { nodesAdd, nodesClaim, nodesFind, nodesPing } from '@/client/callers';
import {
NodesAdd,
NodesClaim,
NodesFind,
NodesPing,
NodesGetAll,
NodesListConnections,
} from '@/client/handlers';
import {
nodesAdd,
nodesClaim,
nodesFind,
nodesPing,
nodesGetAll,
nodesListConnections,
} from '@/client/callers';
import * as keysUtils from '@/keys/utils';
import * as nodesUtils from '@/nodes/utils';
import * as networkUtils from '@/network/utils';
import * as validationErrors from '@/validation/errors';
import { parseNodeId } from '@/ids';
import * as testsUtils from '../../utils';

describe('nodesAdd', () => {
Expand Down Expand Up @@ -680,3 +696,297 @@ describe('nodesPing', () => {
);
});
});
describe('nodesGetAll', () => {
const logger = new Logger('nodesGetAll test', LogLevel.WARN, [
new StreamHandler(
formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`,
),
]);
const password = 'helloWorld';
const localhost = '127.0.0.1';
let dataDir: string;
let db: DB;
let keyRing: KeyRing;
let tlsConfig: TLSConfig;
let clientService: ClientService;
let webSocketClient: WebSocketClient;
let rpcClient: RPCClient<{
nodesGetAll: typeof nodesGetAll;
}>;
let nodeGraph: NodeGraph;
let taskManager: TaskManager;
let nodeConnectionManager: NodeConnectionManager;
let nodeManager: NodeManager;
let sigchain: Sigchain;
beforeEach(async () => {
dataDir = await fs.promises.mkdtemp(
path.join(os.tmpdir(), 'polykey-test-'),
);
const keysPath = path.join(dataDir, 'keys');
keyRing = await KeyRing.createKeyRing({
password,
keysPath,
passwordOpsLimit: keysUtils.passwordOpsLimits.min,
passwordMemLimit: keysUtils.passwordMemLimits.min,
strictMemoryLock: false,
logger,
});
tlsConfig = await testsUtils.createTLSConfig(keyRing.keyPair);
const dbPath = path.join(dataDir, 'db');
db = await DB.createDB({
dbPath,
logger,
});
sigchain = await Sigchain.createSigchain({
db,
keyRing,
logger,
});
nodeGraph = await NodeGraph.createNodeGraph({
db,
keyRing,
logger: logger.getChild('NodeGraph'),
});
taskManager = await TaskManager.createTaskManager({
db,
logger,
lazy: true,
});
nodeConnectionManager = new NodeConnectionManager({
keyRing,
nodeGraph,
// TLS not needed for this test
tlsConfig: {} as TLSConfig,
connectionConnectTimeoutTime: 2000,
connectionIdleTimeoutTime: 2000,
logger: logger.getChild('NodeConnectionManager'),
});
nodeManager = new NodeManager({
db,
keyRing,
nodeConnectionManager,
nodeGraph,
sigchain,
taskManager,
gestaltGraph: {} as GestaltGraph,
logger,
});
await nodeManager.start();
await nodeConnectionManager.start({ host: localhost as Host });
await taskManager.startProcessing();
clientService = new ClientService({
tlsConfig,
logger: logger.getChild(ClientService.name),
});
await clientService.start({
manifest: {
nodesGetAll: new NodesGetAll({
nodeGraph,
keyRing,
}),
},
host: localhost,
});
webSocketClient = await WebSocketClient.createWebSocketClient({
config: {
verifyPeer: false,
},
host: localhost,
logger: logger.getChild(WebSocketClient.name),
port: clientService.port,
});
rpcClient = new RPCClient({
manifest: {
nodesGetAll,
},
streamFactory: () => webSocketClient.connection.newStream(),
toError: networkUtils.toError,
logger: logger.getChild(RPCClient.name),
});
});
afterEach(async () => {
await taskManager.stopProcessing();
await taskManager.stopTasks();
await clientService.stop({ force: true });
await webSocketClient.destroy({ force: true });
await sigchain.stop();
await nodeGraph.stop();
await nodeConnectionManager.stop();
await db.stop();
await keyRing.stop();
await taskManager.stop();
await fs.promises.rm(dataDir, {
force: true,
recursive: true,
});
});
test('gets all nodes', async () => {
await nodeManager.setNode(
parseNodeId(
'vrsc24a1er424epq77dtoveo93meij0pc8ig4uvs9jbeld78n9nl0' as NodeIdEncoded,
),
{
host: networkUtils.parseHostOrHostname('127.0.0.1'),
port: networkUtils.parsePort(1111),
} as NodeAddress,
);
const values: Array<any> = [];
const response = await rpcClient.methods.nodesGetAll({});
for await (const respons of response) {
values.push(respons);
}
expect(values[0].nodeIdEncoded).toEqual(
'vrsc24a1er424epq77dtoveo93meij0pc8ig4uvs9jbeld78n9nl0',
);
});
});
describe('nodesListConnections', () => {
const logger = new Logger('nodesConnections test', LogLevel.WARN, [
new StreamHandler(
formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`,
),
]);
const password = 'helloWorld';
const localhost = '127.0.0.1';
let dataDir: string;
let db: DB;
let keyRing: KeyRing;
let tlsConfig: TLSConfig;
let clientService: ClientService;
let webSocketClient: WebSocketClient;
let rpcClient: RPCClient<{
nodesListConnections: typeof nodesListConnections;
}>;
let nodeGraph: NodeGraph;
let taskManager: TaskManager;
let nodeConnectionManager: NodeConnectionManager;
let nodeManager: NodeManager;
let sigchain: Sigchain;
let mockedConnection: jest.SpyInstance;
beforeEach(async () => {
mockedConnection = jest.spyOn(
NodeConnectionManager.prototype,
'listConnections',
);
dataDir = await fs.promises.mkdtemp(
path.join(os.tmpdir(), 'polykey-test-'),
);
const keysPath = path.join(dataDir, 'keys');
keyRing = await KeyRing.createKeyRing({
password,
keysPath,
passwordOpsLimit: keysUtils.passwordOpsLimits.min,
passwordMemLimit: keysUtils.passwordMemLimits.min,
strictMemoryLock: false,
logger,
});
tlsConfig = await testsUtils.createTLSConfig(keyRing.keyPair);
const dbPath = path.join(dataDir, 'db');
db = await DB.createDB({
dbPath,
logger,
});
sigchain = await Sigchain.createSigchain({
db,
keyRing,
logger,
});
nodeGraph = await NodeGraph.createNodeGraph({
db,
keyRing,
logger: logger.getChild('NodeGraph'),
});
taskManager = await TaskManager.createTaskManager({
db,
logger,
lazy: true,
});
nodeConnectionManager = new NodeConnectionManager({
keyRing,
nodeGraph,
// TLS not needed for this test
tlsConfig: {} as TLSConfig,
connectionConnectTimeoutTime: 2000,
connectionIdleTimeoutTime: 2000,
logger: logger.getChild('NodeConnectionManager'),
});
nodeManager = new NodeManager({
db,
keyRing,
nodeConnectionManager,
nodeGraph,
sigchain,
taskManager,
gestaltGraph: {} as GestaltGraph,
logger,
});
await nodeManager.start();
await nodeConnectionManager.start({ host: localhost as Host });
await taskManager.startProcessing();
clientService = new ClientService({
tlsConfig,
logger: logger.getChild(ClientService.name),
});
await clientService.start({
manifest: {
nodesListConnections: new NodesListConnections({
nodeConnectionManager,
}),
},
host: localhost,
});
webSocketClient = await WebSocketClient.createWebSocketClient({
config: {
verifyPeer: false,
},
host: localhost,
logger: logger.getChild(WebSocketClient.name),
port: clientService.port,
});
rpcClient = new RPCClient({
manifest: {
nodesListConnections,
},
streamFactory: () => webSocketClient.connection.newStream(),
toError: networkUtils.toError,
logger: logger.getChild(RPCClient.name),
});
});
afterEach(async () => {
mockedConnection.mockRestore();
await taskManager.stopProcessing();
await taskManager.stopTasks();
await clientService.stop({ force: true });
await webSocketClient.destroy({ force: true });
await sigchain.stop();
await nodeGraph.stop();
await nodeConnectionManager.stop();
await db.stop();
await keyRing.stop();
await taskManager.stop();
await fs.promises.rm(dataDir, {
force: true,
recursive: true,
});
});
test('lists all connections', async () => {
mockedConnection.mockReturnValue([
{
nodeId: testsUtils.generateRandomNodeId(),
address: {
host: '127.0.0.1',
port: 11111,
hostname: undefined,
},
usageCount: 1,
timeout: undefined,
},
]);
const values: Array<any> = [];
const responses = await rpcClient.methods.nodesListConnections({});
for await (const response of responses) {
values.push(response);
}
expect(values[0].port).toEqual(11111);
});
});