diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 87c0dc7d6..0ad73914e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -18,7 +18,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-node@v2
         with:
-          node-version: '18.20.4'
+          node-version: 'v20.16.0'
      - name: Cache node_modules
        uses: actions/cache@v2
        env:
@@ -43,7 +43,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu-latest]
-        node: ['18.20.4']
+        node: ['18.20.4', 'v20.16.0', 'v22.5.1']
     steps:
       - uses: actions/checkout@v3
@@ -67,7 +67,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-node@v2
         with:
-          node-version: '18.20.4'
+          node-version: 'v20.16.0'
       - name: Cache node_modules
         uses: actions/cache@v2
         env:
@@ -100,7 +100,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-node@v2
         with:
-          node-version: '18.20.4'
+          node-version: 'v20.16.0'
       - name: Cache node_modules
         uses: actions/cache@v2
         env:
@@ -189,7 +189,7 @@ jobs:
       - name: Set up Node.js
         uses: actions/setup-node@v2
         with:
-          node-version: '18.20.4'
+          node-version: 'v20.16.0'
       - name: Cache node_modules
         uses: actions/cache@v2
diff --git a/.nvmrc b/.nvmrc
index d78bf0a56..b427e2ae2 100644
--- a/.nvmrc
+++ b/.nvmrc
@@ -1 +1 @@
-v18.20.4
+v20.16.0
\ No newline at end of file
diff --git a/dashboard/src/components/IndexQueue.tsx b/dashboard/src/components/IndexQueue.tsx
index 625289047..15080f021 100644
--- a/dashboard/src/components/IndexQueue.tsx
+++ b/dashboard/src/components/IndexQueue.tsx
@@ -9,6 +9,7 @@ import {
 } from '@mui/material'
 import styles from './Dashboard/index.module.css'
 import { useAdminContext } from '@/context/AdminProvider'
+import Alert from '@mui/material/Alert'
 
 interface QueueItem {
   txId: string
@@ -19,21 +20,32 @@ export default function IndexQueue() {
   const [queue, setQueue] = useState<QueueItem[]>([])
   const { networks } = useAdminContext()
+  const [avoidAskQueue, setAvoidAskQueue] = useState(false)
+  let intervalId: any = null
 
   useEffect(() => {
     const fetchQueue = () => {
       fetch('/api/services/indexQueue')
-        .then((response) => response.json())
-        .then((data) => {
-          const transformedQueue = data.queue.map((item: any) => {
-            const network = networks.find((net) => net.chainId === item.chainId)
-            return {
-              txId: item.txId,
-              chainId: item.chainId,
-              chain: network ? network.network : 'Unknown Network'
+        .then((response) => {
+          if (response.status === 400) {
+            console.warn('Cannot fetch queue: Node is not running Indexer')
+            setAvoidAskQueue(true)
+            if (intervalId) {
+              clearInterval(intervalId) // Stop polling; this node is not running an Indexer
             }
-          })
-          setQueue(transformedQueue)
+          } else {
+            response.json().then((data) => {
+              const transformedQueue = data.queue.map((item: any) => {
+                const network = networks.find((net) => net.chainId === item.chainId)
+                return {
+                  txId: item.txId,
+                  chainId: item.chainId,
+                  chain: network ? network.network : 'Unknown Network'
+                }
+              })
+              setQueue(transformedQueue)
+            })
+          }
         })
         .catch((error) => {
           console.error('Error fetching queue:', error)
@@ -41,14 +53,16 @@ export default function IndexQueue() {
     }
 
     fetchQueue() // Initial fetch
-    let pollingInterval = 2000 // Default polling interval
+    let pollingInterval = 10000 // Default polling interval (10 seconds)
     if (process.env.INDEXER_INTERVAL) {
      pollingInterval = Number(process.env.INDEXER_INTERVAL)
     }
-    const intervalId = setInterval(fetchQueue, pollingInterval)
+    intervalId = setInterval(fetchQueue, pollingInterval)
 
     return () => {
-      clearInterval(intervalId) // Clear interval on component unmount
+      if (intervalId) {
+        clearInterval(intervalId) // Clear interval on component unmount
+      }
     }
   }, [])
@@ -88,6 +102,17 @@ export default function IndexQueue() {
       ) : (
         <p>
           Indexing queue is empty.
         </p>
       )}
+      {avoidAskQueue && (
+        <Alert
+          severity="warning"
+          onClose={() => {
+            setAvoidAskQueue(false)
+          }}
+        >
+          Node is not running Indexer. No need to get queue at this point!
+        </Alert>
+      )}
   )
 }
diff --git a/env.md b/env.md
index f21064171..1448913db 100644
--- a/env.md
+++ b/env.md
@@ -25,6 +25,9 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/
 - `MAX_REQ_PER_SECOND`: Number of requests per second allowed by the same client. Example: `3`
 - `MAX_CHECKSUM_LENGTH`: Define the maximum length for a file if checksum is required (Mb). Example: `10`
 - `LOG_LEVEL`: Define the default log level. Example: `debug`
+- `LOG_CONSOLE`: Write logs to the console. Default is `false`, but becomes `true` if neither `LOG_FILES` nor `LOG_DB` is set.
+- `LOG_FILES`: Write logs to files. Default is `false`.
+- `LOG_DB`: Write logs to the noSQL database. Default is `false`.
 
 ## HTTP
diff --git a/package-lock.json b/package-lock.json
index a3e5a0313..39badfff1 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -23,7 +23,6 @@
         "@libp2p/interface-address-manager": "^3.0.1",
         "@libp2p/kad-dht": "^12.1.1",
         "@libp2p/mdns": "^10.1.1",
-        "@libp2p/mplex": "^10.1.1",
         "@libp2p/peer-id": "^4.1.4",
         "@libp2p/peer-id-factory": "^4.1.4",
         "@libp2p/ping": "^1.1.1",
@@ -3088,34 +3087,6 @@
         "multiformats": "^13.0.0"
       }
     },
-    "node_modules/@libp2p/mplex": {
-      "version": "10.1.1",
-      "resolved": "https://registry.npmjs.org/@libp2p/mplex/-/mplex-10.1.1.tgz",
-      "integrity": "sha512-W2s9rC9CvvrWHKnAnSY+MVAO989KTr73ZuEpzn4LKATTmWxDFVISWehZ5i5t7cBDlX4c7vlYYeY8tSI1VBSkDw==",
-      "dependencies": {
-        "@libp2p/interface": "^1.6.0",
-        "@libp2p/utils": "^5.4.5",
-        "it-pipe": "^3.0.1",
-        "it-pushable": "^3.2.3",
-        "it-stream-types": "^2.0.1",
-        "uint8-varint": "^2.0.4",
-        "uint8arraylist": "^2.4.8",
-        "uint8arrays": "^5.1.0"
-      }
-    },
-    "node_modules/@libp2p/mplex/node_modules/multiformats": {
-      "version": "13.1.1",
-      "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-13.1.1.tgz",
-      "integrity": "sha512-JiptvwMmlxlzIlLLwhCi/srf/nk409UL0eUBr0kioRJq15hqqKyg68iftrBvhCRjR6Rw4fkNnSc4ZJXJDuta/Q=="
-    },
-    "node_modules/@libp2p/mplex/node_modules/uint8arrays": {
-      "version": "5.1.0",
-      "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz",
-      "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==",
-      "dependencies": {
-        "multiformats": "^13.0.0"
-      }
-    },
     "node_modules/@libp2p/multistream-select": {
       "version": "5.1.13",
       "resolved": "https://registry.npmjs.org/@libp2p/multistream-select/-/multistream-select-5.1.13.tgz",
diff --git a/package.json b/package.json
index cecdfa217..dc1f2008c 100644
--- a/package.json
+++ b/package.json
@@ -57,7 +57,6 @@
     "@libp2p/interface-address-manager": "^3.0.1",
     "@libp2p/kad-dht": "^12.1.1",
     "@libp2p/mdns": "^10.1.1",
-    "@libp2p/mplex": "^10.1.1",
     "@libp2p/peer-id": "^4.1.4",
     "@libp2p/peer-id-factory": "^4.1.4",
     "@libp2p/ping": "^1.1.1",
diff --git a/src/@types/DDO/Metadata.ts b/src/@types/DDO/Metadata.ts
index 1eb513558..9c6fb2b18 100644
--- a/src/@types/DDO/Metadata.ts
+++ b/src/@types/DDO/Metadata.ts
@@ -18,6 +18,11 @@ export interface MetadataAlgorithm {
    * @type {string}
    */
   rawcode?: string
+  /**
+   * Format of the algorithm
+   * @type {string}
+   */
+  format?: string
 
   /**
    * Object describing the Docker container image.
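For context, the new optional `format` field slots into `MetadataAlgorithm` alongside `language`, `version`, `rawcode`, and the container object. A minimal sketch of algorithm metadata using it — the shape follows the interface as amended above, but all concrete values here are illustrative, not taken from the repository:

    // Sketch only: values are hypothetical placeholders.
    const algoMeta /* : MetadataAlgorithm */ = {
      language: 'python',
      version: '0.1.0',
      format: 'docker-image', // the new optional field
      container: {
        entrypoint: 'python $ALGO',
        image: 'my-org/my-algo', // hypothetical image name
        tag: 'latest',
        checksum: 'sha256:abc123' // hypothetical checksum
      }
    }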
diff --git a/src/components/Indexer/utils.ts b/src/components/Indexer/utils.ts
index 5b3d53215..b50daefc8 100644
--- a/src/components/Indexer/utils.ts
+++ b/src/components/Indexer/utils.ts
@@ -181,7 +181,7 @@ export const processChunkLogs = async (
     if (!allowed.length) {
       INDEXER_LOGGER.log(
         LOG_LEVELS_STR.LEVEL_ERROR,
-        `Metadata Proof validator not allowed`,
+        `Metadata Proof validators list is empty`,
         true
       )
       continue
diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts
index 3043253e8..485e40a5d 100644
--- a/src/components/P2P/handleProtocolCommands.ts
+++ b/src/components/P2P/handleProtocolCommands.ts
@@ -38,34 +38,58 @@ export async function handleProtocolCommands(connection: any) {
   let statusStream
   let sendStream = null
 
+  const buildWrongCommandStatus = function (errorCode: number, message: string) {
+    status = {
+      httpStatus: errorCode,
+      error: message
+    }
+    return status
+  }
+
   const denyList = await (await getConfiguration()).denyList
   if (denyList.peers.length > 0) {
     if (denyList.peers.includes(remotePeer.toString())) {
       P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`)
-      status = {
-        httpStatus: 403,
-        error: 'Unauthorized request'
-      }
-      statusStream = new ReadableString(JSON.stringify(status))
+
+      statusStream = new ReadableString(
+        JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request'))
+      )
       pipe(statusStream, connection.stream.sink)
       return
     }
   }
 
-  /* eslint no-unreachable-loop: ["error", { "ignore": ["ForInStatement", "ForOfStatement"] }] */
-  for await (const chunk of connection.stream.source) {
-    try {
-      const str = uint8ArrayToString(chunk.subarray())
-      task = JSON.parse(str) as Command
-    } catch (e) {
-      status = { httpStatus: 400, error: 'Invalid command' }
-      statusStream = new ReadableString(JSON.stringify(status))
+  try {
+    // eslint-disable-next-line no-unreachable-loop
+    for await (const chunk of connection.stream.source) {
+      try {
+        const str = uint8ArrayToString(chunk.subarray())
+        task = JSON.parse(str) as Command
+      } catch (e) {
+        statusStream = new ReadableString(
+          JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
+        )
+        pipe(statusStream, connection.stream.sink)
+        return
+      }
+    }
+    if (!task) {
+      P2P_LOGGER.error('Invalid or missing task/command data!')
+      statusStream = new ReadableString(
+        JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
+      )
       pipe(statusStream, connection.stream.sink)
       return
     }
-    break
+  } catch (err) {
+    P2P_LOGGER.log(
+      LOG_LEVELS_STR.LEVEL_ERROR,
+      `Unable to process P2P command: ${err.message}`
+    )
+    return
   }
-  P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true)
+
+  P2P_LOGGER.logMessage('Performing P2P task: ' + JSON.stringify(task), true)
   // we get the handler from the running instance
   // no need to create a new instance of Handler on every request
   const handler: Handler = this.getCoreHandlers().getHandler(task.command)
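The refactor above replaces the repeated inline status objects with a single `buildWrongCommandStatus` factory, so every early exit pipes the same JSON shape back over the stream. A standalone sketch of that pattern — the `ReadableString`/`pipe` usage mirrors the handler, and the type name is an assumption for illustration:

    // Sketch: one factory for error statuses, reused by every early return.
    interface P2PStatus {
      httpStatus: number
      error: string
    }

    function buildWrongCommandStatus(errorCode: number, message: string): P2PStatus {
      return { httpStatus: errorCode, error: message }
    }

    // Both the deny-list branch and the parse-failure branch then reduce to:
    //   const statusStream = new ReadableString(
    //     JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request'))
    //   )
    //   pipe(statusStream, connection.stream.sink)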
diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts
index 029c24f59..5c794a39d 100644
--- a/src/components/P2P/index.ts
+++ b/src/components/P2P/index.ts
@@ -16,7 +16,6 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
 import { bootstrap } from '@libp2p/bootstrap'
 import { noise } from '@chainsafe/libp2p-noise'
 import { mdns } from '@libp2p/mdns'
-import { mplex } from '@libp2p/mplex'
 import { yamux } from '@chainsafe/libp2p-yamux'
 import { peerIdFromString } from '@libp2p/peer-id'
 import { pipe } from 'it-pipe'
@@ -152,10 +151,11 @@ export class OceanP2P extends EventEmitter {
       const peerId = details.detail
       P2P_LOGGER.debug('Connection established to:' + peerId.toString()) // Emitted when a peer has been found
       try {
+        // DO WE REALLY NEED THIS?
         this._libp2p.services.pubsub.connect(peerId.toString())
-      } catch (e) {}
-    } else {
-      /* empty */
+      } catch (e) {
+        P2P_LOGGER.error(e.message)
+      }
     }
   }
@@ -322,7 +322,7 @@ export class OceanP2P extends EventEmitter {
       },
       peerId: config.keys.peerId,
       transports,
-      streamMuxers: [yamux(), mplex()],
+      streamMuxers: [yamux()],
       connectionEncryption: [
         noise()
         // plaintext()
@@ -552,26 +552,38 @@ export class OceanP2P extends EventEmitter {
     let stream
     // dial/connect to the target node
     try {
-      // stream= await this._libp2p.dialProtocol(peer, this._protocol)
-
       stream = await this._libp2p.dialProtocol(peerId, this._protocol)
     } catch (e) {
       response.status.httpStatus = 404
       response.status.error = 'Cannot connect to peer'
+      P2P_LOGGER.error(`Unable to connect to peer: ${peerId}`)
       return response
     }
 
-    response.stream = stream
-    pipe(
-      // Source data
-      [uint8ArrayFromString(message)],
-      // Write to the stream, and pass its output to the next function
-      stream,
-      // this is the anayze function
-      // doubler as any,
-      // Sink function
-      sink
-    )
+    if (stream) {
+      response.stream = stream
+      try {
+        await pipe(
+          // Source data
+          [uint8ArrayFromString(message)],
+          // Write to the stream, and pass its output to the next function
+          stream,
+          // this is the analyze function
+          // doubler as any,
+          // Sink function
+          sink
+        )
+      } catch (err) {
+        P2P_LOGGER.error(`Unable to send P2P message: ${err.message}`)
+        response.status.httpStatus = 404
+        response.status.error = err.message
+      }
+    } else {
+      response.status.httpStatus = 404
+      response.status.error = 'Unable to get remote P2P stream (null)'
+      P2P_LOGGER.error(response.status.error)
+    }
+
     return response
   }
diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts
index 51efba987..85edc08c3 100644
--- a/src/components/core/compute/startCompute.ts
+++ b/src/components/core/compute/startCompute.ts
@@ -69,6 +69,7 @@ export class ComputeStartHandler extends Handler {
       const node = this.getOceanNode()
       const assets: ComputeAsset[] = [task.dataset]
       if (task.additionalDatasets) assets.push(...task.additionalDatasets)
+      const { algorithm } = task
       let foundValidCompute = null
 
       const algoChecksums = await getAlgoChecksums(
@@ -245,6 +246,18 @@ export class ComputeStartHandler extends Handler {
               validUntil: validFee.validUntil
             }
           }
+          if (!('meta' in algorithm) && ddo.metadata.type === 'algorithm') {
+            const { entrypoint, image, tag, checksum } = ddo.metadata.algorithm.container
+            const container = { entrypoint, image, tag, checksum }
+            algorithm.meta = {
+              language: ddo.metadata.algorithm.language,
+              version: ddo.metadata.algorithm.version,
+              container: container
+            }
+            if ('format' in ddo.metadata.algorithm) {
+              algorithm.meta.format = ddo.metadata.algorithm.format
+            }
+          }
         }
       }
       if (!foundValidCompute) {
@@ -265,7 +278,7 @@ export class ComputeStartHandler extends Handler {
 
       const response = await engine.startComputeJob(
         assets,
-        task.algorithm,
+        algorithm,
         task.output,
         task.consumerAddress,
         envId,
diff --git a/src/components/core/utils/statusHandler.ts b/src/components/core/utils/statusHandler.ts
index adf29ac4b..9758503d9 100644
--- a/src/components/core/utils/statusHandler.ts
+++ b/src/components/core/utils/statusHandler.ts
@@ -4,7 +4,8 @@ import {
   OceanNodeStatus,
   OceanNodeProvider,
   OceanNodeIndexer,
-  StorageTypes
+  StorageTypes,
+  OceanNodeConfig
 } from '../../../@types/OceanNode.js'
 import { existsEnvironmentVariable, getConfiguration } from '../../../utils/index.js'
 import { ENVIRONMENT_VARIABLES } from '../../../utils/constants.js'
@@ -12,26 +13,11 @@ import { CORE_LOGGER } from '../../../utils/logging/common.js'
 import { OceanNode } from '../../../OceanNode.js'
 import { isAddress } from 'ethers'
 import { schemas } from '../../database/schemas.js'
+import { SupportedNetwork } from '../../../@types/blockchain.js'
 
-export async function status(
-  oceanNode: OceanNode,
-  nodeId?: string,
-  detailed: boolean = false
-): Promise<OceanNodeStatus> {
-  CORE_LOGGER.logMessage('Command status started execution...', true)
-  if (!oceanNode) {
-    CORE_LOGGER.logMessageWithEmoji(
-      'Node object not found. Cannot proceed with status command.',
-      true,
-      GENERIC_EMOJIS.EMOJI_CROSS_MARK,
-      LOG_LEVELS_STR.LEVEL_ERROR
-    )
-    return
-  }
-  const config = await getConfiguration()
-
+function getAdminAddresses(config: OceanNodeConfig) {
   const validAddresses = []
-  if (config.allowedAdmins) {
+  if (config.allowedAdmins && config.allowedAdmins.length > 0) {
     for (const admin of config.allowedAdmins) {
       if (isAddress(admin) === true) {
         validAddresses.push(admin)
@@ -44,88 +30,129 @@ export async function status(
       )
     }
   }
-  const status: OceanNodeStatus = {
-    id: undefined,
-    publicKey: undefined,
-    address: undefined,
-    version: undefined,
-    http: undefined,
-    p2p: undefined,
+  return validAddresses
+}
+const supportedStorageTypes: StorageTypes = {
+  url: true,
+  arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY),
+  ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY)
+}
+
+// platform information
+const platformInfo = {
+  cpus: os.cpus().length,
+  freemem: os.freemem(),
+  totalmem: os.totalmem(),
+  loadavg: os.loadavg(),
+  arch: os.arch(),
+  machine: os.machine(),
+  platform: os.platform(),
+  osType: os.type(),
+  node: process.version
+}
+
+async function getIndexerAndProviderInfo(
+  oceanNode: OceanNode,
+  config: OceanNodeConfig
+): Promise<any> {
+  const nodeStatus: any = {
     provider: [],
-    indexer: [],
-    supportedStorage: undefined,
-    uptime: process.uptime(),
-    platform: {
-      cpus: os.cpus().length,
-      freemem: os.freemem(),
-      totalmem: os.totalmem(),
-      loadavg: os.loadavg(),
-      arch: os.arch(),
-      machine: os.machine(),
-      platform: os.platform(),
-      osType: os.type(),
-      node: process.version
-    },
-    codeHash: config.codeHash,
-    allowedAdmins: validAddresses
+    indexer: []
+  }
+  for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) {
+    if (config.hasProvider) {
+      const provider: OceanNodeProvider = {
+        chainId: key,
+        network: supportedNetwork.network
+      }
+      nodeStatus.provider.push(provider)
+    }
+    if (config.hasIndexer) {
+      const blockNr = await getIndexerBlockInfo(oceanNode, supportedNetwork)
+      const indexer: OceanNodeIndexer = {
+        chainId: key,
+        network: supportedNetwork.network,
+        block: blockNr
+      }
+      nodeStatus.indexer.push(indexer)
+    }
   }
+  return nodeStatus
+}
 
-  if (nodeId && nodeId !== undefined) {
-    status.id = nodeId
-  } else {
-    // get current node ID
-    status.id = config.keys.peerId.toString()
+async function getIndexerBlockInfo(
+  oceanNode: OceanNode,
+  supportedNetwork: SupportedNetwork
+): Promise<string> {
+  let blockNr = '0'
+  try {
+    const { indexer: indexerDatabase } = oceanNode.getDatabase()
+    const { lastIndexedBlock } = await indexerDatabase.retrieve(supportedNetwork.chainId)
+    blockNr = lastIndexedBlock.toString()
+  } catch (error) {
+    CORE_LOGGER.log(
+      LOG_LEVELS_STR.LEVEL_ERROR,
+      `Error fetching last indexed block for network ${supportedNetwork.network}`
+    )
   }
+  return blockNr
+}
 
-  const supportedStorageTypes: StorageTypes = {
-    url: true,
-    arwave: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.ARWEAVE_GATEWAY),
-    ipfs: existsEnvironmentVariable(ENVIRONMENT_VARIABLES.IPFS_GATEWAY)
+let nodeStatus: OceanNodeStatus = null
+
+export async function status(
+  oceanNode: OceanNode,
+  nodeId?: string,
+  detailed: boolean = false
+): Promise<OceanNodeStatus> {
+  CORE_LOGGER.logMessage('Command status started execution...', true)
+  if (!oceanNode) {
+    CORE_LOGGER.logMessageWithEmoji(
+      'Node object not found. Cannot proceed with status command.',
+      true,
+      GENERIC_EMOJIS.EMOJI_CROSS_MARK,
+      LOG_LEVELS_STR.LEVEL_ERROR
+    )
+    return
   }
+  const config = await getConfiguration()
 
-  status.version = process.env.npm_package_version
-  status.publicKey = Buffer.from(config.keys.publicKey).toString('hex')
-  status.address = config.keys.ethAddress
-  status.http = config.hasHttp
-  status.p2p = config.hasP2P
-  status.supportedStorage = supportedStorageTypes
+  // no previous status?
+  if (!nodeStatus) {
+    nodeStatus = {
+      id: nodeId && nodeId !== undefined ? nodeId : config.keys.peerId.toString(), // get current node ID
+      publicKey: Buffer.from(config.keys.publicKey).toString('hex'),
+      address: config.keys.ethAddress,
+      version: process.env.npm_package_version,
+      http: config.hasHttp,
+      p2p: config.hasP2P,
+      provider: [],
+      indexer: [],
+      supportedStorage: supportedStorageTypes,
+      // uptime: process.uptime(),
+      platform: platformInfo,
+      codeHash: config.codeHash,
+      allowedAdmins: getAdminAddresses(config)
+    }
+  }
 
+  // need to update at least block info if available
   if (config.supportedNetworks) {
-    for (const [key, supportedNetwork] of Object.entries(config.supportedNetworks)) {
-      if (config.hasProvider) {
-        const provider: OceanNodeProvider = {
-          chainId: key,
-          network: supportedNetwork.network
-        }
-        status.provider.push(provider)
-      }
-      if (config.hasIndexer) {
-        let blockNr = '0'
-        try {
-          const { indexer: indexerDatabase } = oceanNode.getDatabase()
-          const { lastIndexedBlock } = await indexerDatabase.retrieve(
-            supportedNetwork.chainId
-          )
-          blockNr = lastIndexedBlock.toString()
-        } catch (error) {
-          CORE_LOGGER.log(
-            LOG_LEVELS_STR.LEVEL_ERROR,
-            `Error fetching last indexed block for network ${supportedNetwork.network}`
-          )
-        }
-        const indexer: OceanNodeIndexer = {
-          chainId: key,
-          network: supportedNetwork.network,
-          block: blockNr
-        }
-        status.indexer.push(indexer)
-      }
-    }
+    const indexerAndProvider = await getIndexerAndProviderInfo(oceanNode, config)
+    nodeStatus.provider = indexerAndProvider.provider
+    nodeStatus.indexer = indexerAndProvider.indexer
   }
+  // only these 2 might change between requests
+  nodeStatus.platform.freemem = os.freemem()
+  nodeStatus.platform.loadavg = os.loadavg()
+  nodeStatus.uptime = process.uptime()
+
+  // depends on request
   if (detailed) {
-    status.c2dClusters = config.c2dClusters
-    status.supportedSchemas = schemas.ddoSchemas
+    nodeStatus.c2dClusters = config.c2dClusters
+    nodeStatus.supportedSchemas = schemas.ddoSchemas
   }
-  return status
+
+  return nodeStatus
 }
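The rewritten handler memoizes the mostly static status object at module scope and refreshes only the fields that can actually change between calls (`freemem`, `loadavg`, `uptime`, and the indexer block info). The same caching shape in isolation — types trimmed down and assumed for illustration:

    import os from 'os'

    interface SketchStatus {
      id: string
      uptime?: number
      platform: Record<string, unknown>
    }

    let cachedStatus: SketchStatus | null = null

    function getNodeStatus(nodeId: string): SketchStatus {
      if (!cachedStatus) {
        // static fields are computed once per process
        cachedStatus = {
          id: nodeId,
          platform: { cpus: os.cpus().length, arch: os.arch(), totalmem: os.totalmem() }
        }
      }
      // only the volatile fields are refreshed on every request
      cachedStatus.platform.freemem = os.freemem()
      cachedStatus.platform.loadavg = os.loadavg()
      cachedStatus.uptime = process.uptime()
      return cachedStatus
    }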
diff --git a/src/components/database/index.ts b/src/components/database/index.ts
index 9276f1132..15deabbff 100644
--- a/src/components/database/index.ts
+++ b/src/components/database/index.ts
@@ -6,7 +6,7 @@ import {
   LOG_LEVELS_STR,
   configureCustomDBTransport,
   GENERIC_EMOJIS,
-  isDevelopmentEnvironment
+  USE_DB_TRANSPORT
 } from '../../utils/logging/Logger.js'
 import { DATABASE_LOGGER } from '../../utils/logging/common.js'
 import { validateObject } from '../core/utils/validateDdoHandler.js'
@@ -1013,11 +1013,11 @@ export class Database {
     // add this DB transport too
     // once we create a DB instance, the logger will be using this transport as well
     // we cannot have this the other way around because of the dependencies cycle
-    if (!isDevelopmentEnvironment()) {
+    if (USE_DB_TRANSPORT()) {
       configureCustomDBTransport(this, DATABASE_LOGGER)
     } else {
       DATABASE_LOGGER.warn(
-        '"NODE_ENV" is set to "development". This means logs will be saved to console and file(s) only.'
+        'Property "LOG_DB" is set to "false". This means logs will NOT be saved to database!'
       )
     }
     return (async (): Promise<Database> => {
diff --git a/src/components/httpRoutes/commands.ts b/src/components/httpRoutes/commands.ts
index c5dc7f286..b8b395e05 100644
--- a/src/components/httpRoutes/commands.ts
+++ b/src/components/httpRoutes/commands.ts
@@ -1,3 +1,4 @@
+/* eslint-disable no-unreachable */
 import express, { Request, Response } from 'express'
 import { P2PCommandResponse } from '../../@types'
 import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
@@ -36,91 +37,117 @@ directCommandRoute.post(
   '/directCommand',
   express.json(),
   async (req: Request, res: Response): Promise<void> => {
-    const validate = validateCommandParameters(req.body, [])
-    if (!validate.valid) {
-      res.status(validate.status).send(validate.reason)
-      return
-    }
+    try {
+      const validate = validateCommandParameters(req.body, [])
+      if (!validate.valid) {
+        res.status(validate.status).send(validate.reason)
+        return
+      }
+
+      let closedResponse = false
 
-    let isBinaryContent = false
-    const sink = async function (source: any) {
-      let first = true
-      for await (const chunk of source) {
-        if (first) {
-          first = false
-          try {
-            const str = uint8ArrayToString(chunk.subarray()) // Obs: we need to specify the length of the subarrays
-            const decoded = JSON.parse(str)
+      // detect connection closed
+      res.on('close', () => {
+        if (!closedResponse) {
+          HTTP_LOGGER.error('TCP connection was closed before we could send a response!')
+        }
+        closedResponse = true
+      })
 
-            res.status(decoded.httpStatus)
-            if ('headers' in decoded) {
-              res.header(decoded.headers)
-              // when streaming binary data we cannot convert to plain string, specially if encrypted data
-              if (str?.toLowerCase().includes('application/octet-stream')) {
-                isBinaryContent = true
+      let isBinaryContent = false
+      const sink = async function (source: any) {
+        let first = true
+        for await (const chunk of source) {
+          if (first) {
+            first = false
+            try {
+              const str = uint8ArrayToString(chunk.subarray()) // Obs: we need to specify the length of the subarrays
+              const decoded = JSON.parse(str)
+
+              res.status(decoded.httpStatus)
+              if ('headers' in decoded) {
+                res.header(decoded.headers)
+                // when streaming binary data we cannot convert it to a plain string, especially if the data is encrypted
+                if (str?.toLowerCase().includes('application/octet-stream')) {
+                  isBinaryContent = true
+                }
               }
+            } catch (e) {
+              res.status(500)
+              res.write(uint8ArrayToString(chunk.subarray()))
+              closedResponse = true
+              res.end()
+              HTTP_LOGGER.error(e.message)
             }
-          } catch (e) {
-            res.status(500)
-            res.write(uint8ArrayToString(chunk.subarray()))
-            res.end()
-          }
-        } else {
-          if (isBinaryContent) {
-            // Binary content, could be encrypted
-            res.write(chunk.subarray())
           } else {
-            const str = uint8ArrayToString(chunk.subarray())
-            res.write(str)
+            try {
+              if (isBinaryContent) {
+                // Binary content, could be encrypted
+                res.write(chunk.subarray())
+              } else {
+                const str = uint8ArrayToString(chunk.subarray())
+                res.write(str)
+              }
+            } catch (e) {
+              HTTP_LOGGER.error(e.message)
+            }
           }
         }
+        closedResponse = true
+        res.end()
       }
-      res.end()
-    }
 
-    HTTP_LOGGER.logMessage('Sending command : ' + JSON.stringify(req.body), true)
+      HTTP_LOGGER.logMessage('Sending command : ' + JSON.stringify(req.body), true)
 
-    // TODO NOTES: We are sending all "/directCommand" requests to the P2P component as "req.oceanNode.getP2PNode()"
-    // even if we do not need any P2P functionality at all (as all our handlers are "inside" P2P)
-    // All ends up here => "handleProtocolCommands()" or here => "handleDirectProtocolCommands()", where we do not have
-    // any access to main OceanNode, neither Provider or Indexer components
-    // probably the handlers should be on the OceanNode level, and if they need P2P connectivity we pass them the getP2PNode()
-    // (we kinda do it already on most handlers anyway)
+      // TODO NOTES: We are sending all "/directCommand" requests to the P2P component as "req.oceanNode.getP2PNode()"
+      // even if we do not need any P2P functionality at all (as all our handlers are "inside" P2P)
+      // All ends up here => "handleProtocolCommands()" or here => "handleDirectProtocolCommands()", where we do not have
+      // any access to the main OceanNode, nor to the Provider or Indexer components
+      // probably the handlers should be on the OceanNode level, and if they need P2P connectivity we pass them the getP2PNode()
+      // (we kinda do it already on most handlers anyway)
 
-    let response: P2PCommandResponse = null
-    // send to this peer (we might not need P2P connectivity)
-    if (
-      !hasP2PInterface ||
-      !req.body.node ||
-      req.oceanNode.getP2PNode().isTargetPeerSelf(req.body.node)
-    ) {
-      // send to this node
-      response = await req.oceanNode.handleDirectProtocolCommand(
-        JSON.stringify(req.body),
-        sink
-      )
-      // UPDATED: we can just call the handler directly here, once we have them
-      // moving some of the logic from "handleProtocolCommands()" and "handleDirectProtocolCommands()" to the OceanNode
-      // These actions do not need P2P connections directly
-    } else if (hasP2PInterface) {
-      // send to another peer (Only here we need P2P networking)
-      response = await req.oceanNode
-        .getP2PNode()
-        .sendTo(req.body.node as string, JSON.stringify(req.body), sink)
-    } else {
-      response = {
-        stream: null,
-        status: {
-          httpStatus: 400,
-          error: 'Invalid or Non Existing P2P configuration'
+      let response: P2PCommandResponse = null
+      // send to this peer (we might not need P2P connectivity)
+      if (
+        !hasP2PInterface ||
+        !req.body.node ||
+        req.oceanNode.getP2PNode().isTargetPeerSelf(req.body.node)
+      ) {
+        // send to this node
+        response = await req.oceanNode.handleDirectProtocolCommand(
+          JSON.stringify(req.body),
+          sink
+        )
+        // UPDATED: we can just call the handler directly here, once we have them
+        // moving some of the logic from "handleProtocolCommands()" and "handleDirectProtocolCommands()" to the OceanNode
+        // These actions do not need P2P connections directly
+      } else if (hasP2PInterface) {
+        // send to another peer (Only here we need P2P networking)
+        response = await req.oceanNode
+          .getP2PNode()
+          .sendTo(req.body.node as string, JSON.stringify(req.body), sink)
+      } else {
+        response = {
+          stream: null,
+          status: {
+            httpStatus: 400,
+            error: 'Invalid or Non Existing P2P configuration'
+          }
         }
       }
-    }
 
-    if (response.stream == null) {
-      res.status(response.status.httpStatus)
-      res.write(response.status.error)
-      res.end()
+      // only if response was not already sent
+      if (response.stream == null && !closedResponse) {
+        try {
+          res.status(response.status.httpStatus)
+          res.write(response.status.error)
+          closedResponse = true
+          res.end()
+        } catch (e) {
+          HTTP_LOGGER.error(e.message)
+        }
+      }
+    } catch (err) {
+      HTTP_LOGGER.error(err.message)
     }
   }
 )
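The `closedResponse` flag introduced above prevents writes to a socket the client has already dropped: `res.on('close')` fires on disconnect, and every later write is gated on the flag. A minimal Express sketch of the same guard — the route and payload here are hypothetical:

    import express, { Request, Response } from 'express'

    const app = express()

    app.post('/directCommand', express.json(), async (req: Request, res: Response) => {
      let closedResponse = false
      // flip the flag as soon as the TCP connection goes away
      res.on('close', () => {
        if (!closedResponse) {
          console.error('TCP connection was closed before we could send a response!')
        }
        closedResponse = true
      })

      const result = await Promise.resolve({ ok: true }) // stand-in for the real command handling
      if (!closedResponse) {
        res.status(200).json(result)
        closedResponse = true
      }
    })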
diff --git a/src/components/httpRoutes/requestValidator.ts b/src/components/httpRoutes/requestValidator.ts
index 2f6b1bb15..6fb6e553b 100644
--- a/src/components/httpRoutes/requestValidator.ts
+++ b/src/components/httpRoutes/requestValidator.ts
@@ -1,6 +1,6 @@
 import { Request, Response } from 'express'
-import { getConfiguration } from '../../utils'
-import { HTTP_LOGGER } from '../../utils/logging/common'
+import { getConfiguration } from '../../utils/index.js'
+import { HTTP_LOGGER } from '../../utils/logging/common.js'
 
 // TODO we should group common stuff,
 // we have multiple similar validation interfaces
diff --git a/src/components/httpRoutes/rootEndpoint.ts b/src/components/httpRoutes/rootEndpoint.ts
index 6b1fe904a..6aa57c83d 100644
--- a/src/components/httpRoutes/rootEndpoint.ts
+++ b/src/components/httpRoutes/rootEndpoint.ts
@@ -7,11 +7,10 @@ export const rootEndpointRoutes = express.Router()
 rootEndpointRoutes.get('/', async (req, res) => {
   const config = await getConfiguration()
   if (!config.supportedNetworks) {
-    HTTP_LOGGER.error(`Supported networks not defined`)
-    res.status(400).send(`Supported networks not defined`)
+    HTTP_LOGGER.warn(`Supported networks not defined`)
   }
   res.json({
-    chainIds: Object.keys(config.supportedNetworks),
+    chainIds: config.supportedNetworks ? Object.keys(config.supportedNetworks) : [],
     providerAddress: config.keys.ethAddress,
     serviceEndpoints: getAllServiceEndpoints(),
     software: 'Ocean-Node',
diff --git a/src/components/storage/index.ts b/src/components/storage/index.ts
index 9ece1c709..3d28ce212 100644
--- a/src/components/storage/index.ts
+++ b/src/components/storage/index.ts
@@ -14,6 +14,7 @@ import urlJoin from 'url-join'
 import { encrypt as encryptData, decrypt as decryptData } from '../../utils/crypt.js'
 import { Readable } from 'stream'
 import { getConfiguration } from '../../utils/index.js'
+import { CORE_LOGGER } from '../../utils/logging/common.js'
 
 export abstract class Storage {
   private file: UrlFileObject | IpfsFileObject | ArweaveFileObject
@@ -89,7 +90,7 @@ export abstract class Storage {
         response.push(fileInfo)
       }
     } catch (error) {
-      console.log(error)
+      CORE_LOGGER.error(error)
     }
     return response
   }
diff --git a/src/test/.env.test b/src/test/.env.test
index 48115ee1f..9e142ea3c 100644
--- a/src/test/.env.test
+++ b/src/test/.env.test
@@ -2,10 +2,11 @@ HTTP_API_PORT=8001
 P2P_ipV4BindTcpPort=8000
 PRIVATE_KEY=0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58
 RPCS='{ "8996": {"rpc": "http://127.0.0.1:8545", "chainId": 8996, "network": "development", "chunkSize": 100}}'
-DB_URL=http://localhost:8108?apiKey=xyz
+DB_URL=http://localhost:8108/?apiKey=xyz
 IPFS_GATEWAY=https://ipfs.io/
 ARWEAVE_GATEWAY=https://arweave.net/
 NODE1_PRIVATE_KEY=0xcb345bd2b11264d523ddaf383094e2675c420a17511c3102a53817f13474a7ff
 NODE2_PRIVATE_KEY=0x3634cc4a3d2694a1186a7ce545f149e022eea103cc254d18d08675104bb4b5ac
 INDEXER_INTERVAL=9000
 ADDRESS_FILE=${HOME}/.ocean/ocean-contracts/artifacts/address.json
+LOG_LEVEL=debug
diff --git a/src/test/integration/logs.test.ts b/src/test/integration/logs.test.ts
index 513ad4f9a..38a8b7199 100644
--- a/src/test/integration/logs.test.ts
+++ b/src/test/integration/logs.test.ts
@@ -7,13 +7,22 @@ import {
   configureCustomDBTransport,
   getCustomLoggerForModule
 } from '../../utils/logging/Logger.js'
+import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js'
+import {
+  buildEnvOverrideConfig,
+  OverrideEnvConfig,
+  setupEnvironment,
+  tearDownEnvironment
+} from '../utils/utils.js'
+
+let previousConfiguration: OverrideEnvConfig[]
 
 describe('LogDatabase CRUD', () => {
   let database: Database
   let logger: CustomNodeLogger
   const logEntry = {
     timestamp: Date.now(),
-    level: 'info',
+    level: 'debug',
     message: `Test log message ${Date.now()}`,
     moduleName: 'testModule-1',
     meta: 'Test meta information'
   }
   let logId: string // Variable to store the ID of the created log entry
 
   before(async () => {
+    previousConfiguration = await setupEnvironment(
+      null,
+      buildEnvOverrideConfig([ENVIRONMENT_VARIABLES.LOG_DB], ['true'])
+    )
     const dbConfig = {
       url: 'http://localhost:8108/?apiKey=xyz'
     }
     database = await new Database(dbConfig)
     // Initialize logger with the custom transport that writes to the LogDatabase
-    logger = getCustomLoggerForModule(LOGGER_MODULE_NAMES.HTTP, LOG_LEVELS_STR.LEVEL_INFO)
+    logger = getCustomLoggerForModule(
+      LOGGER_MODULE_NAMES.HTTP,
+      LOG_LEVELS_STR.LEVEL_DEBUG
+    )
     // normally this is only added on production environments
-    configureCustomDBTransport(database, logger)
+    logger = configureCustomDBTransport(database, logger)
   })
 
   it('insert log', async () => {
@@ -56,24 +72,24 @@ describe('LogDatabase CRUD', () => {
   it('should save a log in the database when a log event is triggered', async () => {
     const newLogEntry = {
       timestamp: Date.now(),
-      level: 'info',
+      level: 'debug',
       message: `NEW Test log message ${Date.now()}`
     }
     // Trigger a log event which should be saved in the database
     logger.log(newLogEntry.level, newLogEntry.message)
-    // Wait for the log to be written to the database
     await new Promise((resolve) => setTimeout(resolve, 1000)) // Delay to allow log to be processed
 
     // Define the time frame for the log retrieval
-    const startTime = new Date(Date.now() - 10000) // 10 seconds ago
+    const startTime = new Date(Date.now() - 5000) // 5 seconds ago
     const endTime = new Date() // current time
 
-    // Retrieve the latest log entry
-    const logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 1)
+    // Retrieve the latest log entries
+    let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 300)
+    logs = logs.filter((log) => log.message === newLogEntry.message)
 
     expect(logs?.length).to.equal(1)
-    expect(logs?.[0].id).to.equal(String(Number(logId) + 1))
+    expect(Number(logs?.[0].id)).to.greaterThan(Number(logId))
     expect(logs?.[0].level).to.equal(newLogEntry.level)
     expect(logs?.[0].message).to.equal(newLogEntry.message)
     expect(logs?.[0].moduleName).to.equal('HTTP')
@@ -82,7 +98,7 @@ describe('LogDatabase CRUD', () => {
   it('should save a log in the database when a log.logMessage is called', async () => {
     const newLogEntry = {
       timestamp: Date.now(),
-      level: 'info',
+      level: 'debug',
       message: `logMessage: Test log message ${Date.now()}`,
       moduleName: 'testModule-3',
       meta: 'Test meta information'
     }
 
     await new Promise((resolve) => setTimeout(resolve, 1000)) // Delay to allow log to be processed
 
     // Define the time frame for the log retrieval
-    const startTime = new Date(Date.now() - 10000) // 10 seconds ago
+    const startTime = new Date(Date.now() - 5000) // 5 seconds ago
     const endTime = new Date() // current time
 
     // Retrieve the latest log entry
-    const logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 1)
+    let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 10)
+    logs = logs.filter((log) => log.message === newLogEntry.message)
 
     expect(logs?.length).to.equal(1)
-    expect(logs?.[0].id).to.equal(String(Number(logId) + 2))
+    expect(Number(logs?.[0].id)).to.greaterThan(Number(logId))
     expect(logs?.[0].level).to.equal(newLogEntry.level)
     expect(logs?.[0].message).to.equal(newLogEntry.message)
     expect(logs?.[0].moduleName).to.equal('HTTP')
@@ -110,7 +127,7 @@ describe('LogDatabase CRUD', () => {
   it('should save a log in the database when a log.logMessageWithEmoji is called', async () => {
     const newLogEntry = {
       timestamp: Date.now(),
-      level: 'info',
+      level: 'debug',
       message: `logMessageWithEmoji: Test log message ${Date.now()}`,
       moduleName: 'testModule-4',
       meta: 'Test meta information'
     }
 
     await new Promise((resolve) => setTimeout(resolve, 1000)) // Delay to allow log to be processed
 
     // Define the time frame for the log retrieval
-    const startTime = new Date(Date.now() - 10000) // 10 seconds ago
+    const startTime = new Date(Date.now() - 5000) // 5 seconds ago
     const endTime = new Date() // current time
 
     // Retrieve the latest log entry
-    const logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 1)
+    let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 10)
+    logs = logs.filter((log) => log.message.includes(newLogEntry.message))
 
     expect(logs?.length).to.equal(1)
-    expect(logs?.[0].id).to.equal(String(Number(logId) + 3))
+    expect(Number(logs?.[0].id)).to.greaterThan(Number(logId))
     expect(logs?.[0].level).to.equal(newLogEntry.level)
     assert(logs?.[0].message)
     expect(logs?.[0].moduleName).to.equal('HTTP')
   })
+
+  after(async () => {
+    await tearDownEnvironment(previousConfiguration)
+  })
 })
 
 describe('LogDatabase retrieveMultipleLogs with specific parameters', () => {
@@ -143,6 +165,11 @@ describe('LogDatabase retrieveMultipleLogs with specific parameters', () => {
   const endTime = new Date() // now
 
   before(async () => {
+    previousConfiguration = await setupEnvironment(
+      null,
+      buildEnvOverrideConfig([ENVIRONMENT_VARIABLES.LOG_DB], ['true'])
+    )
+
     const dbConfig = {
       url: 'http://localhost:8108/?apiKey=xyz'
     }
@@ -163,7 +190,7 @@ describe('LogDatabase retrieveMultipleLogs with specific parameters', () => {
   })
 
   it('should retrieve logs with a specific level', async () => {
-    const level = 'info'
+    const level = 'debug'
     const logs = await database.logs.retrieveMultipleLogs(
       startTime,
       endTime,
@@ -176,7 +203,7 @@ describe('LogDatabase retrieveMultipleLogs with specific parameters', () => {
 
   it('should retrieve logs with both a specific moduleName and level', async () => {
     const moduleName = 'testModule-1'
-    const level = 'info'
+    const level = 'debug'
     const logs = await database.logs.retrieveMultipleLogs(
       startTime,
       endTime,
@@ -203,7 +230,7 @@ describe('LogDatabase retrieveMultipleLogs with specific parameters', () => {
   let database: Database
   const logEntry = {
     timestamp: Date.now(),
-    level: 'info',
+    level: 'debug',
     message: 'Test log message for single deletion',
     moduleName: 'testModule-2',
     meta: 'Test meta information for single deletion'
@@ -288,26 +315,34 @@ describe('LogDatabase retrieveMultipleLogs with specific parameters', () => {
     const elapsedTimeInMs = endPerfTime[0] * 1000 + endPerfTime[1] / 1e6
     expect(elapsedTimeInMs).to.be.below(1000) // threshold
   })
+
+  after(async () => {
+    await tearDownEnvironment(previousConfiguration)
+  })
 })
 
 describe('LogDatabase deleteOldLogs', () => {
   let database: Database
-  const logEntry = {
+  const oldLogEntry = {
     timestamp: new Date().getTime() - 31 * 24 * 60 * 60 * 1000, // 31 days ago
-    level: 'info',
+    level: 'debug',
     message: 'Old log message for deletion test',
     moduleName: 'testModule-1',
     meta: 'Test meta information'
   }
   const recentLogEntry = {
     timestamp: new Date().getTime(), // current time
-    level: 'info',
+    level: 'debug',
     message: 'Recent log message not for deletion',
     moduleName: 'testModule-1',
     meta: 'Test meta information'
   }
 
   before(async () => {
+    previousConfiguration = await setupEnvironment(
+      null,
+      buildEnvOverrideConfig([ENVIRONMENT_VARIABLES.LOG_DB], ['true'])
+    )
     const dbConfig = {
       url: 'http://localhost:8108/?apiKey=xyz'
     }
@@ -315,7 +350,7 @@ describe('LogDatabase deleteOldLogs', () => {
   })
 
   it('should insert an old log and a recent log', async () => {
-    const oldLogResult = await database.logs.insertLog(logEntry)
+    const oldLogResult = await database.logs.insertLog(oldLogEntry)
     expect(oldLogResult).to.include.keys(
       'id',
       'timestamp',
@@ -337,26 +372,39 @@ describe('LogDatabase deleteOldLogs', () => {
   })
 
   it('should delete logs older than 30 days', async () => {
-    await database.logs.deleteOldLogs()
+    const deleted = await database.logs.deleteOldLogs()
+    assert(deleted > 0, 'could not delete old logs')
 
     // Adjust the time window to ensure we don't catch the newly inserted log
-    const startTime = new Date(logEntry.timestamp)
-    const endTime = new Date()
-    const logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 100)
+    let startTime = new Date(oldLogEntry.timestamp)
+    let endTime = new Date()
+    let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 100)
 
     // Check that the old log is not present, but the recent one is
-    const oldLogPresent = logs?.some((log) => log.message === logEntry.message)
-    const recentLogPresent = logs?.some((log) => log.message === recentLogEntry.message)
-
+    const oldLogPresent = logs?.some((log) => log.message === oldLogEntry.message)
     assert(oldLogPresent === false, 'Old logs are still present')
+
+    // since we have many logs going to DB by default, we need to re-frame the timestamp to grab it
+    startTime = new Date(recentLogEntry.timestamp - 1000)
+    endTime = new Date(recentLogEntry.timestamp + 1000)
+    logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 100)
+    const recentLogPresent = logs?.some((log) => log.message === recentLogEntry.message)
     assert(recentLogPresent === true, 'Recent logs are not present')
   })
+
+  after(async () => {
+    await tearDownEnvironment(previousConfiguration)
+  })
 })
 
 describe('LogDatabase retrieveMultipleLogs with pagination', () => {
   let database: Database
   const logCount = 10 // Total number of logs to insert and also the limit for logs per page
 
   before(async () => {
+    previousConfiguration = await setupEnvironment(
+      null,
+      buildEnvOverrideConfig([ENVIRONMENT_VARIABLES.LOG_DB], ['true'])
+    )
     const dbConfig = {
       url: 'http://localhost:8108/?apiKey=xyz'
     }
@@ -411,7 +459,7 @@ describe('LogDatabase retrieveMultipleLogs with pagination', () => {
   })
 
   it('should return empty results for a non-existent page', async () => {
-    const nonExistentPage = 100 // Assuming this page doesn't exist
+    const nonExistentPage = 300 // Assuming this page doesn't exist
     const logs = await database.logs.retrieveMultipleLogs(
       new Date(Date.now() - 10000), // 10 seconds ago
       new Date(), // now
@@ -422,4 +470,8 @@ describe('LogDatabase retrieveMultipleLogs with pagination', () => {
     )
     assert.isEmpty(logs, 'Expected logs to be empty')
   })
+
+  after(async () => {
+    await tearDownEnvironment(previousConfiguration)
+  })
 })
diff --git a/src/test/unit/logging.test.ts b/src/test/unit/logging.test.ts
index 8b1534d9a..b49b812e6 100644
--- a/src/test/unit/logging.test.ts
+++ b/src/test/unit/logging.test.ts
@@ -3,6 +3,7 @@ import { ENVIRONMENT_VARIABLES, getConfiguration } from '../../utils/index.js'
 import { expect } from 'chai'
 import {
+  DEFAULT_TEST_TIMEOUT,
   OverrideEnvConfig,
   buildEnvOverrideConfig,
   setupEnvironment,
@@ -12,6 +13,7 @@ import {
   CustomOceanNodesTransport,
   MAX_LOGGER_INSTANCES,
   NUM_LOGGER_INSTANCES,
+  USE_DB_TRANSPORT,
   isDevelopmentEnvironment
 } from '../../utils/logging/Logger.js'
 import { OCEAN_NODE_LOGGER } from '../../utils/logging/common.js'
@@ -24,7 +26,15 @@ describe('Logger instances and transports tests', async () => {
   // need to do it first
   envOverrides = await setupEnvironment(
     null,
-    buildEnvOverrideConfig([ENVIRONMENT_VARIABLES.NODE_ENV], ['development'])
+    buildEnvOverrideConfig(
+      [
+        ENVIRONMENT_VARIABLES.NODE_ENV,
+        ENVIRONMENT_VARIABLES.LOG_DB,
+        ENVIRONMENT_VARIABLES.LOG_LEVEL,
+        ENVIRONMENT_VARIABLES.DB_URL
+      ],
+      ['development', 'false', 'info', 'http://localhost:8108/?apiKey=xyz']
+    )
   )
   // because of this
   it('should be development environment', () => {
     expect(numExistingInstances).to.be.lessThanOrEqual(MAX_LOGGER_INSTANCES)
   })
 
-  it(`should change NODE_ENV to "production" and logger should have DB transport`, async () => {
-    expect(process.env.NODE_ENV).to.be.equal('development')
+  it(`should change LOG_DB to "true" and logger should have DB transport`, async function () {
+    // when we are logging to DB, things can slow down a bit
+    this.timeout(DEFAULT_TEST_TIMEOUT * 3)
+    expect(USE_DB_TRANSPORT()).to.be.equal(false)
     expect(OCEAN_NODE_LOGGER.hasDBTransport()).to.be.equal(false)
     const envAfter = await setupEnvironment(
       null,
       buildEnvOverrideConfig(
-        [ENVIRONMENT_VARIABLES.NODE_ENV, ENVIRONMENT_VARIABLES.DB_URL],
-        ['production', 'http://172.15.0.6:8108?apiKey=xyz']
+        [ENVIRONMENT_VARIABLES.LOG_DB, ENVIRONMENT_VARIABLES.DB_URL],
+        ['true', 'http://localhost:8108/?apiKey=xyz']
       )
     )
-    expect(process.env.NODE_ENV).to.be.equal('production')
+    expect(USE_DB_TRANSPORT()).to.be.equal(true)
     // will build the DB transport layer
     const config = await getConfiguration(true)
     // eslint-disable-next-line no-unused-vars
-    const DB = new Database(config.dbConfig)
+    const DB = await new Database(config.dbConfig)
     // Could generate Typesene error if DB is not running, but does not matter for this test
     OCEAN_NODE_LOGGER.logMessage('Should build DB transport layer')
@@ -65,7 +77,7 @@ describe('Logger instances and transports tests', async () => {
     OCEAN_NODE_LOGGER.removeTransport(transports[0])
     expect(OCEAN_NODE_LOGGER.hasDBTransport()).to.be.equal(false)
     await tearDownEnvironment(envAfter)
-    expect(process.env.NODE_ENV).to.be.equal('development')
+    expect(USE_DB_TRANSPORT()).to.be.equal(false)
   })
 
   after(() => {
diff --git a/src/test/unit/storage.test.ts b/src/test/unit/storage.test.ts
index ab9908e28..7823dc14e 100644
--- a/src/test/unit/storage.test.ts
+++ b/src/test/unit/storage.test.ts
@@ -321,7 +321,7 @@ describe('URL Storage with malformed URL', () => {
 })
 
 describe('Arweave Storage getFileInfo tests', function () {
-  this.timeout(15000)
+  // this.timeout(15000)
   let storage: ArweaveStorage
 
   before(() => {
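All of the test suites above now follow the same per-suite discipline: override `LOG_DB` in `before`, restore the previous values in `after`. Condensed, inside a Mocha `describe` block — the helper names match the imports shown in the diff, while the surrounding suite is illustrative:

    let previousConfiguration: OverrideEnvConfig[]

    before(async () => {
      // force the DB transport on for this suite only
      previousConfiguration = await setupEnvironment(
        null,
        buildEnvOverrideConfig([ENVIRONMENT_VARIABLES.LOG_DB], ['true'])
      )
    })

    after(async () => {
      // put back whatever values were set before the suite ran
      await tearDownEnvironment(previousConfiguration)
    })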
diff --git a/src/test/utils/hooks.ts b/src/test/utils/hooks.ts
index be2bae3fa..59ace1d4b 100644
--- a/src/test/utils/hooks.ts
+++ b/src/test/utils/hooks.ts
@@ -69,7 +69,9 @@ export const mochaHooks = {
       const initialVariable: OverrideEnvConfig = initialConfiguration.get(varName)
       if (initialVariable.originalValue !== currentEnvVariable.originalValue) {
         // reset it to the original
-        CONFIG_LOGGER.debug('Restoring environment variable: ' + varName)
+        CONFIG_LOGGER.debug(
+          `(Hook) Restoring environment variable: ${varName} \ncurrent:\n ${process.env[varName]} \noriginal:\n ${initialVariable.originalValue}`
+        )
         process.env[varName] = initialVariable.originalValue
       }
     })
diff --git a/src/test/utils/utils.ts b/src/test/utils/utils.ts
index e45826687..743623564 100644
--- a/src/test/utils/utils.ts
+++ b/src/test/utils/utils.ts
@@ -92,7 +92,11 @@ export async function setupEnvironment(
       element.override ||
       (element.required && process.env[element.name] === undefined) // if override OR not set but required to run
     ) {
-      CONFIG_LOGGER.debug('Overriding environment variable: ' + element.name)
+      CONFIG_LOGGER.debug(
+        `Overriding environment variable: ${element.name}\ncurrent:\n ${
+          process.env[element.name]
+        }\nnew:\n ${element.newValue}`
+      )
       element.originalValue = process.env[element.name] // save original value
       process.env[element.name] = element.newValue
       ENVIRONMENT_VARIABLES[element.name].value = element.newValue
diff --git a/src/utils/asset.ts b/src/utils/asset.ts
index 13b9b1093..8901b0a7e 100644
--- a/src/utils/asset.ts
+++ b/src/utils/asset.ts
@@ -56,7 +56,9 @@ export async function fetchFileMetadata(
       }
     }
     contentLength = totalSize
-  } catch (error) {}
+  } catch (error) {
+    CORE_LOGGER.error(error)
+  }
 
   return {
     contentLength: contentLength.toString(),
diff --git a/src/utils/constants.ts b/src/utils/constants.ts
index 491bf0d47..0366620bb 100644
--- a/src/utils/constants.ts
+++ b/src/utils/constants.ts
@@ -275,6 +275,24 @@ export const ENVIRONMENT_VARIABLES: Record<string, EnvVariable> = {
     name: 'LOG_LEVEL',
     value: process.env.LOG_LEVEL,
     required: false
+  },
+  LOG_CONSOLE: {
+    // log to console output? true if no other output below is set
+    name: 'LOG_CONSOLE',
+    value: process.env.LOG_CONSOLE,
+    required: false
+  },
+  LOG_FILES: {
+    // log to files?
+    name: 'LOG_FILES',
+    value: process.env.LOG_FILES,
+    required: false
+  },
+  LOG_DB: {
+    // log to DB?
+    name: 'LOG_DB',
+    value: process.env.LOG_DB,
+    required: false
   }
 }
diff --git a/src/utils/logging/Logger.ts b/src/utils/logging/Logger.ts
index c4c452299..7c47c89b5 100644
--- a/src/utils/logging/Logger.ts
+++ b/src/utils/logging/Logger.ts
@@ -109,14 +109,18 @@ export class CustomOceanNodesTransport extends Transport {
     const document = {
       level: info.level,
       message: info.message,
-      moduleName: info.moduleName || 'undefined',
+      moduleName: info.moduleName || LOGGER_MODULE_NAMES.ALL_COMBINED,
       timestamp: Date.now(), // Storing the current timestamp as a Unix epoch timestamp (number)
       meta: JSON.stringify(info.meta) // Ensure meta is a string
     }
 
     try {
       // Use the insertLog method of the LogDatabase instance
-      if (this.dbInstance && this.dbInstance.logs) {
+      if (
+        this.dbInstance &&
+        this.dbInstance.logs // &&
+        // !isTypesenseIgnoreLogMessage(document.moduleName, document.message)
+      ) {
         // double check before writing
         await this.dbInstance.logs.insertLog(document)
       }
@@ -136,6 +140,23 @@ let customDBTransport: CustomOceanNodesTransport = null
 export const MAX_LOGGER_INSTANCES = 10
 export const NUM_LOGGER_INSTANCES = INSTANCE_COUNT
 
+// log locations
+function USE_FILE_TRANSPORT(): boolean {
+  return process.env.LOG_FILES && process.env.LOG_FILES !== 'false'
+}
+
+export function USE_DB_TRANSPORT(): boolean {
+  return process.env.LOG_DB && process.env.LOG_DB !== 'false'
+}
+
+// default to true, if not explicitly set otherwise AND no other locations defined
+function USE_CONSOLE_TRANSPORT(): boolean {
+  return (
+    (process.env.LOG_CONSOLE && process.env.LOG_CONSOLE !== 'false') ||
+    (!USE_FILE_TRANSPORT() && !USE_DB_TRANSPORT())
+  )
+}
+
 // if not set, then gets default 'development' level & colors
 export function isDevelopmentEnvironment(): boolean {
   const env = process.env.NODE_ENV || 'development'
@@ -200,7 +221,7 @@ function getDefaultOptions(moduleName: string): winston.LoggerOptions {
     level: getDefaultLevel(),
     levels: LOG_LEVELS_NUM,
     format,
-    transports: [buildCustomFileTransport(moduleName), defaultConsoleTransport],
+    transports: getDefaultLoggerTransports(moduleName),
     exceptionHandlers: [
       new winston.transports.File({ dirname: 'logs/', filename: EXCEPTIONS_HANDLER })
     ]
@@ -257,12 +278,14 @@ export function buildCustomFileTransport(
 export function getDefaultLoggerTransports(
   moduleOrComponentName: string
 ): winston.transport[] {
-  // always log to file
-  const transports: winston.transport[] = [
-    buildCustomFileTransport(moduleOrComponentName)
-  ]
-  // only log to console if development
-  if (isDevelopmentEnvironment()) {
+  const transports: winston.transport[] = []
+  // account for runtime changes done by tests (force read again value)
+  if (USE_FILE_TRANSPORT()) {
+    // log to file
+    transports.push(buildCustomFileTransport(moduleOrComponentName))
+  }
+
+  if (USE_CONSOLE_TRANSPORT()) {
     transports.push(defaultConsoleTransport)
   }
   return transports
@@ -433,12 +456,11 @@ export class CustomNodeLogger {
     includeModuleName: boolean = false
   ) {
     // lazy check db custom transport, needed beacause of dependency cycles
-    if (
-      customDBTransport !== null && // if null then what?
-      !isDevelopmentEnvironment() &&
-      !this.hasDBTransport()
-    ) {
+    const usingDBTransport = this.hasDBTransport()
+    if (customDBTransport !== null && USE_DB_TRANSPORT() && !usingDBTransport) {
       this.addTransport(customDBTransport)
+    } else if (usingDBTransport && !USE_DB_TRANSPORT()) {
+      this.removeTransport(this.getDBTransport())
     }
 
     this.getLogger().log(
@@ -446,6 +468,7 @@ export class CustomNodeLogger {
       includeModuleName ? this.buildMessage(message) : message,
       { moduleName: this.getModuleName().toUpperCase() }
     )
+    // }
   }
 
   logMessage(message: string, includeModuleName: boolean = false) {
@@ -544,6 +567,7 @@ export function configureCustomDBTransport(
   }
   if (!logger.hasDBTransport()) {
     logger.addTransport(customDBTransport)
+    logger.logMessage('Adding DB transport to Logger: ' + logger.getModuleName())
   }
   return logger
 }
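Net effect of the three new flags, as implemented in `Logger.ts` above: file and DB logging are opt-in, and the console stays on only when explicitly enabled or when no other destination is configured. A condensed sketch of the resolution rules:

    // Mirrors the LOG_FILES / LOG_DB / LOG_CONSOLE rules introduced above.
    const useFiles = (): boolean =>
      !!process.env.LOG_FILES && process.env.LOG_FILES !== 'false'
    const useDB = (): boolean =>
      !!process.env.LOG_DB && process.env.LOG_DB !== 'false'
    const useConsole = (): boolean =>
      (!!process.env.LOG_CONSOLE && process.env.LOG_CONSOLE !== 'false') ||
      (!useFiles() && !useDB())

    // With no LOG_* variables set: files=false, db=false, console=true (the default).
    console.log({ files: useFiles(), db: useDB(), console: useConsole() })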