Merge branch 'develop' into issue-506-agreementid-stop
paulo-ocean committed Aug 5, 2024
2 parents c3b60f4 + dc415f6 commit cddadc1
Showing 26 changed files with 534 additions and 313 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
@@ -18,7 +18,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-node@v2
with:
node-version: '18.20.4'
node-version: 'v20.16.0'
- name: Cache node_modules
uses: actions/cache@v2
env:
@@ -43,7 +43,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
node: ['18.20.4']
node: ['18.20.4', 'v20.16.0', 'v22.5.1']

steps:
- uses: actions/checkout@v3
@@ -67,7 +67,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-node@v2
with:
node-version: '18.20.4'
node-version: 'v20.16.0'
- name: Cache node_modules
uses: actions/cache@v2
env:
@@ -100,7 +100,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-node@v2
with:
node-version: '18.20.4'
node-version: 'v20.16.0'
- name: Cache node_modules
uses: actions/cache@v2
env:
@@ -189,7 +189,7 @@ jobs:
- name: Set up Node.js
uses: actions/setup-node@v2
with:
node-version: '18.20.4'
node-version: 'v20.16.0'

- name: Cache node_modules
uses: actions/cache@v2
2 changes: 1 addition & 1 deletion .nvmrc
@@ -1 +1 @@
v18.20.4
v20.16.0
51 changes: 38 additions & 13 deletions dashboard/src/components/IndexQueue.tsx
@@ -9,6 +9,7 @@ import {
} from '@mui/material'
import styles from './Dashboard/index.module.css'
import { useAdminContext } from '@/context/AdminProvider'
import Alert from '@mui/material/Alert'

interface QueueItem {
txId: string
@@ -19,36 +20,49 @@ interface QueueItem {
export default function IndexQueue() {
const [queue, setQueue] = useState<QueueItem[]>([])
const { networks } = useAdminContext()
const [avoidAskQueue, setAvoidAskQueue] = useState<boolean>(false)

let intervalId: any = null
useEffect(() => {
const fetchQueue = () => {
fetch('/api/services/indexQueue')
.then((response) => response.json())
.then((data) => {
const transformedQueue = data.queue.map((item: any) => {
const network = networks.find((net) => net.chainId === item.chainId)
return {
txId: item.txId,
chainId: item.chainId,
chain: network ? network.network : 'Unknown Network'
.then((response) => {
if (response.status === 400) {
console.warn('Cannot fetch queue: Node is not running Indexer')
setAvoidAskQueue(true)
if (intervalId) {
clearInterval(intervalId) // Stop doing this, there is no point, since we don't have Indexer
}
})
setQueue(transformedQueue)
} else {
response.json().then((data) => {
const transformedQueue = data.queue.map((item: any) => {
const network = networks.find((net) => net.chainId === item.chainId)
return {
txId: item.txId,
chainId: item.chainId,
chain: network ? network.network : 'Unknown Network'
}
})
setQueue(transformedQueue)
})
}
})
.catch((error) => {
console.error('Error fetching queue:', error)
})
}

fetchQueue() // Initial fetch
let pollingInterval = 2000 // Default polling interval
let pollingInterval = 10000 // Default polling interval (10 seconds)
if (process.env.INDEXER_INTERVAL) {
pollingInterval = Number(process.env.INDEXER_INTERVAL)
}
const intervalId = setInterval(fetchQueue, pollingInterval)
intervalId = setInterval(fetchQueue, pollingInterval)

return () => {
clearInterval(intervalId) // Clear interval on component unmount
if (intervalId) {
clearInterval(intervalId) // Clear interval on component unmount
}
}
}, [])

@@ -88,6 +102,17 @@ export default function IndexQueue() {
) : (
<p>Indexing queue is empty.</p>
)}
{avoidAskQueue && (
<Alert
style={{ width: 640 }}
severity="warning"
onClose={() => {
setAvoidAskQueue(false)
}}
>
Node is not running Indexer. No need to get queue at this point!
</Alert>
)}
</div>
)
}
3 changes: 3 additions & 0 deletions env.md
@@ -25,6 +25,9 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/
- `MAX_REQ_PER_SECOND`: Number of requests per second allowed by the same client. Example: `3`
- `MAX_CHECKSUM_LENGTH`: Define the maximum length for a file if checksum is required (Mb). Example: `10`
- `LOG_LEVEL`: Define the default log level. Example: `debug`
- `LOG_CONSOLE`: Write logs to the console. Default is `false`, but becomes `true` if neither `LOG_FILES` nor `LOG_DB` is set.
- `LOG_FILES`: Write logs to files. Default is `false`.
- `LOG_DB`: Write logs to the NoSQL database. Default is `false`.

## HTTP

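As a rough illustration of how the three new logging flags documented in env.md might be consumed, here is a minimal TypeScript sketch; the `getLogTransports` helper and its return shape are illustrative assumptions rather than code from this commit:

```typescript
// Minimal sketch: derive the enabled log transports from the LOG_* variables
// documented above. The helper name and return shape are illustrative
// assumptions, not part of this repository.
interface LogTransports {
  console: boolean
  files: boolean
  db: boolean
}

function getLogTransports(env: NodeJS.ProcessEnv = process.env): LogTransports {
  const files = env.LOG_FILES === 'true'
  const db = env.LOG_DB === 'true'
  // LOG_CONSOLE defaults to false, but falls back to true when neither file
  // nor database logging is enabled, so logs are never silently dropped.
  const consoleEnabled = env.LOG_CONSOLE === 'true' || (!files && !db)
  return { console: consoleEnabled, files, db }
}

// Example: LOG_FILES=true -> { console: false, files: true, db: false }
// Example: no LOG_* set   -> { console: true, files: false, db: false }
```
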
29 changes: 0 additions & 29 deletions package-lock.json

1 change: 0 additions & 1 deletion package.json
@@ -57,7 +57,6 @@
"@libp2p/interface-address-manager": "^3.0.1",
"@libp2p/kad-dht": "^12.1.1",
"@libp2p/mdns": "^10.1.1",
"@libp2p/mplex": "^10.1.1",
"@libp2p/peer-id": "^4.1.4",
"@libp2p/peer-id-factory": "^4.1.4",
"@libp2p/ping": "^1.1.1",
5 changes: 5 additions & 0 deletions src/@types/DDO/Metadata.ts
@@ -18,6 +18,11 @@ export interface MetadataAlgorithm {
* @type {string}
*/
rawcode?: string
/**
* Format of the algorithm
* @type {string}
*/
format?: string

/**
* Object describing the Docker container image.
2 changes: 1 addition & 1 deletion src/components/Indexer/utils.ts
@@ -181,7 +181,7 @@ export const processChunkLogs = async (
if (!allowed.length) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Metadata Proof validator not allowed`,
`Metadata Proof validators list is empty`,
true
)
continue
54 changes: 39 additions & 15 deletions src/components/P2P/handleProtocolCommands.ts
@@ -38,34 +38,58 @@ export async function handleProtocolCommands(connection: any) {
let statusStream
let sendStream = null

const buildWrongCommandStatus = function (errorCode: number, message: string) {
status = {
httpStatus: errorCode,
error: message
}
return status
}

const denyList = await (await getConfiguration()).denyList
if (denyList.peers.length > 0) {
if (denyList.peers.includes(remotePeer.toString())) {
P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`)
status = {
httpStatus: 403,
error: 'Unauthorized request'
}
statusStream = new ReadableString(JSON.stringify(status))

statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(403, 'Unauthorized request'))
)
pipe(statusStream, connection.stream.sink)
return
}
}

/* eslint no-unreachable-loop: ["error", { "ignore": ["ForInStatement", "ForOfStatement"] }] */
for await (const chunk of connection.stream.source) {
try {
const str = uint8ArrayToString(chunk.subarray())
task = JSON.parse(str) as Command
} catch (e) {
status = { httpStatus: 400, error: 'Invalid command' }
statusStream = new ReadableString(JSON.stringify(status))
try {
// eslint-disable-next-line no-unreachable-loop
for await (const chunk of connection.stream.source) {
try {
const str = uint8ArrayToString(chunk.subarray())
task = JSON.parse(str) as Command
} catch (e) {
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
return
}
}
if (!task) {
P2P_LOGGER.error('Invalid or missing task/command data!')
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(400, 'Invalid command'))
)
pipe(statusStream, connection.stream.sink)
return
}
break
} catch (err) {
P2P_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Unable to process P2P command: ${err.message}`
)
return
}
P2P_LOGGER.logMessage('Performing task: ' + JSON.stringify(task), true)

P2P_LOGGER.logMessage('Performing P2P task: ' + JSON.stringify(task), true)
// we get the handler from the running instance
// no need to create a new instance of Handler on every request
const handler: Handler = this.getCoreHandlers().getHandler(task.command)
48 changes: 30 additions & 18 deletions src/components/P2P/index.ts
@@ -16,7 +16,6 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { bootstrap } from '@libp2p/bootstrap'
import { noise } from '@chainsafe/libp2p-noise'
import { mdns } from '@libp2p/mdns'
import { mplex } from '@libp2p/mplex'
import { yamux } from '@chainsafe/libp2p-yamux'
import { peerIdFromString } from '@libp2p/peer-id'
import { pipe } from 'it-pipe'
@@ -152,10 +151,11 @@
const peerId = details.detail
P2P_LOGGER.debug('Connection established to:' + peerId.toString()) // Emitted when a peer has been found
try {
// DO WE REALLY NEED THIS?
this._libp2p.services.pubsub.connect(peerId.toString())
} catch (e) {}
} else {
/* empty */
} catch (e) {
P2P_LOGGER.error(e.message)
}
}
}

@@ -322,7 +322,7 @@
},
peerId: config.keys.peerId,
transports,
streamMuxers: [yamux(), mplex()],
streamMuxers: [yamux()],
connectionEncryption: [
noise()
// plaintext()
@@ -552,26 +552,38 @@
let stream
// dial/connect to the target node
try {
// stream= await this._libp2p.dialProtocol(peer, this._protocol)

stream = await this._libp2p.dialProtocol(peerId, this._protocol)
} catch (e) {
response.status.httpStatus = 404
response.status.error = 'Cannot connect to peer'
P2P_LOGGER.error(`Unable to connect to peer: ${peerId}`)
return response
}

response.stream = stream
pipe(
// Source data
[uint8ArrayFromString(message)],
// Write to the stream, and pass its output to the next function
stream,
// this is the analyze function
// doubler as any,
// Sink function
sink
)
if (stream) {
response.stream = stream
try {
await pipe(
// Source data
[uint8ArrayFromString(message)],
// Write to the stream, and pass its output to the next function
stream,
// this is the analyze function
// doubler as any,
// Sink function
sink
)
} catch (err) {
P2P_LOGGER.error(`Unable to send P2P message: ${err.message}`)
response.status.httpStatus = 404
response.status.error = err.message
}
} else {
response.status.httpStatus = 404
response.status.error = 'Unable to get remote P2P stream (null)'
P2P_LOGGER.error(response.status.error)
}

return response
}
