diff --git a/src/components/core/compute.ts b/src/components/core/compute.ts new file mode 100644 index 000000000..f93a3132e --- /dev/null +++ b/src/components/core/compute.ts @@ -0,0 +1,55 @@ +import { Readable } from 'stream' +import { P2PCommandResponse } from '../../@types' +import { CORE_LOGGER } from '../../utils/logging/common.js' +import { Handler } from './handler.js' +import { GetEnvironments } from '../../utils/constants.js' +import { getConfiguration } from '../../utils/config.js' +import axios from 'axios' + +export class GetEnvironmentsHandler extends Handler { + async handle(task: GetEnvironments): Promise { + try { + CORE_LOGGER.logMessage( + 'File Info Request recieved with arguments: ' + JSON.stringify(task, null, 2), + true + ) + const response: any[] = [] + const config = await getConfiguration() + const { c2dClusters } = config + for (const cluster of c2dClusters) { + CORE_LOGGER.logMessage( + `Requesting environment from Operator URL: ${cluster.url}`, + true + ) + const url = `${cluster.url}api/v1/operator/environments?chain_id=${task.chainId}` + const { data } = await axios.get(url) + const { hash } = cluster + for (const item of data) { + item.id = hash + '-' + item.id + } + response.push(...data) + } + + CORE_LOGGER.logMessage( + 'File Info Response: ' + JSON.stringify(response, null, 2), + true + ) + + return { + stream: Readable.from(JSON.stringify(response)), + status: { + httpStatus: 200 + } + } + } catch (error) { + CORE_LOGGER.error(error.message) + return { + stream: null, + status: { + httpStatus: 500, + error: error.message + } + } + } + } +} diff --git a/src/components/core/coreHandlersRegistry.ts b/src/components/core/coreHandlersRegistry.ts index 508c987d0..7ddeed641 100644 --- a/src/components/core/coreHandlersRegistry.ts +++ b/src/components/core/coreHandlersRegistry.ts @@ -21,6 +21,7 @@ import { QueryHandler } from './queryHandler.js' import { StatusHandler } from './statusHandler.js' import { ReindexHandler } from './reindexHandler.js' import { OceanNode } from '../../OceanNode.js' +import { GetEnvironmentsHandler } from './compute.js' export type HandlerRegistry = { handlerName: string // name of the handler @@ -71,6 +72,10 @@ export class CoreHandlersRegistry { this.registerCoreHandler(PROTOCOL_COMMANDS.REINDEX, new ReindexHandler(node)) this.registerCoreHandler(PROTOCOL_COMMANDS.FILE_INFO, new FileInfoHandler(node)) this.registerCoreHandler(PROTOCOL_COMMANDS.VALIDATE_DDO, new ValidateDDOHandler(node)) + this.registerCoreHandler( + PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS, + new GetEnvironmentsHandler(node) + ) } public static getInstance(node: OceanNode): CoreHandlersRegistry { diff --git a/src/components/httpRoutes/compute.ts b/src/components/httpRoutes/compute.ts new file mode 100644 index 000000000..3e3e41c49 --- /dev/null +++ b/src/components/httpRoutes/compute.ts @@ -0,0 +1,46 @@ +import express from 'express' +import { GetEnvironmentsHandler } from '../core/compute.js' +import { streamToObject } from '../../utils/util.js' +import { PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { Readable } from 'stream' +import { HTTP_LOGGER } from '../../utils/logging/common.js' +import { LOG_LEVELS_STR } from '../../utils/logging/Logger.js' + +export const computeRoutes = express.Router() + +computeRoutes.get('/api/services/computeEnvironments', async (req, res) => { + try { + HTTP_LOGGER.logMessage( + `GET computeEnvironments request received with query: ${JSON.stringify(req.query)}`, + true + ) + const chainId = parseInt(req.query.chainId as string) + + if (isNaN(chainId) || chainId < 1) { + HTTP_LOGGER.logMessage( + `Invalid chainId: ${chainId} on GET computeEnvironments request`, + true + ) + return res.status(400).send('Invalid chainId') + } + + const getEnvironmentsTask = { + command: PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS, + chainId, + node: req.query.node as string + } + const response = await new GetEnvironmentsHandler().handle(getEnvironmentsTask) // get compute environments + const computeEnvironments = await streamToObject(response.stream as Readable) + + // check if computeEnvironments is a valid json object and not empty + if (computeEnvironments && computeEnvironments.length > 0) { + res.json(computeEnvironments) + } else { + HTTP_LOGGER.logMessage(`Compute environments not found`, true) + res.status(404).send('Compute environments not found') + } + } catch (error) { + HTTP_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error: ${error}`) + res.status(500).send('Internal Server Error') + } +}) diff --git a/src/components/httpRoutes/index.ts b/src/components/httpRoutes/index.ts index f8099620d..23fc9f701 100644 --- a/src/components/httpRoutes/index.ts +++ b/src/components/httpRoutes/index.ts @@ -8,6 +8,7 @@ import { aquariusRoutes } from './aquarius.js' import { rootEndpointRoutes } from './rootEndpoint.js' import { downloadRoute } from './download.js' import { fileInfoRoute } from './fileInfo.js' +import { computeRoutes } from './compute.js' export * from './getOceanPeers.js' export const httpRoutes = express.Router() @@ -25,3 +26,4 @@ httpRoutes.use(fileInfoRoute) httpRoutes.use('/api/services/', providerRoutes) httpRoutes.use('/api/aquarius/', aquariusRoutes) httpRoutes.use(rootEndpointRoutes) +httpRoutes.use(computeRoutes) diff --git a/src/components/httpRoutes/validateCommands.ts b/src/components/httpRoutes/validateCommands.ts index c76963e99..10c46db03 100644 --- a/src/components/httpRoutes/validateCommands.ts +++ b/src/components/httpRoutes/validateCommands.ts @@ -112,6 +112,10 @@ export function validateCommandAPIParameters(requestBody: any): ValidateParams { 'Missing required parameter(s): "fileIndex","documentId", "serviceId","transferTxId", "nonce","consumerAddress", "signature"' ) } + } else if (command === PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS) { + if (!requestBody.chainId) { + return buildInvalidRequestMessage('Missing required parameter: "chainId"') + } } // only once is enough :-) return { diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts new file mode 100644 index 000000000..53e4c28d4 --- /dev/null +++ b/src/test/integration/compute.test.ts @@ -0,0 +1,61 @@ +import { expect, assert } from 'chai' +import { GetEnvironmentsHandler } from '../../components/core/compute.js' +import { getConfiguration } from '../../utils/config.js' +import { Database } from '../../components/database/index.js' +import { OceanNode } from '../../OceanNode.js' +import { PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { OceanNodeConfig } from '../../@types/OceanNode.js' +import { Readable } from 'stream' +import { streamToObject } from '../../utils/util.js' + +describe('Compute', () => { + let config: OceanNodeConfig + let dbconn: Database + let oceanNode: OceanNode + + before(async () => { + config = await getConfiguration(true) // Force reload the configuration + dbconn = await new Database(config.dbConfig) + oceanNode = await OceanNode.getInstance(dbconn) + }) + + it('Sets up compute envs', async () => { + assert(oceanNode, 'Failed to instantiate OceanNode') + assert(config.c2dClusters, 'Failed to get c2dClusters') + }) + + it('Get compute environments', async () => { + const getEnvironmentsTask = { + command: PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS, + chainId: 8996 + } + const response = await new GetEnvironmentsHandler(oceanNode).handle( + getEnvironmentsTask + ) + + assert(response, 'Failed to get response') + assert(response.status.httpStatus === 200, 'Failed to get 200 response') + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + + const computeEnvironments = await streamToObject(response.stream as Readable) + + for (const computeEnvironment of computeEnvironments) { + assert(computeEnvironment.id, 'id missing in computeEnvironments') + assert( + computeEnvironment.consumerAddress, + 'consumerAddress missing in computeEnvironments' + ) + assert(computeEnvironment.lastSeen, 'lastSeen missing in computeEnvironments') + assert(computeEnvironment.id.startsWith('0x'), 'id should start with 0x') + assert(computeEnvironment.cpuNumber > 0, 'cpuNumber missing in computeEnvironments') + assert(computeEnvironment.ramGB > 0, 'ramGB missing in computeEnvironments') + assert(computeEnvironment.diskGB > 0, 'diskGB missing in computeEnvironments') + assert(computeEnvironment.maxJobs > 0, 'maxJobs missing in computeEnvironments') + assert( + computeEnvironment.maxJobDuration > 0, + 'maxJobDuration missing in computeEnvironments' + ) + } + }) +}) diff --git a/src/utils/config.ts b/src/utils/config.ts index 9ae867cf1..880c1b234 100644 --- a/src/utils/config.ts +++ b/src/utils/config.ts @@ -233,8 +233,8 @@ function getC2DClusterEnvironment(): C2DClusterInfo[] { GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - return clusters } + return clusters } /** diff --git a/src/utils/constants.ts b/src/utils/constants.ts index b48db63e0..1fb50ebe3 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -18,7 +18,8 @@ export const PROTOCOL_COMMANDS = { FIND_DDO: 'findDDO', GET_FEES: 'getFees', FILE_INFO: 'fileInfo', - VALIDATE_DDO: 'validateDDO' + VALIDATE_DDO: 'validateDDO', + GET_COMPUTE_ENVIRONMENTS: 'getComputeEnvironments' } // more visible, keep then close to make sure we always update both export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ @@ -34,7 +35,8 @@ export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ PROTOCOL_COMMANDS.FIND_DDO, PROTOCOL_COMMANDS.GET_FEES, PROTOCOL_COMMANDS.FILE_INFO, - PROTOCOL_COMMANDS.VALIDATE_DDO + PROTOCOL_COMMANDS.VALIDATE_DDO, + PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS ] export interface Command { @@ -74,6 +76,9 @@ export interface DDOCommand extends Command { } export interface GetDdoCommand extends DDOCommand {} export interface FindDDOCommand extends DDOCommand {} +export interface GetEnvironments extends Command { + chainId: number +} export interface ValidateDDOCommand extends DDOCommand { chainId: number nftAddress: string