Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:oceanprotocol/ocean-node into im…
Browse files Browse the repository at this point in the history
…prove-error-logs
  • Loading branch information
jamiehewitt15 committed Aug 2, 2024
2 parents 108bc62 + dc415f6 commit b55f596
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 102 deletions.
20 changes: 8 additions & 12 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ export class OceanP2P extends EventEmitter {
const peerId = details.detail
P2P_LOGGER.debug('Connection established to:' + peerId.toString()) // Emitted when a peer has been found
try {
// DO WE REALLY NEED THIS?
this._libp2p.services.pubsub.connect(peerId.toString())
} catch (e) {}
} else {
/* empty */
} catch (e) {
P2P_LOGGER.error(e.message)
}
}
}

Expand Down Expand Up @@ -551,20 +552,18 @@ export class OceanP2P extends EventEmitter {
let stream
// dial/connect to the target node
try {
// stream= await this._libp2p.dialProtocol(peer, this._protocol)

stream = await this._libp2p.dialProtocol(peerId, this._protocol)
} catch (e) {
response.status.httpStatus = 404
response.status.error = 'Cannot connect to peer'
P2P_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Unable to connect to peer: ${peerId}`)
P2P_LOGGER.error(`Unable to connect to peer: ${peerId}`)
return response
}

if (stream) {
response.stream = stream
try {
pipe(
await pipe(
// Source data
[uint8ArrayFromString(message)],
// Write to the stream, and pass its output to the next function
Expand All @@ -575,17 +574,14 @@ export class OceanP2P extends EventEmitter {
sink
)
} catch (err) {
P2P_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Unable to send P2P message: ${err.message}`
)
P2P_LOGGER.error(`Unable to send P2P message: ${err.message}`)
response.status.httpStatus = 404
response.status.error = err.message
}
} else {
response.status.httpStatus = 404
response.status.error = 'Unable to get remote P2P stream (null)'
P2P_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, response.status.error)
P2P_LOGGER.error(response.status.error)
}

return response
Expand Down
185 changes: 97 additions & 88 deletions src/components/httpRoutes/commands.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-unreachable */
import express, { Request, Response } from 'express'
import { P2PCommandResponse } from '../../@types'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
Expand Down Expand Up @@ -36,109 +37,117 @@ directCommandRoute.post(
'/directCommand',
express.json(),
async (req: Request, res: Response): Promise<void> => {
const validate = validateCommandParameters(req.body, [])
if (!validate.valid) {
res.status(validate.status).send(validate.reason)
return
}
try {
const validate = validateCommandParameters(req.body, [])
if (!validate.valid) {
res.status(validate.status).send(validate.reason)
return
}

let closedResponse = false
let closedResponse = false

// detect connection closed
res.on('close', () => {
if (!closedResponse) {
HTTP_LOGGER.error('TCP connection was closed before we could send a response!')
}
closedResponse = true
})
let isBinaryContent = false
const sink = async function (source: any) {
let first = true
for await (const chunk of source) {
if (first) {
first = false
try {
const str = uint8ArrayToString(chunk.subarray()) // Obs: we need to specify the length of the subarrays
const decoded = JSON.parse(str)
// detect connection closed
res.on('close', () => {
if (!closedResponse) {
HTTP_LOGGER.error('TCP connection was closed before we could send a response!')
}
closedResponse = true
})
let isBinaryContent = false
const sink = async function (source: any) {
let first = true
for await (const chunk of source) {
if (first) {
first = false
try {
const str = uint8ArrayToString(chunk.subarray()) // Obs: we need to specify the length of the subarrays
const decoded = JSON.parse(str)

res.status(decoded.httpStatus)
if ('headers' in decoded) {
res.header(decoded.headers)
// when streaming binary data we cannot convert to plain string, specially if encrypted data
if (str?.toLowerCase().includes('application/octet-stream')) {
isBinaryContent = true
res.status(decoded.httpStatus)
if ('headers' in decoded) {
res.header(decoded.headers)
// when streaming binary data we cannot convert to plain string, specially if encrypted data
if (str?.toLowerCase().includes('application/octet-stream')) {
isBinaryContent = true
}
}
} catch (e) {
res.status(500)
res.write(uint8ArrayToString(chunk.subarray()))
closedResponse = true
res.end()
HTTP_LOGGER.error(e.message)
}
} catch (e) {
res.status(500)
res.write(uint8ArrayToString(chunk.subarray()))
closedResponse = true
res.end()
HTTP_LOGGER.error(e.message)
}
} else {
try {
if (isBinaryContent) {
// Binary content, could be encrypted
res.write(chunk.subarray())
} else {
const str = uint8ArrayToString(chunk.subarray())
res.write(str)
} else {
try {
if (isBinaryContent) {
// Binary content, could be encrypted
res.write(chunk.subarray())
} else {
const str = uint8ArrayToString(chunk.subarray())
res.write(str)
}
} catch (e) {
HTTP_LOGGER.error(e.message)
}
} catch (e) {
HTTP_LOGGER.error(e.message)
}
}
closedResponse = true
res.end()
}
closedResponse = true
res.end()
}

HTTP_LOGGER.logMessage('Sending command : ' + JSON.stringify(req.body), true)
HTTP_LOGGER.logMessage('Sending command : ' + JSON.stringify(req.body), true)

// TODO NOTES: We are sending all "/directCommand" requests to the P2P component as "req.oceanNode.getP2PNode()"
// even if we do not need any P2P functionality at all (as all our handlers are "inside" P2P)
// All ends up here => "handleProtocolCommands()" or here => "handleDirectProtocolCommands()", where we do not have
// any access to main OceanNode, neither Provider or Indexer components
// probably the handlers should be on the OceanNode level, and if they need P2P connectivity we pass them the getP2PNode()
// (we kinda do it already on most handlers anyway)
// TODO NOTES: We are sending all "/directCommand" requests to the P2P component as "req.oceanNode.getP2PNode()"
// even if we do not need any P2P functionality at all (as all our handlers are "inside" P2P)
// All ends up here => "handleProtocolCommands()" or here => "handleDirectProtocolCommands()", where we do not have
// any access to main OceanNode, neither Provider or Indexer components
// probably the handlers should be on the OceanNode level, and if they need P2P connectivity we pass them the getP2PNode()
// (we kinda do it already on most handlers anyway)

let response: P2PCommandResponse = null
// send to this peer (we might not need P2P connectivity)
if (
!hasP2PInterface ||
!req.body.node ||
req.oceanNode.getP2PNode().isTargetPeerSelf(req.body.node)
) {
// send to this node
response = await req.oceanNode.handleDirectProtocolCommand(
JSON.stringify(req.body),
sink
)
// UPDATED: we can just call the handler directly here, once we have them
// moving some of the logic from "handleProtocolCommands()" and "handleDirectProtocolCommands()" to the OceanNode
// These actions do not need P2P connections directly
} else if (hasP2PInterface) {
// send to another peer (Only here we need P2P networking)
response = await req.oceanNode
.getP2PNode()
.sendTo(req.body.node as string, JSON.stringify(req.body), sink)
} else {
response = {
stream: null,
status: {
httpStatus: 400,
error: 'Invalid or Non Existing P2P configuration'
let response: P2PCommandResponse = null
// send to this peer (we might not need P2P connectivity)
if (
!hasP2PInterface ||
!req.body.node ||
req.oceanNode.getP2PNode().isTargetPeerSelf(req.body.node)
) {
// send to this node
response = await req.oceanNode.handleDirectProtocolCommand(
JSON.stringify(req.body),
sink
)
// UPDATED: we can just call the handler directly here, once we have them
// moving some of the logic from "handleProtocolCommands()" and "handleDirectProtocolCommands()" to the OceanNode
// These actions do not need P2P connections directly
} else if (hasP2PInterface) {
// send to another peer (Only here we need P2P networking)
response = await req.oceanNode
.getP2PNode()
.sendTo(req.body.node as string, JSON.stringify(req.body), sink)
} else {
response = {
stream: null,
status: {
httpStatus: 400,
error: 'Invalid or Non Existing P2P configuration'
}
}
}
}

// only if response was not already sent
if (response.stream == null && !closedResponse) {
res.status(response.status.httpStatus)
res.write(response.status.error)
closedResponse = true
res.end()
// only if response was not already sent
if (response.stream == null && !closedResponse) {
try {
res.status(response.status.httpStatus)
res.write(response.status.error)
closedResponse = true
res.end()
} catch (e) {
HTTP_LOGGER.error(e.message)
}
}
} catch (err) {
HTTP_LOGGER.error(err.message)
}
}
)
4 changes: 2 additions & 2 deletions src/test/integration/logs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ describe('LogDatabase CRUD', () => {
const endTime = new Date() // current time

// Retrieve the latest log entries
let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 200)
let logs = await database.logs.retrieveMultipleLogs(startTime, endTime, 300)
logs = logs.filter((log) => log.message === newLogEntry.message)

expect(logs?.length).to.equal(1)
Expand Down Expand Up @@ -459,7 +459,7 @@ describe('LogDatabase retrieveMultipleLogs with pagination', () => {
})

it('should return empty results for a non-existent page', async () => {
const nonExistentPage = 100 // Assuming this page doesn't exist
const nonExistentPage = 300 // Assuming this page doesn't exist
const logs = await database.logs.retrieveMultipleLogs(
new Date(Date.now() - 10000), // 10 seconds ago
new Date(), // now
Expand Down

0 comments on commit b55f596

Please sign in to comment.