Skip to content

Commit

Permalink
Merge pull request #238 from oceanprotocol/issue-227-get-environments
Browse files Browse the repository at this point in the history
Issue 227 get environments
  • Loading branch information
jamiehewitt15 authored Jan 29, 2024
2 parents 4951701 + 96ba202 commit 73afbff
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 3 deletions.
55 changes: 55 additions & 0 deletions src/components/core/compute.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { Readable } from 'stream'
import { P2PCommandResponse } from '../../@types'
import { CORE_LOGGER } from '../../utils/logging/common.js'
import { Handler } from './handler.js'
import { GetEnvironments } from '../../utils/constants.js'
import { getConfiguration } from '../../utils/config.js'
import axios from 'axios'

export class GetEnvironmentsHandler extends Handler {
async handle(task: GetEnvironments): Promise<P2PCommandResponse> {
try {
CORE_LOGGER.logMessage(
'File Info Request recieved with arguments: ' + JSON.stringify(task, null, 2),
true
)
const response: any[] = []
const config = await getConfiguration()
const { c2dClusters } = config
for (const cluster of c2dClusters) {
CORE_LOGGER.logMessage(
`Requesting environment from Operator URL: ${cluster.url}`,
true
)
const url = `${cluster.url}api/v1/operator/environments?chain_id=${task.chainId}`
const { data } = await axios.get(url)
const { hash } = cluster
for (const item of data) {
item.id = hash + '-' + item.id
}
response.push(...data)
}

CORE_LOGGER.logMessage(
'File Info Response: ' + JSON.stringify(response, null, 2),
true
)

return {
stream: Readable.from(JSON.stringify(response)),
status: {
httpStatus: 200
}
}
} catch (error) {
CORE_LOGGER.error(error.message)
return {
stream: null,
status: {
httpStatus: 500,
error: error.message
}
}
}
}
}
5 changes: 5 additions & 0 deletions src/components/core/coreHandlersRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { QueryHandler } from './queryHandler.js'
import { StatusHandler } from './statusHandler.js'
import { ReindexHandler } from './reindexHandler.js'
import { OceanNode } from '../../OceanNode.js'
import { GetEnvironmentsHandler } from './compute.js'

export type HandlerRegistry = {
handlerName: string // name of the handler
Expand Down Expand Up @@ -71,6 +72,10 @@ export class CoreHandlersRegistry {
this.registerCoreHandler(PROTOCOL_COMMANDS.REINDEX, new ReindexHandler(node))
this.registerCoreHandler(PROTOCOL_COMMANDS.FILE_INFO, new FileInfoHandler(node))
this.registerCoreHandler(PROTOCOL_COMMANDS.VALIDATE_DDO, new ValidateDDOHandler(node))
this.registerCoreHandler(
PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS,
new GetEnvironmentsHandler(node)
)
}

public static getInstance(node: OceanNode): CoreHandlersRegistry {
Expand Down
46 changes: 46 additions & 0 deletions src/components/httpRoutes/compute.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import express from 'express'
import { GetEnvironmentsHandler } from '../core/compute.js'
import { streamToObject } from '../../utils/util.js'
import { PROTOCOL_COMMANDS } from '../../utils/constants.js'
import { Readable } from 'stream'
import { HTTP_LOGGER } from '../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../utils/logging/Logger.js'

export const computeRoutes = express.Router()

computeRoutes.get('/api/services/computeEnvironments', async (req, res) => {
try {
HTTP_LOGGER.logMessage(
`GET computeEnvironments request received with query: ${JSON.stringify(req.query)}`,
true
)
const chainId = parseInt(req.query.chainId as string)

if (isNaN(chainId) || chainId < 1) {
HTTP_LOGGER.logMessage(
`Invalid chainId: ${chainId} on GET computeEnvironments request`,
true
)
return res.status(400).send('Invalid chainId')
}

const getEnvironmentsTask = {
command: PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS,
chainId,
node: req.query.node as string
}
const response = await new GetEnvironmentsHandler().handle(getEnvironmentsTask) // get compute environments
const computeEnvironments = await streamToObject(response.stream as Readable)

// check if computeEnvironments is a valid json object and not empty
if (computeEnvironments && computeEnvironments.length > 0) {
res.json(computeEnvironments)
} else {
HTTP_LOGGER.logMessage(`Compute environments not found`, true)
res.status(404).send('Compute environments not found')
}
} catch (error) {
HTTP_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error: ${error}`)
res.status(500).send('Internal Server Error')
}
})
2 changes: 2 additions & 0 deletions src/components/httpRoutes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { aquariusRoutes } from './aquarius.js'
import { rootEndpointRoutes } from './rootEndpoint.js'
import { downloadRoute } from './download.js'
import { fileInfoRoute } from './fileInfo.js'
import { computeRoutes } from './compute.js'
export * from './getOceanPeers.js'

export const httpRoutes = express.Router()
Expand All @@ -25,3 +26,4 @@ httpRoutes.use(fileInfoRoute)
httpRoutes.use('/api/services/', providerRoutes)
httpRoutes.use('/api/aquarius/', aquariusRoutes)
httpRoutes.use(rootEndpointRoutes)
httpRoutes.use(computeRoutes)
4 changes: 4 additions & 0 deletions src/components/httpRoutes/validateCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ export function validateCommandAPIParameters(requestBody: any): ValidateParams {
'Missing required parameter(s): "fileIndex","documentId", "serviceId","transferTxId", "nonce","consumerAddress", "signature"'
)
}
} else if (command === PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS) {
if (!requestBody.chainId) {
return buildInvalidRequestMessage('Missing required parameter: "chainId"')
}
}
// only once is enough :-)
return {
Expand Down
61 changes: 61 additions & 0 deletions src/test/integration/compute.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { expect, assert } from 'chai'
import { GetEnvironmentsHandler } from '../../components/core/compute.js'
import { getConfiguration } from '../../utils/config.js'
import { Database } from '../../components/database/index.js'
import { OceanNode } from '../../OceanNode.js'
import { PROTOCOL_COMMANDS } from '../../utils/constants.js'
import { OceanNodeConfig } from '../../@types/OceanNode.js'
import { Readable } from 'stream'
import { streamToObject } from '../../utils/util.js'

describe('Compute', () => {
let config: OceanNodeConfig
let dbconn: Database
let oceanNode: OceanNode

before(async () => {
config = await getConfiguration(true) // Force reload the configuration
dbconn = await new Database(config.dbConfig)
oceanNode = await OceanNode.getInstance(dbconn)
})

it('Sets up compute envs', async () => {
assert(oceanNode, 'Failed to instantiate OceanNode')
assert(config.c2dClusters, 'Failed to get c2dClusters')
})

it('Get compute environments', async () => {
const getEnvironmentsTask = {
command: PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS,
chainId: 8996
}
const response = await new GetEnvironmentsHandler(oceanNode).handle(
getEnvironmentsTask
)

assert(response, 'Failed to get response')
assert(response.status.httpStatus === 200, 'Failed to get 200 response')
assert(response.stream, 'Failed to get stream')
expect(response.stream).to.be.instanceOf(Readable)

const computeEnvironments = await streamToObject(response.stream as Readable)

for (const computeEnvironment of computeEnvironments) {
assert(computeEnvironment.id, 'id missing in computeEnvironments')
assert(
computeEnvironment.consumerAddress,
'consumerAddress missing in computeEnvironments'
)
assert(computeEnvironment.lastSeen, 'lastSeen missing in computeEnvironments')
assert(computeEnvironment.id.startsWith('0x'), 'id should start with 0x')
assert(computeEnvironment.cpuNumber > 0, 'cpuNumber missing in computeEnvironments')
assert(computeEnvironment.ramGB > 0, 'ramGB missing in computeEnvironments')
assert(computeEnvironment.diskGB > 0, 'diskGB missing in computeEnvironments')
assert(computeEnvironment.maxJobs > 0, 'maxJobs missing in computeEnvironments')
assert(
computeEnvironment.maxJobDuration > 0,
'maxJobDuration missing in computeEnvironments'
)
}
})
})
2 changes: 1 addition & 1 deletion src/utils/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ function getC2DClusterEnvironment(): C2DClusterInfo[] {
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return clusters
}
return clusters
}

/**
Expand Down
9 changes: 7 additions & 2 deletions src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ export const PROTOCOL_COMMANDS = {
FIND_DDO: 'findDDO',
GET_FEES: 'getFees',
FILE_INFO: 'fileInfo',
VALIDATE_DDO: 'validateDDO'
VALIDATE_DDO: 'validateDDO',
GET_COMPUTE_ENVIRONMENTS: 'getComputeEnvironments'
}
// more visible, keep then close to make sure we always update both
export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [
Expand All @@ -34,7 +35,8 @@ export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [
PROTOCOL_COMMANDS.FIND_DDO,
PROTOCOL_COMMANDS.GET_FEES,
PROTOCOL_COMMANDS.FILE_INFO,
PROTOCOL_COMMANDS.VALIDATE_DDO
PROTOCOL_COMMANDS.VALIDATE_DDO,
PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS
]

export interface Command {
Expand Down Expand Up @@ -74,6 +76,9 @@ export interface DDOCommand extends Command {
}
export interface GetDdoCommand extends DDOCommand {}
export interface FindDDOCommand extends DDOCommand {}
export interface GetEnvironments extends Command {
chainId: number
}
export interface ValidateDDOCommand extends DDOCommand {
chainId: number
nftAddress: string
Expand Down

0 comments on commit 73afbff

Please sign in to comment.