Skip to content

Commit

Permalink
Merge pull request #659 from oceanprotocol/issue-644-crash-stream-reset
Browse files Browse the repository at this point in the history
Fix crash on connection reset - P2P streams
  • Loading branch information
paulo-ocean authored Sep 4, 2024
2 parents e6e3afd + d8012f1 commit 1404482
Showing 1 changed file with 59 additions and 22 deletions.
81 changes: 59 additions & 22 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,26 @@ export class ReadableString extends Readable {
}
}

export async function handleProtocolCommands(connection: any) {
const { remotePeer } = connection.connection
// close the stream after sending data, libp2p will handle stream status
async function closeStreamConnection(connection: any, remotePeer: string) {
if (connection) {
try {
P2P_LOGGER.debug('Closing connection to remote peer:' + remotePeer)
await connection.close()
} catch (e) {
P2P_LOGGER.error(`Error closing connection for peer ${remotePeer}: ${e.message}`)
}
}
}

export async function handleProtocolCommands(otherPeerConnection: any) {
const { remotePeer, remoteAddr } = otherPeerConnection.connection

// only write if stream is in 'open' status
const connectionStatus = otherPeerConnection.connection.status
P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true)
P2P_LOGGER.logMessage('Using ' + remoteAddr, true)

P2P_LOGGER.logMessage('Using ' + connection.connection.remoteAddr, true)
let status = null
let task: Command
let statusStream
Expand All @@ -51,41 +65,57 @@ export async function handleProtocolCommands(connection: any) {
if (denyList.peers.includes(remotePeer.toString())) {
P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`)

statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request'))
)
pipe(statusStream, connection.stream.sink)
if (connectionStatus === 'open') {
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request'))
)
try {
await pipe(statusStream, otherPeerConnection.stream.sink)
} catch (e) {
P2P_LOGGER.error(e)
}
}
await closeStreamConnection(otherPeerConnection.connection, remotePeer)
return
}
}

try {
// eslint-disable-next-line no-unreachable-loop
for await (const chunk of connection.stream.source) {
for await (const chunk of otherPeerConnection.stream.source) {
try {
const str = uint8ArrayToString(chunk.subarray())
task = JSON.parse(str) as Command
} catch (e) {
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
if (connectionStatus === 'open') {
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
await pipe(statusStream, otherPeerConnection.stream.sink)
}

await closeStreamConnection(otherPeerConnection.connection, remotePeer)
return
}
}
if (!task) {
P2P_LOGGER.error('Invalid or missing task/command data!')
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
if (connectionStatus === 'open') {
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
await pipe(statusStream, otherPeerConnection.stream.sink)
}

await closeStreamConnection(otherPeerConnection.connection, remotePeer)
return
}
} catch (err) {
P2P_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Unable to process P2P command: ${err.message}`
)
await closeStreamConnection(otherPeerConnection.connection, remotePeer)
return
}

Expand All @@ -107,20 +137,27 @@ export async function handleProtocolCommands(connection: any) {
sendStream = response.stream
}
statusStream = new ReadableString(JSON.stringify(status))
if (sendStream == null) pipe(statusStream, connection.stream.sink)
else {
const combinedStream = new StreamConcat([statusStream, sendStream], {
highWaterMark: JSON.stringify(status).length // important for reading chunks correctly on sink!
})
pipe(combinedStream, connection.stream.sink)

if (connectionStatus === 'open') {
if (sendStream == null) {
await pipe(statusStream, otherPeerConnection.stream.sink)
} else {
const combinedStream = new StreamConcat([statusStream, sendStream], {
highWaterMark: JSON.stringify(status).length // important for reading chunks correctly on sink!
})
await pipe(combinedStream, otherPeerConnection.stream.sink)
}
}

await closeStreamConnection(otherPeerConnection.connection, remotePeer)
} catch (err) {
P2P_LOGGER.logMessageWithEmoji(
'handleProtocolCommands Error: ' + err.message,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
await closeStreamConnection(otherPeerConnection.connection, remotePeer)
}
}
}

0 comments on commit 1404482

Please sign in to comment.