
Commit

Merge branch 'develop' into issue-optimize-cpu
paulo-ocean committed Jul 31, 2024
2 parents 97403eb + 2595e0d commit 3561394
Showing 6 changed files with 177 additions and 108 deletions.
5 changes: 5 additions & 0 deletions src/@types/DDO/Metadata.ts
@@ -18,6 +18,11 @@ export interface MetadataAlgorithm {
* @type {string}
*/
rawcode?: string
/**
* Format of the algorithm
* @type {string}
*/
format?: string

/**
* Object describing the Docker container image.
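A minimal sketch of how the new optional `format` field can be set alongside the existing `MetadataAlgorithm` fields; the concrete values (language, rawcode, image, tag, checksum, and the `format` string itself) are illustrative assumptions, not taken from this diff:

```typescript
// Hypothetical algorithm metadata using the newly added optional `format` field.
const algoMetadata: MetadataAlgorithm = {
  language: 'python',
  version: '0.1.0',
  rawcode: 'print("hello world")',
  format: 'docker-image', // new optional field introduced by this change
  container: {
    entrypoint: 'python $ALGO',
    image: 'my-org/my-algo',
    tag: 'latest',
    checksum: 'sha256:...'
  }
}
```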
2 changes: 1 addition & 1 deletion src/components/Indexer/utils.ts
@@ -181,7 +181,7 @@ export const processChunkLogs = async (
if (!allowed.length) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Metadata Proof validator not allowed`,
`Metadata Proof validators list is empty`,
true
)
continue
54 changes: 39 additions & 15 deletions src/components/P2P/handleProtocolCommands.ts
@@ -38,34 +38,58 @@ export async function handleProtocolCommands(connection: any) {
let statusStream
let sendStream = null

const buildWrongCommandStatus = function (errorCode: number, message: string) {
status = {
httpStatus: errorCode,
error: message
}
return status
}

const denyList = await (await getConfiguration()).denyList
if (denyList.peers.length > 0) {
if (denyList.peers.includes(remotePeer.toString())) {
P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`)
status = {
httpStatus: 403,
error: 'Unauthorized request'
}
statusStream = new ReadableString(JSON.stringify(status))

statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request'))
)
pipe(statusStream, connection.stream.sink)
return
}
}

/* eslint no-unreachable-loop: ["error", { "ignore": ["ForInStatement", "ForOfStatement"] }] */
for await (const chunk of connection.stream.source) {
try {
const str = uint8ArrayToString(chunk.subarray())
task = JSON.parse(str) as Command
} catch (e) {
status = { httpStatus: 400, error: 'Invalid command' }
statusStream = new ReadableString(JSON.stringify(status))
try {
// eslint-disable-next-line no-unreachable-loop
for await (const chunk of connection.stream.source) {
try {
const str = uint8ArrayToString(chunk.subarray())
task = JSON.parse(str) as Command
} catch (e) {
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
return
}
}
if (!task) {
P2P_LOGGER.error('Invalid or missing task/command data!')
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
return
}
break
} catch (err) {
P2P_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Unable to process P2P command: ${err.message}`
)
return
}
P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true)

P2P_LOGGER.logMessage('Performing P2P task: ' + JSON.stringify(task), true)
// we get the handler from the running instance
// no need to create a new instance of Handler on every request
const handler: Handler = this.getCoreHandlers().getHandler(task.command)
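For readability, a self-contained sketch of the pattern this refactor converges on: a small `buildWrongCommandStatus` factory replaces the repeated inline `status = { ... }` assignments, and its JSON-serialized result is what gets piped back to the remote peer. The types and usage below are simplified stand-ins, not the module's actual exports:

```typescript
// Simplified shape of the status object sent back to a misbehaving peer.
interface CommandStatus {
  httpStatus: number
  error?: string
}

// Factory used instead of building the error status inline at every call site.
function buildWrongCommandStatus(errorCode: number, message: string): CommandStatus {
  return { httpStatus: errorCode, error: message }
}

// Example: a denied peer gets a 403, an unparseable command a 400.
const denied = buildWrongCommandStatus(403, 'Unauthorized request')
const invalid = buildWrongCommandStatus(400, 'Invalid command')
console.log(JSON.stringify(denied), JSON.stringify(invalid))
```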
15 changes: 14 additions & 1 deletion src/components/core/compute/startCompute.ts
@@ -69,6 +69,7 @@ export class ComputeStartHandler extends Handler {
const node = this.getOceanNode()
const assets: ComputeAsset[] = [task.dataset]
if (task.additionalDatasets) assets.push(...task.additionalDatasets)
const { algorithm } = task
let foundValidCompute = null

const algoChecksums = await getAlgoChecksums(
@@ -245,6 +246,18 @@
validUntil: validFee.validUntil
}
}
if (!('meta' in algorithm) && ddo.metadata.type === 'algorithm') {
const { entrypoint, image, tag, checksum } = ddo.metadata.algorithm.container
const container = { entrypoint, image, tag, checksum }
algorithm.meta = {
language: ddo.metadata.algorithm.language,
version: ddo.metadata.algorithm.version,
container: container
}
if ('format' in ddo.metadata.algorithm) {
algorithm.meta.format = ddo.metadata.algorithm.format
}
}
}
}
if (!foundValidCompute) {
@@ -265,7 +278,7 @@

const response = await engine.startComputeJob(
assets,
task.algorithm,
algorithm,
task.output,
task.consumerAddress,
envId,
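A standalone sketch of the backfill this hunk adds: when the incoming compute request carries no `algorithm.meta`, the handler reconstructs it from the algorithm DDO's `metadata.algorithm` block, copying the container details and, if present, the new `format` field. The types below are simplified stand-ins for the node's real `ComputeAlgorithm` and DDO types:

```typescript
// Simplified stand-in types; the real ones live in the node's @types definitions.
interface AlgoContainer {
  entrypoint: string
  image: string
  tag: string
  checksum: string
}

interface AlgoMetadata {
  language?: string
  version?: string
  format?: string
  container: AlgoContainer
}

// Backfill algorithm.meta from the DDO when the request omitted it.
function backfillAlgorithmMeta(algorithm: any, ddoAlgo: AlgoMetadata): void {
  if ('meta' in algorithm) return // request already carries its own metadata
  const { entrypoint, image, tag, checksum } = ddoAlgo.container
  algorithm.meta = {
    language: ddoAlgo.language,
    version: ddoAlgo.version,
    container: { entrypoint, image, tag, checksum }
  }
  if (ddoAlgo.format) {
    algorithm.meta.format = ddoAlgo.format
  }
}
```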
207 changes: 117 additions & 90 deletions src/components/core/utils/statusHandler.ts
@@ -4,34 +4,20 @@ import {
OceanNodeStatus,
OceanNodeProvider,
OceanNodeIndexer,
StorageTypes
StorageTypes,
OceanNodeConfig
} from '../../../@types/OceanNode.js'
import { existsEnvironmentVariable, getConfiguration } from '../../../utils/index.js'
import { ENVIRONMENT_VARIABLES } from '../../../utils/constants.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'
import { OceanNode } from '../../../OceanNode.js'
import { isAddress } from 'ethers'
import { schemas } from '../../database/schemas.js'
import { SupportedNetwork } from '../../../@types/blockchain.js'

export async function status(
oceanNode: OceanNode,
nodeId?: string,
detailed: boolean = false
): Promise<OceanNodeStatus> {
CORE_LOGGER.logMessage('Command status started execution...', true)
if (!oceanNode) {
CORE_LOGGER.logMessageWithEmoji(
'Node object not found. Cannot proceed with status command.',
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return
}
const config = await getConfiguration()

function getAdminAddresses(config: OceanNodeConfig) {
const validAddresses = []
if (config.allowedAdmins) {
if (config.allowedAdmins && config.allowedAdmins.length > 0) {
for (const admin of config.allowedAdmins) {
if (isAddress(admin) === true) {
validAddresses.push(admin)
@@ -44,88 +30,129 @@ export async function status(
)
}
}
const status: OceanNodeStatus = {
id: undefined,
publicKey: undefined,
address: undefined,
version: undefined,
http: undefined,
p2p: undefined,
return validAddresses
}
const supportedStorageTypes: StorageTypes = {
url: true,
arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY),
ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY)
}

// platform information
const platformInfo = {
cpus: os.cpus().length,
freemem: os.freemem(),
totalmem: os.totalmem(),
loadavg: os.loadavg(),
arch: os.arch(),
machine: os.machine(),
platform: os.platform(),
osType: os.type(),
node: process.version
}

async function getIndexerAndProviderInfo(
oceanNode: OceanNode,
config: OceanNodeConfig
): Promise<any> {
const nodeStatus: any = {
provider: [],
indexer: [],
supportedStorage: undefined,
uptime: process.uptime(),
platform: {
cpus: os.cpus().length,
freemem: os.freemem(),
totalmem: os.totalmem(),
loadavg: os.loadavg(),
arch: os.arch(),
machine: os.machine(),
platform: os.platform(),
osType: os.type(),
node: process.version
},
codeHash: config.codeHash,
allowedAdmins: validAddresses
indexer: []
}
for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) {
if (config.hasProvider) {
const provider: OceanNodeProvider = {
chainId: key,
network: supportedNetwork.network
}
nodeStatus.provider.push(provider)
}
if (config.hasIndexer) {
const blockNr = await getIndexerBlockInfo(oceanNode, supportedNetwork)
const indexer: OceanNodeIndexer = {
chainId: key,
network: supportedNetwork.network,
block: blockNr
}
nodeStatus.indexer.push(indexer)
}
}
return nodeStatus
}

if (nodeId && nodeId !== undefined) {
status.id = nodeId
} else {
// get current node ID
status.id = config.keys.peerId.toString()
async function getIndexerBlockInfo(
oceanNode: OceanNode,
supportedNetwork: SupportedNetwork
): Promise<string> {
let blockNr = '0'
try {
const { indexer: indexerDatabase } = oceanNode.getDatabase()
const { lastIndexedBlock } = await indexerDatabase.retrieve(supportedNetwork.chainId)
blockNr = lastIndexedBlock.toString()
} catch (error) {
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error fetching last indexed block for network ${supportedNetwork.network}`
)
}
return blockNr
}

const supportedStorageTypes: StorageTypes = {
url: true,
arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY),
ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY)
let nodeStatus: OceanNodeStatus = null

export async function status(
oceanNode: OceanNode,
nodeId?: string,
detailed: boolean = false
): Promise<OceanNodeStatus> {
CORE_LOGGER.logMessage('Command status started execution...', true)
if (!oceanNode) {
CORE_LOGGER.logMessageWithEmoji(
'Node object not found. Cannot proceed with status command.',
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return
}
const config = await getConfiguration()

status.version = process.env.npm_package_version
status.publicKey = Buffer.from(config.keys.publicKey).toString('hex')
status.address = config.keys.ethAddress
status.http = config.hasHttp
status.p2p = config.hasP2P
status.supportedStorage = supportedStorageTypes
// no previous status?
if (!nodeStatus) {
nodeStatus = {
id: nodeId && nodeId !== undefined ? nodeId : config.keys.peerId.toString(), // get current node ID
publicKey: Buffer.from(config.keys.publicKey).toString('hex'),
address: config.keys.ethAddress,
version: process.env.npm_package_version,
http: config.hasHttp,
p2p: config.hasP2P,
provider: [],
indexer: [],
supportedStorage: supportedStorageTypes,
// uptime: process.uptime(),
platform: platformInfo,
codeHash: config.codeHash,
allowedAdmins: getAdminAddresses(config)
}
}

// need to update at least block info if available
if (config.supportedNetworks) {
for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) {
if (config.hasProvider) {
const provider: OceanNodeProvider = {
chainId: key,
network: supportedNetwork.network
}
status.provider.push(provider)
}
if (config.hasIndexer) {
let blockNr = '0'
try {
const { indexer: indexerDatabase } = oceanNode.getDatabase()
const { lastIndexedBlock } = await indexerDatabase.retrieve(
supportedNetwork.chainId
)
blockNr = lastIndexedBlock.toString()
} catch (error) {
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error fetching last indexed block for network ${supportedNetwork.network}`
)
}
const indexer: OceanNodeIndexer = {
chainId: key,
network: supportedNetwork.network,
block: blockNr
}
status.indexer.push(indexer)
}
}
const indexerAndProvider = await getIndexerAndProviderInfo(oceanNode, config)
nodeStatus.provider = indexerAndProvider.provider
nodeStatus.indexer = indexerAndProvider.indexer
}

// only these 2 might change between requests
nodeStatus.platform.freemem = os.freemem()
nodeStatus.platform.loadavg = os.loadavg()
nodeStatus.uptime = process.uptime()

// depends on request
if (detailed) {
status.c2dClusters = config.c2dClusters
status.supportedSchemas = schemas.ddoSchemas
nodeStatus.c2dClusters = config.c2dClusters
nodeStatus.supportedSchemas = schemas.ddoSchemas
}
return status

return nodeStatus
}
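The CPU optimization this commit targets is visible in the restructured `status` handler: the expensive-to-build `nodeStatus` object is now created once and cached at module scope, and only the cheap, request-dependent values (`freemem`, `loadavg`, `uptime`, plus provider/indexer block info) are refreshed on each call. A minimal sketch of that caching pattern, with the node-specific details stubbed out and not representing the module's actual exports:

```typescript
import os from 'os'

// Minimal illustration of the caching pattern used by the refactored handler;
// the real handler also refreshes provider/indexer info and honours the `detailed` flag.
let cachedStatus: any = null

export function getStatus(nodeId: string) {
  if (!cachedStatus) {
    // Built once: fields that do not change between requests.
    cachedStatus = {
      id: nodeId,
      version: process.env.npm_package_version,
      platform: {
        cpus: os.cpus().length,
        arch: os.arch(),
        totalmem: os.totalmem(),
        freemem: os.freemem(),
        loadavg: os.loadavg()
      }
    }
  }
  // Only the cheap, volatile values are recomputed on every request.
  cachedStatus.platform.freemem = os.freemem()
  cachedStatus.platform.loadavg = os.loadavg()
  cachedStatus.uptime = process.uptime()
  return cachedStatus
}
```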
2 changes: 1 addition & 1 deletion src/test/integration/logs.test.ts
@@ -85,7 +85,7 @@ describe('LogDatabase CRUD', () => {
const endTime = new Date() // current time

// Retrieve the latest log entries
let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 100)
let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 200)
logs = logs.filter((log) => log.message === newLogEntry.message)

expect(logs?.length).to.equal(1)
