From 60e65d533607cab83a44d4fb291348e2d03d193b Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Fri, 26 Jul 2024 15:30:51 +0100 Subject: [PATCH 1/7] wip: refactor and optimize a bit --- src/components/core/utils/statusHandler.ts | 87 +++++++++++++--------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/src/components/core/utils/statusHandler.ts b/src/components/core/utils/statusHandler.ts index adf29ac4b..d1d75a5a0 100644 --- a/src/components/core/utils/statusHandler.ts +++ b/src/components/core/utils/statusHandler.ts @@ -4,7 +4,8 @@ import { OceanNodeStatus, OceanNodeProvider, OceanNodeIndexer, - StorageTypes + StorageTypes, + OceanNodeConfig } from '../../../@types/OceanNode.js' import { existsEnvironmentVariable, getConfiguration } from '../../../utils/index.js' import { ENVIRONMENT_VARIABLES } from '../../../utils/constants.js' @@ -13,6 +14,44 @@ import { OceanNode } from '../../../OceanNode.js' import { isAddress } from 'ethers' import { schemas } from '../../database/schemas.js' +function getAdminAddresses(config: OceanNodeConfig) { + const validAddresses = [] + if (config.allowedAdmins) { + for (const admin of config.allowedAdmins) { + if (isAddress(admin) === true) { + validAddresses.push(admin) + } + } + if (validAddresses.length === 0) { + CORE_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Invalid format for ETH address from ALLOWED ADMINS.` + ) + } + } + return validAddresses +} +const supportedStorageTypes: StorageTypes = { + url: true, + arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY), + ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY) +} + +// platform information +const platformInfo = { + cpus: os.cpus().length, + freemem: os.freemem(), + totalmem: os.totalmem(), + loadavg: os.loadavg(), + arch: os.arch(), + machine: os.machine(), + platform: os.platform(), + osType: os.type(), + node: process.version +} + +let previousStatus: OceanNodeStatus = null + export async function status( oceanNode: OceanNode, nodeId?: string, @@ -30,20 +69,6 @@ export async function status( } const config = await getConfiguration() - const validAddresses = [] - if (config.allowedAdmins) { - for (const admin of config.allowedAdmins) { - if (isAddress(admin) === true) { - validAddresses.push(admin) - } - } - if (validAddresses.length === 0) { - CORE_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `Invalid format for ETH address from ALLOWED ADMINS.` - ) - } - } const status: OceanNodeStatus = { id: undefined, publicKey: undefined, @@ -53,23 +78,17 @@ export async function status( p2p: undefined, provider: [], indexer: [], - supportedStorage: undefined, + supportedStorage: supportedStorageTypes, uptime: process.uptime(), - platform: { - cpus: os.cpus().length, - freemem: os.freemem(), - totalmem: os.totalmem(), - loadavg: os.loadavg(), - arch: os.arch(), - machine: os.machine(), - platform: os.platform(), - osType: os.type(), - node: process.version - }, + platform: platformInfo, codeHash: config.codeHash, - allowedAdmins: validAddresses + allowedAdmins: getAdminAddresses(config) } + // only these 2 might change between requests + status.platform.freemem = os.freemem() + status.platform.loadavg = os.loadavg() + if (nodeId && nodeId !== undefined) { status.id = nodeId } else { @@ -77,18 +96,12 @@ export async function status( status.id = config.keys.peerId.toString() } - const supportedStorageTypes: StorageTypes = { - url: true, - arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY), - ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY) - } - status.version = process.env.npm_package_version status.publicKey = Buffer.from(config.keys.publicKey).toString('hex') status.address = config.keys.ethAddress status.http = config.hasHttp status.p2p = config.hasP2P - status.supportedStorage = supportedStorageTypes + // status.supportedStorage = supportedStorageTypes if (config.supportedNetworks) { for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) { @@ -127,5 +140,9 @@ export async function status( status.c2dClusters = config.c2dClusters status.supportedSchemas = schemas.ddoSchemas } + + if (!previousStatus) { + previousStatus = status + } return status } From 71e00b64abe12e46cadd55188e65804c2e68a90b Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Mon, 29 Jul 2024 10:42:21 +0100 Subject: [PATCH 2/7] refactor get status, cache data + only fetch data that changes --- src/components/core/utils/statusHandler.ts | 148 +++++++++++---------- 1 file changed, 79 insertions(+), 69 deletions(-) diff --git a/src/components/core/utils/statusHandler.ts b/src/components/core/utils/statusHandler.ts index d1d75a5a0..f448e1b64 100644 --- a/src/components/core/utils/statusHandler.ts +++ b/src/components/core/utils/statusHandler.ts @@ -13,6 +13,7 @@ import { CORE_LOGGER } from '../../../utils/logging/common.js' import { OceanNode } from '../../../OceanNode.js' import { isAddress } from 'ethers' import { schemas } from '../../database/schemas.js' +import { SupportedNetwork } from '../../../@types/blockchain.js' function getAdminAddresses(config: OceanNodeConfig) { const validAddresses = [] @@ -50,7 +51,54 @@ const platformInfo = { node: process.version } -let previousStatus: OceanNodeStatus = null +async function getIndexerAndProviderInfo( + oceanNode: OceanNode, + config: OceanNodeConfig +): Promise { + const nodeStatus: any = { + provider: [], + indexer: [] + } + for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) { + if (config.hasProvider) { + const provider: OceanNodeProvider = { + chainId: key, + network: supportedNetwork.network + } + nodeStatus.provider.push(provider) + } + if (config.hasIndexer) { + const blockNr = await getIndexerBlockInfo(oceanNode, supportedNetwork) + const indexer: OceanNodeIndexer = { + chainId: key, + network: supportedNetwork.network, + block: blockNr + } + nodeStatus.indexer.push(indexer) + } + } + return nodeStatus +} + +async function getIndexerBlockInfo( + oceanNode: OceanNode, + supportedNetwork: SupportedNetwork +): Promise { + let blockNr = '0' + try { + const { indexer: indexerDatabase } = oceanNode.getDatabase() + const { lastIndexedBlock } = await indexerDatabase.retrieve(supportedNetwork.chainId) + blockNr = lastIndexedBlock.toString() + } catch (error) { + CORE_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error fetching last indexed block for network ${supportedNetwork.network}` + ) + } + return blockNr +} + +let nodeStatus: OceanNodeStatus = null export async function status( oceanNode: OceanNode, @@ -69,80 +117,42 @@ export async function status( } const config = await getConfiguration() - const status: OceanNodeStatus = { - id: undefined, - publicKey: undefined, - address: undefined, - version: undefined, - http: undefined, - p2p: undefined, - provider: [], - indexer: [], - supportedStorage: supportedStorageTypes, - uptime: process.uptime(), - platform: platformInfo, - codeHash: config.codeHash, - allowedAdmins: getAdminAddresses(config) - } - - // only these 2 might change between requests - status.platform.freemem = os.freemem() - status.platform.loadavg = os.loadavg() - - if (nodeId && nodeId !== undefined) { - status.id = nodeId - } else { - // get current node ID - status.id = config.keys.peerId.toString() + // no previous status? + if (!nodeStatus) { + nodeStatus = { + id: nodeId && nodeId !== undefined ? nodeId : config.keys.peerId.toString(), // get current node ID + publicKey: Buffer.from(config.keys.publicKey).toString('hex'), + address: config.keys.ethAddress, + version: process.env.npm_package_version, + http: config.hasHttp, + p2p: config.hasP2P, + provider: [], + indexer: [], + supportedStorage: supportedStorageTypes, + // uptime: process.uptime(), + platform: platformInfo, + codeHash: config.codeHash, + allowedAdmins: getAdminAddresses(config) + } } - status.version = process.env.npm_package_version - status.publicKey = Buffer.from(config.keys.publicKey).toString('hex') - status.address = config.keys.ethAddress - status.http = config.hasHttp - status.p2p = config.hasP2P - // status.supportedStorage = supportedStorageTypes - + // need to update at least block info if available if (config.supportedNetworks) { - for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) { - if (config.hasProvider) { - const provider: OceanNodeProvider = { - chainId: key, - network: supportedNetwork.network - } - status.provider.push(provider) - } - if (config.hasIndexer) { - let blockNr = '0' - try { - const { indexer: indexerDatabase } = oceanNode.getDatabase() - const { lastIndexedBlock } = await indexerDatabase.retrieve( - supportedNetwork.chainId - ) - blockNr = lastIndexedBlock.toString() - } catch (error) { - CORE_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `Error fetching last indexed block for network ${supportedNetwork.network}` - ) - } - const indexer: OceanNodeIndexer = { - chainId: key, - network: supportedNetwork.network, - block: blockNr - } - status.indexer.push(indexer) - } - } + const indexerAndProvider = await getIndexerAndProviderInfo(oceanNode, config) + nodeStatus.provider = indexerAndProvider.provider + nodeStatus.indexer = indexerAndProvider.indexer } + // only these 2 might change between requests + nodeStatus.platform.freemem = os.freemem() + nodeStatus.platform.loadavg = os.loadavg() + nodeStatus.uptime = process.uptime() + + // depends on request if (detailed) { - status.c2dClusters = config.c2dClusters - status.supportedSchemas = schemas.ddoSchemas + nodeStatus.c2dClusters = config.c2dClusters + nodeStatus.supportedSchemas = schemas.ddoSchemas } - if (!previousStatus) { - previousStatus = status - } - return status + return nodeStatus } From c86b4f00be45d2e7fa2b8554a1be59c0c7d9e0ef Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Mon, 29 Jul 2024 18:36:28 +0100 Subject: [PATCH 3/7] small update, check lenght of array, as defaults to empty --- src/components/core/utils/statusHandler.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/components/core/utils/statusHandler.ts b/src/components/core/utils/statusHandler.ts index f448e1b64..9758503d9 100644 --- a/src/components/core/utils/statusHandler.ts +++ b/src/components/core/utils/statusHandler.ts @@ -17,7 +17,7 @@ import { SupportedNetwork } from '../../../@types/blockchain.js' function getAdminAddresses(config: OceanNodeConfig) { const validAddresses = [] - if (config.allowedAdmins) { + if (config.allowedAdmins && config.allowedAdmins.length > 0) { for (const admin of config.allowedAdmins) { if (isAddress(admin) === true) { validAddresses.push(admin) From b69c13bc551da7237acb0a3404ff76b5a738c012 Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Mon, 29 Jul 2024 18:40:43 +0100 Subject: [PATCH 4/7] revert changes from another PR --- dashboard/src/components/IndexQueue.tsx | 51 +++++++------------------ 1 file changed, 13 insertions(+), 38 deletions(-) diff --git a/dashboard/src/components/IndexQueue.tsx b/dashboard/src/components/IndexQueue.tsx index 15080f021..625289047 100644 --- a/dashboard/src/components/IndexQueue.tsx +++ b/dashboard/src/components/IndexQueue.tsx @@ -9,7 +9,6 @@ import { } from '@mui/material' import styles from './Dashboard/index.module.css' import { useAdminContext } from '@/context/AdminProvider' -import Alert from '@mui/material/Alert' interface QueueItem { txId: string @@ -20,32 +19,21 @@ interface QueueItem { export default function IndexQueue() { const [queue, setQueue] = useState([]) const { networks } = useAdminContext() - const [avoidAskQueue, setAvoidAskQueue] = useState(false) - let intervalId: any = null useEffect(() => { const fetchQueue = () => { fetch('/api/services/indexQueue') - .then((response) => { - if (response.status === 400) { - console.warn('Cannot fetch queue: Node is not running Indexer') - setAvoidAskQueue(true) - if (intervalId) { - clearInterval(intervalId) // Stop doing this, there is no point, since we don't have Indexer + .then((response) => response.json()) + .then((data) => { + const transformedQueue = data.queue.map((item: any) => { + const network = networks.find((net) => net.chainId === item.chainId) + return { + txId: item.txId, + chainId: item.chainId, + chain: network ? network.network : 'Unknown Network' } - } else { - response.json().then((data) => { - const transformedQueue = data.queue.map((item: any) => { - const network = networks.find((net) => net.chainId === item.chainId) - return { - txId: item.txId, - chainId: item.chainId, - chain: network ? network.network : 'Unknown Network' - } - }) - setQueue(transformedQueue) - }) - } + }) + setQueue(transformedQueue) }) .catch((error) => { console.error('Error fetching queue:', error) @@ -53,16 +41,14 @@ export default function IndexQueue() { } fetchQueue() // Initial fetch - let pollingInterval = 10000 // Default polling interval (10 seconds) + let pollingInterval = 2000 // Default polling interval if (process.env.INDEXER_INTERVAL) { pollingInterval = Number(process.env.INDEXER_INTERVAL) } - intervalId = setInterval(fetchQueue, pollingInterval) + const intervalId = setInterval(fetchQueue, pollingInterval) return () => { - if (intervalId) { - clearInterval(intervalId) // Clear interval on component unmount - } + clearInterval(intervalId) // Clear interval on component unmount } }, []) @@ -102,17 +88,6 @@ export default function IndexQueue() { ) : (

Indexing queue is empty.

)} - {avoidAskQueue && ( - { - setAvoidAskQueue(false) - }} - > - Node is not running Indexer. No need to get queue at this point! - - )} ) } From 789cd365d60369887f7e145f6157c5d33010b3a4 Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Mon, 29 Jul 2024 18:49:50 +0100 Subject: [PATCH 5/7] fix test randomly fails --- src/test/integration/logs.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/integration/logs.test.ts b/src/test/integration/logs.test.ts index 577b784fe..c187caff5 100644 --- a/src/test/integration/logs.test.ts +++ b/src/test/integration/logs.test.ts @@ -85,7 +85,7 @@ describe('LogDatabase CRUD', () => { const endTime = new Date() // current time // Retrieve the latest log entries - let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 100) + let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 200) logs = logs.filter((log) => log.message === newLogEntry.message) expect(logs?.length).to.equal(1) From e7e0bc7aa8ac454e3d8b9b5ef8a713381038cfc1 Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Tue, 30 Jul 2024 10:25:25 +0100 Subject: [PATCH 6/7] fix issue with p2p task --- src/components/P2P/handleProtocolCommands.ts | 33 +++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 3043253e8..b524a3214 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -38,15 +38,22 @@ export async function handleProtocolCommands(connection: any) { let statusStream let sendStream = null + const buildWrongCommandStatus = function (errorCode: number, message: string) { + status = { + httpStatus: errorCode, + error: message + } + return status + } + const denyList = await (await getConfiguration()).denyList if (denyList.peers.length > 0) { if (denyList.peers.includes(remotePeer.toString())) { P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`) - status = { - httpStatus: 403, - error: 'Unauthorized request' - } - statusStream = new ReadableString(JSON.stringify(status)) + + statusStream = new ReadableString( + JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request')) + ) pipe(statusStream, connection.stream.sink) return } @@ -58,14 +65,24 @@ export async function handleProtocolCommands(connection: any) { const str = uint8ArrayToString(chunk.subarray()) task = JSON.parse(str) as Command } catch (e) { - status = { httpStatus: 400, error: 'Invalid command' } - statusStream = new ReadableString(JSON.stringify(status)) + statusStream = new ReadableString( + JSON.stringify(buildWrongCommandStatus(400, 'Invalid command')) + ) pipe(statusStream, connection.stream.sink) return } break } - P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true) + if (!task) { + P2P_LOGGER.error('Invalid or missing task/command data!') + status = { httpStatus: 400, error: 'Invalid command' } + statusStream = new ReadableString(JSON.stringify(status)) + pipe(statusStream, connection.stream.sink) + return + } else { + P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true) + } + // we get the handler from the running instance // no need to create a new instance of Handler on every request const handler: Handler = this.getCoreHandlers().getHandler(task.command) From 048237aa32382eb51b5967447c88c246f9626592 Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Tue, 30 Jul 2024 10:27:57 +0100 Subject: [PATCH 7/7] use helper function --- src/components/P2P/handleProtocolCommands.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index b524a3214..afcd5b57c 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -75,8 +75,9 @@ export async function handleProtocolCommands(connection: any) { } if (!task) { P2P_LOGGER.error('Invalid or missing task/command data!') - status = { httpStatus: 400, error: 'Invalid command' } - statusStream = new ReadableString(JSON.stringify(status)) + statusStream = new ReadableString( + JSON.stringify(buildWrongCommandStatus(400, 'Invalid command')) + ) pipe(statusStream, connection.stream.sink) return } else {