diff --git a/src/@types/DDO/Metadata.ts b/src/@types/DDO/Metadata.ts index 1eb513558..9c6fb2b18 100644 --- a/src/@types/DDO/Metadata.ts +++ b/src/@types/DDO/Metadata.ts @@ -18,6 +18,11 @@ export interface MetadataAlgorithm { * @type {string} */ rawcode?: string + /** + * Format of the algorithm + * @type {string} + */ + format?: string /** * Object describing the Docker container image. diff --git a/src/components/Indexer/utils.ts b/src/components/Indexer/utils.ts index 5b3d53215..b50daefc8 100644 --- a/src/components/Indexer/utils.ts +++ b/src/components/Indexer/utils.ts @@ -181,7 +181,7 @@ export const processChunkLogs = async ( if (!allowed.length) { INDEXER_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Metadata Proof validator not allowed`, + `Metadata Proof validators list is empty`, true ) continue diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 3043253e8..485e40a5d 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -38,34 +38,58 @@ export async function handleProtocolCommands(connection: any) { let statusStream let sendStream = null + const buildWrongCommandStatus = function (errorCode: number, message: string) { + status = { + httpStatus: errorCode, + error: message + } + return status + } + const denyList = await (await getConfiguration()).denyList if (denyList.peers.length > 0) { if (denyList.peers.includes(remotePeer.toString())) { P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`) - status = { - httpStatus: 403, - error: 'Unauthorized request' - } - statusStream = new ReadableString(JSON.stringify(status)) + + statusStream = new ReadableString( + JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request')) + ) pipe(statusStream, connection.stream.sink) return } } - /* eslint no-unreachable-loop: ["error", { "ignore": ["ForInStatement", "ForOfStatement"] }] */ - for await (const chunk of connection.stream.source) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - status = { httpStatus: 400, error: 'Invalid command' } - statusStream = new ReadableString(JSON.stringify(status)) + try { + // eslint-disable-next-line no-unreachable-loop + for await (const chunk of connection.stream.source) { + try { + const str = uint8ArrayToString(chunk.subarray()) + task = JSON.parse(str) as Command + } catch (e) { + statusStream = new ReadableString( + JSON.stringify(buildWrongCommandStatus(400, 'Invalid command')) + ) + pipe(statusStream, connection.stream.sink) + return + } + } + if (!task) { + P2P_LOGGER.error('Invalid or missing task/command data!') + statusStream = new ReadableString( + JSON.stringify(buildWrongCommandStatus(400, 'Invalid command')) + ) pipe(statusStream, connection.stream.sink) return } - break + } catch (err) { + P2P_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Unable to process P2P command: ${err.message}` + ) + return } - P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true) + + P2P_LOGGER.logMessage('Performing P2P task: ' + JSON.stringify(task), true) // we get the handler from the running instance // no need to create a new instance of Handler on every request const handler: Handler = this.getCoreHandlers().getHandler(task.command) diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index 51efba987..85edc08c3 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -69,6 +69,7 @@ export class ComputeStartHandler extends Handler { const node = this.getOceanNode() const assets: ComputeAsset[] = [task.dataset] if (task.additionalDatasets) assets.push(...task.additionalDatasets) + const { algorithm } = task let foundValidCompute = null const algoChecksums = await getAlgoChecksums( @@ -245,6 +246,18 @@ export class ComputeStartHandler extends Handler { validUntil: validFee.validUntil } } + if (!('meta' in algorithm) && ddo.metadata.type === 'algorithm') { + const { entrypoint, image, tag, checksum } = ddo.metadata.algorithm.container + const container = { entrypoint, image, tag, checksum } + algorithm.meta = { + language: ddo.metadata.algorithm.language, + version: ddo.metadata.algorithm.version, + container: container + } + if ('format' in ddo.metadata.algorithm) { + algorithm.meta.format = ddo.metadata.algorithm.format + } + } } } if (!foundValidCompute) { @@ -265,7 +278,7 @@ export class ComputeStartHandler extends Handler { const response = await engine.startComputeJob( assets, - task.algorithm, + algorithm, task.output, task.consumerAddress, envId, diff --git a/src/components/core/utils/statusHandler.ts b/src/components/core/utils/statusHandler.ts index adf29ac4b..9758503d9 100644 --- a/src/components/core/utils/statusHandler.ts +++ b/src/components/core/utils/statusHandler.ts @@ -4,7 +4,8 @@ import { OceanNodeStatus, OceanNodeProvider, OceanNodeIndexer, - StorageTypes + StorageTypes, + OceanNodeConfig } from '../../../@types/OceanNode.js' import { existsEnvironmentVariable, getConfiguration } from '../../../utils/index.js' import { ENVIRONMENT_VARIABLES } from '../../../utils/constants.js' @@ -12,26 +13,11 @@ import { CORE_LOGGER } from '../../../utils/logging/common.js' import { OceanNode } from '../../../OceanNode.js' import { isAddress } from 'ethers' import { schemas } from '../../database/schemas.js' +import { SupportedNetwork } from '../../../@types/blockchain.js' -export async function status( - oceanNode: OceanNode, - nodeId?: string, - detailed: boolean = false -): Promise { - CORE_LOGGER.logMessage('Command status started execution...', true) - if (!oceanNode) { - CORE_LOGGER.logMessageWithEmoji( - 'Node object not found. Cannot proceed with status command.', - true, - GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_ERROR - ) - return - } - const config = await getConfiguration() - +function getAdminAddresses(config: OceanNodeConfig) { const validAddresses = [] - if (config.allowedAdmins) { + if (config.allowedAdmins && config.allowedAdmins.length > 0) { for (const admin of config.allowedAdmins) { if (isAddress(admin) === true) { validAddresses.push(admin) @@ -44,88 +30,129 @@ export async function status( ) } } - const status: OceanNodeStatus = { - id: undefined, - publicKey: undefined, - address: undefined, - version: undefined, - http: undefined, - p2p: undefined, + return validAddresses +} +const supportedStorageTypes: StorageTypes = { + url: true, + arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY), + ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY) +} + +// platform information +const platformInfo = { + cpus: os.cpus().length, + freemem: os.freemem(), + totalmem: os.totalmem(), + loadavg: os.loadavg(), + arch: os.arch(), + machine: os.machine(), + platform: os.platform(), + osType: os.type(), + node: process.version +} + +async function getIndexerAndProviderInfo( + oceanNode: OceanNode, + config: OceanNodeConfig +): Promise { + const nodeStatus: any = { provider: [], - indexer: [], - supportedStorage: undefined, - uptime: process.uptime(), - platform: { - cpus: os.cpus().length, - freemem: os.freemem(), - totalmem: os.totalmem(), - loadavg: os.loadavg(), - arch: os.arch(), - machine: os.machine(), - platform: os.platform(), - osType: os.type(), - node: process.version - }, - codeHash: config.codeHash, - allowedAdmins: validAddresses + indexer: [] + } + for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) { + if (config.hasProvider) { + const provider: OceanNodeProvider = { + chainId: key, + network: supportedNetwork.network + } + nodeStatus.provider.push(provider) + } + if (config.hasIndexer) { + const blockNr = await getIndexerBlockInfo(oceanNode, supportedNetwork) + const indexer: OceanNodeIndexer = { + chainId: key, + network: supportedNetwork.network, + block: blockNr + } + nodeStatus.indexer.push(indexer) + } } + return nodeStatus +} - if (nodeId && nodeId !== undefined) { - status.id = nodeId - } else { - // get current node ID - status.id = config.keys.peerId.toString() +async function getIndexerBlockInfo( + oceanNode: OceanNode, + supportedNetwork: SupportedNetwork +): Promise { + let blockNr = '0' + try { + const { indexer: indexerDatabase } = oceanNode.getDatabase() + const { lastIndexedBlock } = await indexerDatabase.retrieve(supportedNetwork.chainId) + blockNr = lastIndexedBlock.toString() + } catch (error) { + CORE_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error fetching last indexed block for network ${supportedNetwork.network}` + ) } + return blockNr +} - const supportedStorageTypes: StorageTypes = { - url: true, - arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY), - ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY) +let nodeStatus: OceanNodeStatus = null + +export async function status( + oceanNode: OceanNode, + nodeId?: string, + detailed: boolean = false +): Promise { + CORE_LOGGER.logMessage('Command status started execution...', true) + if (!oceanNode) { + CORE_LOGGER.logMessageWithEmoji( + 'Node object not found. Cannot proceed with status command.', + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return } + const config = await getConfiguration() - status.version = process.env.npm_package_version - status.publicKey = Buffer.from(config.keys.publicKey).toString('hex') - status.address = config.keys.ethAddress - status.http = config.hasHttp - status.p2p = config.hasP2P - status.supportedStorage = supportedStorageTypes + // no previous status? + if (!nodeStatus) { + nodeStatus = { + id: nodeId && nodeId !== undefined ? nodeId : config.keys.peerId.toString(), // get current node ID + publicKey: Buffer.from(config.keys.publicKey).toString('hex'), + address: config.keys.ethAddress, + version: process.env.npm_package_version, + http: config.hasHttp, + p2p: config.hasP2P, + provider: [], + indexer: [], + supportedStorage: supportedStorageTypes, + // uptime: process.uptime(), + platform: platformInfo, + codeHash: config.codeHash, + allowedAdmins: getAdminAddresses(config) + } + } + // need to update at least block info if available if (config.supportedNetworks) { - for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) { - if (config.hasProvider) { - const provider: OceanNodeProvider = { - chainId: key, - network: supportedNetwork.network - } - status.provider.push(provider) - } - if (config.hasIndexer) { - let blockNr = '0' - try { - const { indexer: indexerDatabase } = oceanNode.getDatabase() - const { lastIndexedBlock } = await indexerDatabase.retrieve( - supportedNetwork.chainId - ) - blockNr = lastIndexedBlock.toString() - } catch (error) { - CORE_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `Error fetching last indexed block for network ${supportedNetwork.network}` - ) - } - const indexer: OceanNodeIndexer = { - chainId: key, - network: supportedNetwork.network, - block: blockNr - } - status.indexer.push(indexer) - } - } + const indexerAndProvider = await getIndexerAndProviderInfo(oceanNode, config) + nodeStatus.provider = indexerAndProvider.provider + nodeStatus.indexer = indexerAndProvider.indexer } + // only these 2 might change between requests + nodeStatus.platform.freemem = os.freemem() + nodeStatus.platform.loadavg = os.loadavg() + nodeStatus.uptime = process.uptime() + + // depends on request if (detailed) { - status.c2dClusters = config.c2dClusters - status.supportedSchemas = schemas.ddoSchemas + nodeStatus.c2dClusters = config.c2dClusters + nodeStatus.supportedSchemas = schemas.ddoSchemas } - return status + + return nodeStatus } diff --git a/src/test/integration/logs.test.ts b/src/test/integration/logs.test.ts index 577b784fe..c187caff5 100644 --- a/src/test/integration/logs.test.ts +++ b/src/test/integration/logs.test.ts @@ -85,7 +85,7 @@ describe('LogDatabase CRUD', () => { const endTime = new Date() // current time // Retrieve the latest log entries - let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 100) + let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 200) logs = logs.filter((log) => log.message === newLogEntry.message) expect(logs?.length).to.equal(1)