Skip to content

Commit

Permalink
Merge pull request #566 from oceanprotocol/issue-565-optimize-get-status
Browse files Browse the repository at this point in the history
Issue 565 optimize get status
  • Loading branch information
paulo-ocean authored Jul 31, 2024
2 parents 84829ee + 5b75d54 commit f4bfb44
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 137 deletions.
51 changes: 13 additions & 38 deletions dashboard/src/components/IndexQueue.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
} from '@mui/material'
import styles from './Dashboard/index.module.css'
import { useAdminContext } from '@/context/AdminProvider'
import Alert from '@mui/material/Alert'

interface QueueItem {
txId: string
Expand All @@ -20,49 +19,36 @@ interface QueueItem {
export default function IndexQueue() {
const [queue, setQueue] = useState<QueueItem[]>([])
const { networks } = useAdminContext()
const [avoidAskQueue, setAvoidAskQueue] = useState<boolean>(false)

let intervalId: any = null
useEffect(() => {
const fetchQueue = () => {
fetch('/api/services/indexQueue')
.then((response) => {
if (response.status === 400) {
console.warn('Cannot fetch queue: Node is not running Indexer')
setAvoidAskQueue(true)
if (intervalId) {
clearInterval(intervalId) // Stop doing this, there is no point, since we don't have Indexer
.then((response) => response.json())
.then((data) => {
const transformedQueue = data.queue.map((item: any) => {
const network = networks.find((net) => net.chainId === item.chainId)
return {
txId: item.txId,
chainId: item.chainId,
chain: network ? network.network : 'Unknown Network'
}
} else {
response.json().then((data) => {
const transformedQueue = data.queue.map((item: any) => {
const network = networks.find((net) => net.chainId === item.chainId)
return {
txId: item.txId,
chainId: item.chainId,
chain: network ? network.network : 'Unknown Network'
}
})
setQueue(transformedQueue)
})
}
})
setQueue(transformedQueue)
})
.catch((error) => {
console.error('Error fetching queue:', error)
})
}

fetchQueue() // Initial fetch
let pollingInterval = 10000 // Default polling interval (10 seconds)
let pollingInterval = 2000 // Default polling interval
if (process.env.INDEXER_INTERVAL) {
pollingInterval = Number(process.env.INDEXER_INTERVAL)
}
intervalId = setInterval(fetchQueue, pollingInterval)
const intervalId = setInterval(fetchQueue, pollingInterval)

return () => {
if (intervalId) {
clearInterval(intervalId) // Clear interval on component unmount
}
clearInterval(intervalId) // Clear interval on component unmount
}
}, [])

Expand Down Expand Up @@ -102,17 +88,6 @@ export default function IndexQueue() {
) : (
<p>Indexing queue is empty.</p>
)}
{avoidAskQueue && (
<Alert
style={{ width: 640 }}
severity="warning"
onClose={() => {
setAvoidAskQueue(false)
}}
>
Node is not running Indexer. No need to get queue at this point!
</Alert>
)}
</div>
)
}
34 changes: 26 additions & 8 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ export async function handleProtocolCommands(connection: any) {
let statusStream
let sendStream = null

const buildWrongCommandStatus = function (errorCode: number, message: string) {
status = {
httpStatus: errorCode,
error: message
}
return status
}

const denyList = await (await getConfiguration()).denyList
if (denyList.peers.length > 0) {
if (denyList.peers.includes(remotePeer.toString())) {
P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`)
status = {
httpStatus: 403,
error: 'Unauthorized request'
}
statusStream = new ReadableString(JSON.stringify(status))

statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request'))
)
pipe(statusStream, connection.stream.sink)
return
}
Expand All @@ -58,14 +65,25 @@ export async function handleProtocolCommands(connection: any) {
const str = uint8ArrayToString(chunk.subarray())
task = JSON.parse(str) as Command
} catch (e) {
status = { httpStatus: 400, error: 'Invalid command' }
statusStream = new ReadableString(JSON.stringify(status))
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
return
}
break
}
P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true)
if (!task) {
P2P_LOGGER.error('Invalid or missing task/command data!')
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
return
} else {
P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true)
}

// we get the handler from the running instance
// no need to create a new instance of Handler on every request
const handler: Handler = this.getCoreHandlers().getHandler(task.command)
Expand Down
207 changes: 117 additions & 90 deletions src/components/core/utils/statusHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,20 @@ import {
OceanNodeStatus,
OceanNodeProvider,
OceanNodeIndexer,
StorageTypes
StorageTypes,
OceanNodeConfig
} from '../../../@types/OceanNode.js'
import { existsEnvironmentVariable, getConfiguration } from '../../../utils/index.js'
import { ENVIRONMENT_VARIABLES } from '../../../utils/constants.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'
import { OceanNode } from '../../../OceanNode.js'
import { isAddress } from 'ethers'
import { schemas } from '../../database/schemas.js'
import { SupportedNetwork } from '../../../@types/blockchain.js'

export async function status(
oceanNode: OceanNode,
nodeId?: string,
detailed: boolean = false
): Promise<OceanNodeStatus> {
CORE_LOGGER.logMessage('Command status started execution...', true)
if (!oceanNode) {
CORE_LOGGER.logMessageWithEmoji(
'Node object not found. Cannot proceed with status command.',
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return
}
const config = await getConfiguration()

function getAdminAddresses(config: OceanNodeConfig) {
const validAddresses = []
if (config.allowedAdmins) {
if (config.allowedAdmins && config.allowedAdmins.length > 0) {
for (const admin of config.allowedAdmins) {
if (isAddress(admin) === true) {
validAddresses.push(admin)
Expand All @@ -44,88 +30,129 @@ export async function status(
)
}
}
const status: OceanNodeStatus = {
id: undefined,
publicKey: undefined,
address: undefined,
version: undefined,
http: undefined,
p2p: undefined,
return validAddresses
}
const supportedStorageTypes: StorageTypes = {
url: true,
arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY),
ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY)
}

// platform information
const platformInfo = {
cpus: os.cpus().length,
freemem: os.freemem(),
totalmem: os.totalmem(),
loadavg: os.loadavg(),
arch: os.arch(),
machine: os.machine(),
platform: os.platform(),
osType: os.type(),
node: process.version
}

async function getIndexerAndProviderInfo(
oceanNode: OceanNode,
config: OceanNodeConfig
): Promise<any> {
const nodeStatus: any = {
provider: [],
indexer: [],
supportedStorage: undefined,
uptime: process.uptime(),
platform: {
cpus: os.cpus().length,
freemem: os.freemem(),
totalmem: os.totalmem(),
loadavg: os.loadavg(),
arch: os.arch(),
machine: os.machine(),
platform: os.platform(),
osType: os.type(),
node: process.version
},
codeHash: config.codeHash,
allowedAdmins: validAddresses
indexer: []
}
for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) {
if (config.hasProvider) {
const provider: OceanNodeProvider = {
chainId: key,
network: supportedNetwork.network
}
nodeStatus.provider.push(provider)
}
if (config.hasIndexer) {
const blockNr = await getIndexerBlockInfo(oceanNode, supportedNetwork)
const indexer: OceanNodeIndexer = {
chainId: key,
network: supportedNetwork.network,
block: blockNr
}
nodeStatus.indexer.push(indexer)
}
}
return nodeStatus
}

if (nodeId && nodeId !== undefined) {
status.id = nodeId
} else {
// get current node ID
status.id = config.keys.peerId.toString()
async function getIndexerBlockInfo(
oceanNode: OceanNode,
supportedNetwork: SupportedNetwork
): Promise<string> {
let blockNr = '0'
try {
const { indexer: indexerDatabase } = oceanNode.getDatabase()
const { lastIndexedBlock } = await indexerDatabase.retrieve(supportedNetwork.chainId)
blockNr = lastIndexedBlock.toString()
} catch (error) {
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error fetching last indexed block for network ${supportedNetwork.network}`
)
}
return blockNr
}

const supportedStorageTypes: StorageTypes = {
url: true,
arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY),
ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY)
let nodeStatus: OceanNodeStatus = null

export async function status(
oceanNode: OceanNode,
nodeId?: string,
detailed: boolean = false
): Promise<OceanNodeStatus> {
CORE_LOGGER.logMessage('Command status started execution...', true)
if (!oceanNode) {
CORE_LOGGER.logMessageWithEmoji(
'Node object not found. Cannot proceed with status command.',
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return
}
const config = await getConfiguration()

status.version = process.env.npm_package_version
status.publicKey = Buffer.from(config.keys.publicKey).toString('hex')
status.address = config.keys.ethAddress
status.http = config.hasHttp
status.p2p = config.hasP2P
status.supportedStorage = supportedStorageTypes
// no previous status?
if (!nodeStatus) {
nodeStatus = {
id: nodeId && nodeId !== undefined ? nodeId : config.keys.peerId.toString(), // get current node ID
publicKey: Buffer.from(config.keys.publicKey).toString('hex'),
address: config.keys.ethAddress,
version: process.env.npm_package_version,
http: config.hasHttp,
p2p: config.hasP2P,
provider: [],
indexer: [],
supportedStorage: supportedStorageTypes,
// uptime: process.uptime(),
platform: platformInfo,
codeHash: config.codeHash,
allowedAdmins: getAdminAddresses(config)
}
}

// need to update at least block info if available
if (config.supportedNetworks) {
for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) {
if (config.hasProvider) {
const provider: OceanNodeProvider = {
chainId: key,
network: supportedNetwork.network
}
status.provider.push(provider)
}
if (config.hasIndexer) {
let blockNr = '0'
try {
const { indexer: indexerDatabase } = oceanNode.getDatabase()
const { lastIndexedBlock } = await indexerDatabase.retrieve(
supportedNetwork.chainId
)
blockNr = lastIndexedBlock.toString()
} catch (error) {
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error fetching last indexed block for network ${supportedNetwork.network}`
)
}
const indexer: OceanNodeIndexer = {
chainId: key,
network: supportedNetwork.network,
block: blockNr
}
status.indexer.push(indexer)
}
}
const indexerAndProvider = await getIndexerAndProviderInfo(oceanNode, config)
nodeStatus.provider = indexerAndProvider.provider
nodeStatus.indexer = indexerAndProvider.indexer
}

// only these 2 might change between requests
nodeStatus.platform.freemem = os.freemem()
nodeStatus.platform.loadavg = os.loadavg()
nodeStatus.uptime = process.uptime()

// depends on request
if (detailed) {
status.c2dClusters = config.c2dClusters
status.supportedSchemas = schemas.ddoSchemas
nodeStatus.c2dClusters = config.c2dClusters
nodeStatus.supportedSchemas = schemas.ddoSchemas
}
return status

return nodeStatus
}
2 changes: 1 addition & 1 deletion src/test/integration/logs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ describe('LogDatabase CRUD', () => {
const endTime = new Date() // current time

// Retrieve the latest log entries
let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 100)
let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 200)
logs = logs.filter((log) => log.message === newLogEntry.message)

expect(logs?.length).to.equal(1)
Expand Down

0 comments on commit f4bfb44

Please sign in to comment.