Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 565 optimize get status #566

Merged
merged 10 commits into from
Jul 31, 2024
34 changes: 26 additions & 8 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ export async function handleProtocolCommands(connection: any) {
let statusStream
let sendStream = null

const buildWrongCommandStatus = function (errorCode: number, message: string) {
status = {
httpStatus: errorCode,
error: message
}
return status
}

const denyList = await (await getConfiguration()).denyList
if (denyList.peers.length > 0) {
if (denyList.peers.includes(remotePeer.toString())) {
P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`)
status = {
httpStatus: 403,
error: 'Unauthorized request'
}
statusStream = new ReadableString(JSON.stringify(status))

statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request'))
)
pipe(statusStream, connection.stream.sink)
return
}
Expand All @@ -58,14 +65,25 @@ export async function handleProtocolCommands(connection: any) {
const str = uint8ArrayToString(chunk.subarray())
task = JSON.parse(str) as Command
} catch (e) {
status = { httpStatus: 400, error: 'Invalid command' }
statusStream = new ReadableString(JSON.stringify(status))
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
return
}
break
}
P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true)
if (!task) {
P2P_LOGGER.error('Invalid or missing task/command data!')
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
return
} else {
P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true)
}

// we get the handler from the running instance
// no need to create a new instance of Handler on every request
const handler: Handler = this.getCoreHandlers().getHandler(task.command)
Expand Down
207 changes: 117 additions & 90 deletions src/components/core/utils/statusHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,20 @@ import {
OceanNodeStatus,
OceanNodeProvider,
OceanNodeIndexer,
StorageTypes
StorageTypes,
OceanNodeConfig
} from '../../../@types/OceanNode.js'
import { existsEnvironmentVariable, getConfiguration } from '../../../utils/index.js'
import { ENVIRONMENT_VARIABLES } from '../../../utils/constants.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'
import { OceanNode } from '../../../OceanNode.js'
import { isAddress } from 'ethers'
import { schemas } from '../../database/schemas.js'
import { SupportedNetwork } from '../../../@types/blockchain.js'

export async function status(
oceanNode: OceanNode,
nodeId?: string,
detailed: boolean = false
): Promise<OceanNodeStatus> {
CORE_LOGGER.logMessage('Command status started execution...', true)
if (!oceanNode) {
CORE_LOGGER.logMessageWithEmoji(
'Node object not found. Cannot proceed with status command.',
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return
}
const config = await getConfiguration()

function getAdminAddresses(config: OceanNodeConfig) {
const validAddresses = []
if (config.allowedAdmins) {
if (config.allowedAdmins && config.allowedAdmins.length > 0) {
for (const admin of config.allowedAdmins) {
if (isAddress(admin) === true) {
validAddresses.push(admin)
Expand All @@ -44,88 +30,129 @@ export async function status(
)
}
}
const status: OceanNodeStatus = {
id: undefined,
publicKey: undefined,
address: undefined,
version: undefined,
http: undefined,
p2p: undefined,
return validAddresses
}
const supportedStorageTypes: StorageTypes = {
url: true,
arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY),
ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY)
}

// platform information
const platformInfo = {
cpus: os.cpus().length,
freemem: os.freemem(),
totalmem: os.totalmem(),
loadavg: os.loadavg(),
arch: os.arch(),
machine: os.machine(),
platform: os.platform(),
osType: os.type(),
node: process.version
}

async function getIndexerAndProviderInfo(
oceanNode: OceanNode,
config: OceanNodeConfig
): Promise<any> {
const nodeStatus: any = {
provider: [],
indexer: [],
supportedStorage: undefined,
uptime: process.uptime(),
platform: {
cpus: os.cpus().length,
freemem: os.freemem(),
totalmem: os.totalmem(),
loadavg: os.loadavg(),
arch: os.arch(),
machine: os.machine(),
platform: os.platform(),
osType: os.type(),
node: process.version
},
codeHash: config.codeHash,
allowedAdmins: validAddresses
indexer: []
}
for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) {
if (config.hasProvider) {
const provider: OceanNodeProvider = {
chainId: key,
network: supportedNetwork.network
}
nodeStatus.provider.push(provider)
}
if (config.hasIndexer) {
const blockNr = await getIndexerBlockInfo(oceanNode, supportedNetwork)
const indexer: OceanNodeIndexer = {
chainId: key,
network: supportedNetwork.network,
block: blockNr
}
nodeStatus.indexer.push(indexer)
}
}
return nodeStatus
}

if (nodeId && nodeId !== undefined) {
status.id = nodeId
} else {
// get current node ID
status.id = config.keys.peerId.toString()
async function getIndexerBlockInfo(
oceanNode: OceanNode,
supportedNetwork: SupportedNetwork
): Promise<string> {
let blockNr = '0'
try {
const { indexer: indexerDatabase } = oceanNode.getDatabase()
const { lastIndexedBlock } = await indexerDatabase.retrieve(supportedNetwork.chainId)
blockNr = lastIndexedBlock.toString()
} catch (error) {
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error fetching last indexed block for network ${supportedNetwork.network}`
)
}
return blockNr
}

const supportedStorageTypes: StorageTypes = {
url: true,
arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY),
ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY)
let nodeStatus: OceanNodeStatus = null

export async function status(
oceanNode: OceanNode,
nodeId?: string,
detailed: boolean = false
): Promise<OceanNodeStatus> {
CORE_LOGGER.logMessage('Command status started execution...', true)
if (!oceanNode) {
CORE_LOGGER.logMessageWithEmoji(
'Node object not found. Cannot proceed with status command.',
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return
}
const config = await getConfiguration()

status.version = process.env.npm_package_version
status.publicKey = Buffer.from(config.keys.publicKey).toString('hex')
status.address = config.keys.ethAddress
status.http = config.hasHttp
status.p2p = config.hasP2P
status.supportedStorage = supportedStorageTypes
// no previous status?
if (!nodeStatus) {
nodeStatus = {
id: nodeId && nodeId !== undefined ? nodeId : config.keys.peerId.toString(), // get current node ID
publicKey: Buffer.from(config.keys.publicKey).toString('hex'),
address: config.keys.ethAddress,
version: process.env.npm_package_version,
http: config.hasHttp,
p2p: config.hasP2P,
provider: [],
indexer: [],
supportedStorage: supportedStorageTypes,
// uptime: process.uptime(),
platform: platformInfo,
codeHash: config.codeHash,
allowedAdmins: getAdminAddresses(config)
}
}

// need to update at least block info if available
if (config.supportedNetworks) {
for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) {
if (config.hasProvider) {
const provider: OceanNodeProvider = {
chainId: key,
network: supportedNetwork.network
}
status.provider.push(provider)
}
if (config.hasIndexer) {
let blockNr = '0'
try {
const { indexer: indexerDatabase } = oceanNode.getDatabase()
const { lastIndexedBlock } = await indexerDatabase.retrieve(
supportedNetwork.chainId
)
blockNr = lastIndexedBlock.toString()
} catch (error) {
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error fetching last indexed block for network ${supportedNetwork.network}`
)
}
const indexer: OceanNodeIndexer = {
chainId: key,
network: supportedNetwork.network,
block: blockNr
}
status.indexer.push(indexer)
}
}
const indexerAndProvider = await getIndexerAndProviderInfo(oceanNode, config)
nodeStatus.provider = indexerAndProvider.provider
nodeStatus.indexer = indexerAndProvider.indexer
}

// only these 2 might change between requests
nodeStatus.platform.freemem = os.freemem()
nodeStatus.platform.loadavg = os.loadavg()
nodeStatus.uptime = process.uptime()

// depends on request
if (detailed) {
status.c2dClusters = config.c2dClusters
status.supportedSchemas = schemas.ddoSchemas
nodeStatus.c2dClusters = config.c2dClusters
nodeStatus.supportedSchemas = schemas.ddoSchemas
}
return status

return nodeStatus
}
2 changes: 1 addition & 1 deletion src/test/integration/logs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ describe('LogDatabase CRUD', () => {
const endTime = new Date() // current time

// Retrieve the latest log entries
let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 100)
let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 200)
logs = logs.filter((log) => log.message === newLogEntry.message)

expect(logs?.length).to.equal(1)
Expand Down
Loading