Skip to content

Commit

Permalink
Merge pull request #586 from oceanprotocol/improve-try-catch
Browse files Browse the repository at this point in the history
more try catch
  • Loading branch information
paulo-ocean authored Aug 1, 2024
2 parents 17cff35 + c7fa7f9 commit 40b86b8
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 99 deletions.
18 changes: 7 additions & 11 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ export class OceanP2P extends EventEmitter {
const peerId = details.detail
P2P_LOGGER.debug('Connection established to:' + peerId.toString()) // Emitted when a peer has been found
try {
// DO WE REALLY NEED THIS?
this._libp2p.services.pubsub.connect(peerId.toString())
} catch (e) {}
} else {
/* empty */
} catch (e) {
P2P_LOGGER.error(e.message)
}
}
}

Expand Down Expand Up @@ -551,13 +552,11 @@ export class OceanP2P extends EventEmitter {
let stream
// dial/connect to the target node
try {
// stream= await this._libp2p.dialProtocol(peer, this._protocol)

stream = await this._libp2p.dialProtocol(peerId, this._protocol)
} catch (e) {
response.status.httpStatus = 404
response.status.error = 'Cannot connect to peer'
P2P_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Unable to connect to peer: ${peerId}`)
P2P_LOGGER.error(`Unable to connect to peer: ${peerId}`)
return response
}

Expand All @@ -575,17 +574,14 @@ export class OceanP2P extends EventEmitter {
sink
)
} catch (err) {
P2P_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Unable to send P2P message: ${err.message}`
)
P2P_LOGGER.error(`Unable to send P2P message: ${err.message}`)
response.status.httpStatus = 404
response.status.error = err.message
}
} else {
response.status.httpStatus = 404
response.status.error = 'Unable to get remote P2P stream (null)'
P2P_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, response.status.error)
P2P_LOGGER.error(response.status.error)
}

return response
Expand Down
185 changes: 97 additions & 88 deletions src/components/httpRoutes/commands.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-unreachable */
import express, { Request, Response } from 'express'
import { P2PCommandResponse } from '../../@types'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
Expand Down Expand Up @@ -36,109 +37,117 @@ directCommandRoute.post(
'/directCommand',
express.json(),
async (req: Request, res: Response): Promise<void> => {
const validate = validateCommandParameters(req.body, [])
if (!validate.valid) {
res.status(validate.status).send(validate.reason)
return
}
try {
const validate = validateCommandParameters(req.body, [])
if (!validate.valid) {
res.status(validate.status).send(validate.reason)
return
}

let closedResponse = false
let closedResponse = false

// detect connection closed
res.on('close', () => {
if (!closedResponse) {
HTTP_LOGGER.error('TCP connection was closed before we could send a response!')
}
closedResponse = true
})
let isBinaryContent = false
const sink = async function (source: any) {
let first = true
for await (const chunk of source) {
if (first) {
first = false
try {
const str = uint8ArrayToString(chunk.subarray()) // Obs: we need to specify the length of the subarrays
const decoded = JSON.parse(str)
// detect connection closed
res.on('close', () => {
if (!closedResponse) {
HTTP_LOGGER.error('TCP connection was closed before we could send a response!')
}
closedResponse = true
})
let isBinaryContent = false
const sink = async function (source: any) {
let first = true
for await (const chunk of source) {
if (first) {
first = false
try {
const str = uint8ArrayToString(chunk.subarray()) // Obs: we need to specify the length of the subarrays
const decoded = JSON.parse(str)

res.status(decoded.httpStatus)
if ('headers' in decoded) {
res.header(decoded.headers)
// when streaming binary data we cannot convert to plain string, specially if encrypted data
if (str?.toLowerCase().includes('application/octet-stream')) {
isBinaryContent = true
res.status(decoded.httpStatus)
if ('headers' in decoded) {
res.header(decoded.headers)
// when streaming binary data we cannot convert to plain string, specially if encrypted data
if (str?.toLowerCase().includes('application/octet-stream')) {
isBinaryContent = true
}
}
} catch (e) {
res.status(500)
res.write(uint8ArrayToString(chunk.subarray()))
closedResponse = true
res.end()
HTTP_LOGGER.error(e.message)
}
} catch (e) {
res.status(500)
res.write(uint8ArrayToString(chunk.subarray()))
closedResponse = true
res.end()
HTTP_LOGGER.error(e.message)
}
} else {
try {
if (isBinaryContent) {
// Binary content, could be encrypted
res.write(chunk.subarray())
} else {
const str = uint8ArrayToString(chunk.subarray())
res.write(str)
} else {
try {
if (isBinaryContent) {
// Binary content, could be encrypted
res.write(chunk.subarray())
} else {
const str = uint8ArrayToString(chunk.subarray())
res.write(str)
}
} catch (e) {
HTTP_LOGGER.error(e.message)
}
} catch (e) {
HTTP_LOGGER.error(e.message)
}
}
closedResponse = true
res.end()
}
closedResponse = true
res.end()
}

HTTP_LOGGER.logMessage('Sending command : ' + JSON.stringify(req.body), true)
HTTP_LOGGER.logMessage('Sending command : ' + JSON.stringify(req.body), true)

// TODO NOTES: We are sending all "/directCommand" requests to the P2P component as "req.oceanNode.getP2PNode()"
// even if we do not need any P2P functionality at all (as all our handlers are "inside" P2P)
// All ends up here => "handleProtocolCommands()" or here => "handleDirectProtocolCommands()", where we do not have
// any access to main OceanNode, neither Provider or Indexer components
// probably the handlers should be on the OceanNode level, and if they need P2P connectivity we pass them the getP2PNode()
// (we kinda do it already on most handlers anyway)
// TODO NOTES: We are sending all "/directCommand" requests to the P2P component as "req.oceanNode.getP2PNode()"
// even if we do not need any P2P functionality at all (as all our handlers are "inside" P2P)
// All ends up here => "handleProtocolCommands()" or here => "handleDirectProtocolCommands()", where we do not have
// any access to main OceanNode, neither Provider or Indexer components
// probably the handlers should be on the OceanNode level, and if they need P2P connectivity we pass them the getP2PNode()
// (we kinda do it already on most handlers anyway)

let response: P2PCommandResponse = null
// send to this peer (we might not need P2P connectivity)
if (
!hasP2PInterface ||
!req.body.node ||
req.oceanNode.getP2PNode().isTargetPeerSelf(req.body.node)
) {
// send to this node
response = await req.oceanNode.handleDirectProtocolCommand(
JSON.stringify(req.body),
sink
)
// UPDATED: we can just call the handler directly here, once we have them
// moving some of the logic from "handleProtocolCommands()" and "handleDirectProtocolCommands()" to the OceanNode
// These actions do not need P2P connections directly
} else if (hasP2PInterface) {
// send to another peer (Only here we need P2P networking)
response = await req.oceanNode
.getP2PNode()
.sendTo(req.body.node as string, JSON.stringify(req.body), sink)
} else {
response = {
stream: null,
status: {
httpStatus: 400,
error: 'Invalid or Non Existing P2P configuration'
let response: P2PCommandResponse = null
// send to this peer (we might not need P2P connectivity)
if (
!hasP2PInterface ||
!req.body.node ||
req.oceanNode.getP2PNode().isTargetPeerSelf(req.body.node)
) {
// send to this node
response = await req.oceanNode.handleDirectProtocolCommand(
JSON.stringify(req.body),
sink
)
// UPDATED: we can just call the handler directly here, once we have them
// moving some of the logic from "handleProtocolCommands()" and "handleDirectProtocolCommands()" to the OceanNode
// These actions do not need P2P connections directly
} else if (hasP2PInterface) {
// send to another peer (Only here we need P2P networking)
response = await req.oceanNode
.getP2PNode()
.sendTo(req.body.node as string, JSON.stringify(req.body), sink)
} else {
response = {
stream: null,
status: {
httpStatus: 400,
error: 'Invalid or Non Existing P2P configuration'
}
}
}
}

// only if response was not already sent
if (response.stream == null && !closedResponse) {
res.status(response.status.httpStatus)
res.write(response.status.error)
closedResponse = true
res.end()
// only if response was not already sent
if (response.stream == null && !closedResponse) {
try {
res.status(response.status.httpStatus)
res.write(response.status.error)
closedResponse = true
res.end()
} catch (e) {
HTTP_LOGGER.error(e.message)
}
}
} catch (err) {
HTTP_LOGGER.error(err.message)
}
}
)

0 comments on commit 40b86b8

Please sign in to comment.