Skip to content

Commit

Permalink
Bug/remove_pubsub (#652)
Browse files Browse the repository at this point in the history
* remove pubsub
* remove broadcast command & cleanup
  • Loading branch information
alexcos20 authored Aug 26, 2024
1 parent af75e1e commit d904ba9
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 178 deletions.
26 changes: 0 additions & 26 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -404,32 +404,6 @@ returns list of logs

---

## Broadcast Command

### `HTTP` POST /broadcastCommand

#### Description

returns an empty if command is valid

#### Parameters

| name | type | required | description |
| ------- | ------ | -------- | ---------------------------- |
| command | string | v | command name |
| ... | any | | any other command parameters |

#### Request

```json
{
"command": "echo",
"...": "..."
}
```

---

## Advertise Did

### `HTTP` GET /advertiseDid/?did=did:op:123"
Expand Down
6 changes: 0 additions & 6 deletions src/@types/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ export interface OceanNodeStatus {
supportedSchemas?: Schema[]
}

export interface P2PBroadcastResponse {
command: string // original broadcast command
message: any // original broadcast message
response: any // the actual response to the original command and message
}

export interface FindDDOResponse {
provider: string
id: string
Expand Down
5 changes: 0 additions & 5 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,6 @@ export interface ICommandHandler {
validate(command: Command): ValidateParams
}

export interface BroadcastCommand {
command: string // the name of the command
message: any // the message to broadcast
}

export interface ComputeGetEnvironmentsCommand extends Command {
chainId?: number
}
Expand Down
29 changes: 0 additions & 29 deletions src/components/P2P/handleBroadcasts.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/components/P2P/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './handleBroadcasts.js'
export * from './handleProtocolCommands.js'
109 changes: 37 additions & 72 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import EventEmitter from 'node:events'
import clone from 'lodash.clonedeep'

import {
// handleBroadcasts,
// handlePeerConnect,
// handlePeerDiscovery,
// handlePeerDisconnect,
Expand All @@ -19,7 +18,7 @@ import { mdns } from '@libp2p/mdns'
import { yamux } from '@chainsafe/libp2p-yamux'
import { peerIdFromString } from '@libp2p/peer-id'
import { pipe } from 'it-pipe'
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'
// import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'

import { tcp } from '@libp2p/tcp'
import { webSockets } from '@libp2p/websockets'
Expand All @@ -31,7 +30,7 @@ import { uPnPNAT } from '@libp2p/upnp-nat'
import { ping } from '@libp2p/ping'
import { dcutr } from '@libp2p/dcutr'
import { kadDHT, passthroughMapper } from '@libp2p/kad-dht'
import { gossipsub } from '@chainsafe/libp2p-gossipsub'
// import { gossipsub } from '@chainsafe/libp2p-gossipsub'

import { EVENTS, cidFromRawString } from '../../utils/index.js'
import { Transform } from 'stream'
Expand All @@ -40,11 +39,7 @@ import { OceanNodeConfig, FindDDOResponse } from '../../@types/OceanNode'
// eslint-disable-next-line camelcase
import is_ip_private from 'private-ip'
import ip from 'ip'
import {
GENERIC_EMOJIS,
LOG_LEVELS_STR,
getLoggerLevelEmoji
} from '../../utils/logging/Logger.js'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import { INDEXER_DDO_EVENT_EMITTER } from '../Indexer/index.js'
import { P2P_LOGGER } from '../../utils/logging/common.js'
import { CoreHandlersRegistry } from '../core/handler/coreHandlersRegistry'
Expand Down Expand Up @@ -119,9 +114,10 @@ export class OceanP2P extends EventEmitter {
this._libp2p.addEventListener('peer:disconnect', (evt: any) => {
this.handlePeerDisconnect(evt)
})
this._libp2p.addEventListener('peer:discovery', (evt: any) => {
this.handlePeerDiscovery(evt)
this._libp2p.addEventListener('peer:discovery', (details: any) => {
this.handlePeerDiscovery(details)
})

this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
this._peers = []
this._connections = {}
Expand Down Expand Up @@ -161,9 +157,22 @@ export class OceanP2P extends EventEmitter {
P2P_LOGGER.debug('Connection closed to:' + peerId.toString()) // Emitted when a peer has been found
}

handlePeerDiscovery(details: any) {
const peerInfo = details.detail
P2P_LOGGER.debug('Discovered new peer:' + peerInfo.id.toString())
async handlePeerDiscovery(details: any) {
try {
const peerInfo = details.detail
P2P_LOGGER.debug('Discovered new peer:' + peerInfo.id.toString())
if (peerInfo.multiaddrs) {
await this._libp2p.peerStore.save(peerInfo.id, {
multiaddrs: peerInfo.multiaddrs
})
await this._libp2p.peerStore.patch(peerInfo.id, {
multiaddrs: peerInfo.multiaddrs
})
}
} catch (e) {
// no panic if it failed
// console.error(e)
}
}

handlePeerJoined(details: any) {
Expand Down Expand Up @@ -233,7 +242,6 @@ export class OceanP2P extends EventEmitter {
this._privateKey = config.keys.privateKey
/** @type {import('libp2p').Libp2pOptions} */
// start with some default, overwrite based on config later
let doPx = false
const bindInterfaces = []
if (config.p2pConfig.enableIPV4) {
P2P_LOGGER.info('Binding P2P sockets to IPV4')
Expand All @@ -258,7 +266,6 @@ export class OceanP2P extends EventEmitter {
config.p2pConfig.announceAddresses &&
config.p2pConfig.announceAddresses.length > 0
) {
doPx = true
addresses = {
listen: bindInterfaces,
announceFilter: (multiaddrs: any[]) =>
Expand All @@ -274,6 +281,7 @@ export class OceanP2P extends EventEmitter {
}
let servicesConfig = {
identify: identify(),
/*
pubsub: gossipsub({
fallbackToFloodsub: false,
batchPublish: false,
Expand All @@ -286,7 +294,7 @@ export class OceanP2P extends EventEmitter {
// canRelayMessage: true,
// enabled: true
allowedTopics: ['oceanprotocol._peer-discovery._p2p._pubsub', 'oceanprotocol']
}),
}), */
dht: kadDHT({
// this is necessary because this node is not connected to the public network
// it can be removed if, for example bootstrappers are configured
Expand Down Expand Up @@ -372,7 +380,7 @@ export class OceanP2P extends EventEmitter {
}),
mdns({
interval: config.p2pConfig.mDNSInterval
}),
}) /*,
pubsubPeerDiscovery({
interval: config.p2pConfig.pubsubPeerDiscoveryInterval,
topics: [
Expand All @@ -381,7 +389,7 @@ export class OceanP2P extends EventEmitter {
// '_peer-discovery._p2p._pubsub' // Include if you want to participate in the global space
],
listenOnly: false
})
}) */
]
}
}
Expand All @@ -393,7 +401,7 @@ export class OceanP2P extends EventEmitter {
peerDiscovery: [
mdns({
interval: config.p2pConfig.mDNSInterval
}),
}) /*,
pubsubPeerDiscovery({
interval: config.p2pConfig.pubsubPeerDiscoveryInterval,
topics: [
Expand All @@ -402,37 +410,14 @@ export class OceanP2P extends EventEmitter {
// '_peer-discovery._p2p._pubsub' // Include if you want to participate in the global space
],
listenOnly: false
})
}) */
]
}
}
}
const node = await createLibp2p(options)
await node.start()

// node.services.pubsub.addEventListener( 'peer joined', (evt:any) => {handlePeerJoined(evt)})
// node.services.pubsub.addEventListener('peer left', (evt:any) => {handlePeerLeft(evt)})
// node.services.pubsub.addEventListener('subscription-change', (evt:any) => { handleSubscriptionCHange(evt)})

// this._libp2p.services.pubsub.on('peer joined', (peer:any) => {
// console.log('New peer joined us:', peer)
// })
// this._libp2p.services.pubsub.addEventListener('peer left', (evt:any) => {
// console.log('Peer left...', evt)
// })
// this._libp2p.services.pubsub.on('peer left', (peer:any) => {
// console.log('Peer left...', peer)
// })

/* since we don't have broadcasts implemented, comment this part of the code
node.services.pubsub.addEventListener('message', (message: any) => {
handleBroadcasts(this._topic, message)
})
*/

node.services.pubsub.subscribe(this._topic)
node.services.pubsub.publish(this._topic, encoding('online'))

const upnpService = (node.services as any).upnpNAT
if (config.p2pConfig.upnp && upnpService) {
this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000)
Expand Down Expand Up @@ -489,24 +474,24 @@ export class OceanP2P extends EventEmitter {

async getOceanPeers(running: boolean = true, known: boolean = true) {
const peers: string[] = []
if (running) {
/* if (running) {
// get pubsub peers
const node = <any>this._libp2p
const newPeers = (await node.services.pubsub.getSubscribers(this._topic)).sort()
for (const peer of newPeers.slice(0)) {
if (!peers.includes(peer.toString)) peers.push(peer.toString())
}
}
} */
if (known) {
// get p2p peers and filter them by protocol
for (const peer of await this._libp2p.peerStore.all()) {
if (peer && peer.protocols) {
for (const protocol of peer.protocols) {
if (protocol === this._protocol) {
if (!peers.includes(peer.id.toString())) peers.push(peer.id.toString())
}
}
}
// if (peer && peer.protocols) {
// for (const protocol of peer.protocols) {
// if (protocol === this._protocol) {
if (!peers.includes(peer.id.toString())) peers.push(peer.id.toString())
// }
// }
// }
}
}

Expand All @@ -518,18 +503,6 @@ export class OceanP2P extends EventEmitter {
return Boolean(s.find((p: any) => p.toString() === peer.toString()))
}

async broadcast(_message: any) {
P2P_LOGGER.logMessage('Broadcasting:', true)
P2P_LOGGER.logMessageWithEmoji(
_message,
true,
getLoggerLevelEmoji(LOG_LEVELS_STR.LEVEL_INFO),
LOG_LEVELS_STR.LEVEL_INFO
)
const message = encoding(_message)
await this._libp2p.services.pubsub.publish(this._topic, message)
}

async getPeerDetails(peerName: string) {
try {
const peerId = peerIdFromString(peerName)
Expand Down Expand Up @@ -1013,11 +986,3 @@ export class OceanP2P extends EventEmitter {
this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000)
}
}

function encoding(message: any) {
if (!(message instanceof Uint8Array)) {
return uint8ArrayFromString(message)
}

return message
}
27 changes: 1 addition & 26 deletions src/components/httpRoutes/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,10 @@ import express, { Request, Response } from 'express'
import { P2PCommandResponse } from '../../@types'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'

import { getDefaultLevel } from '../../utils/logging/Logger.js'

import { HTTP_LOGGER } from '../../utils/logging/common.js'
import { hasP2PInterface, sendMissingP2PResponse } from './index.js'
import { hasP2PInterface } from './index.js'
import { validateCommandParameters } from './validateCommands.js'

export const broadcastCommandRoute = express.Router()

broadcastCommandRoute.post(
'/broadcastCommand',
express.json(),
async (req: Request, res: Response): Promise<void> => {
const validate = validateCommandParameters(req.body, [])
if (!validate.valid) {
res.status(validate.status).send(validate.reason)
return
}

HTTP_LOGGER.log(getDefaultLevel(), `broadcastCommand received ${req.body}`, true)

if (hasP2PInterface) {
await req.oceanNode.getP2PNode().broadcast(JSON.stringify(req.body))
res.sendStatus(200)
} else {
sendMissingP2PResponse(res)
}
}
)

export const directCommandRoute = express.Router()
directCommandRoute.post(
'/directCommand',
Expand Down
4 changes: 1 addition & 3 deletions src/components/httpRoutes/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import express, { Response } from 'express'
import { getOceanPeersRoute, getP2PPeersRoute, getP2PPeerRoute } from './getOceanPeers.js'
import { advertiseDidRoute, getProvidersForDidRoute } from './dids.js'
import { broadcastCommandRoute, directCommandRoute } from './commands.js'
import { directCommandRoute } from './commands.js'
import { logRoutes } from './logs.js'
import { providerRoutes } from './provider.js'
import { aquariusRoutes } from './aquarius.js'
Expand Down Expand Up @@ -34,8 +34,6 @@ httpRoutes.use(getP2PPeerRoute)
httpRoutes.use(advertiseDidRoute)
// /getProvidersForDid
httpRoutes.use(getProvidersForDidRoute)
// /broadcastCommand
httpRoutes.use(broadcastCommandRoute)
// /directCommand
httpRoutes.use(directCommandRoute)
// /logs
Expand Down
4 changes: 0 additions & 4 deletions src/components/httpRoutes/routeUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ routesNames.set('directCommand', {
method: 'post'
})

routesNames.set('broadcastCommand', {
path: '/broadcastCommand',
method: 'post'
})
// fileInfo
routesNames.set('fileInfo', {
path: `${SERVICES_API_BASE_PATH}/fileInfo`,
Expand Down
Loading

0 comments on commit d904ba9

Please sign in to comment.