Skip to content

Commit

Permalink
feat: add cross-server event broadcasting
Browse files Browse the repository at this point in the history
  • Loading branch information
KernelDeimos committed Jun 15, 2024
1 parent 3ae0773 commit 1207a15
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 2 deletions.
1 change: 1 addition & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"response-time": "^2.3.2",
"seedrandom": "^3.0.5",
"socket.io": "^4.6.2",
"socket.io-client": "^4.6.2",
"ssh2": "^1.13.0",
"string-hash": "^1.1.3",
"string-length": "^6.0.0",
Expand Down
3 changes: 3 additions & 0 deletions packages/backend/src/CoreModule.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ const install = async ({ services, app, useapi }) => {

const { ScriptService } = require('./services/ScriptService');
services.registerService('script', ScriptService);

const { BroadcastService } = require('./services/BroadcastService');
services.registerService('broadcast', BroadcastService);
}

const install_legacy = async ({ services }) => {
Expand Down
131 changes: 131 additions & 0 deletions packages/backend/src/services/BroadcastService.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
const { AdvancedBase } = require("@heyputer/puter-js-common");
const { Endpoint } = require("../util/expressutil");
const { UserActorType } = require("./auth/Actor");
const BaseService = require("./BaseService");

class Peer extends AdvancedBase {
static ONLINE = Symbol('ONLINE');
static OFFLINE = Symbol('OFFLINE');

static MODULES = {
sioclient: require('socket.io-client'),
};

constructor (svc_broadcast, config) {
super();
this.svc_broadcast = svc_broadcast;
this.log = this.svc_broadcast.log;
this.config = config;
}

send (data) {
if ( ! this.socket ) return;
this.socket.send(data)
}

get state () {
try {
if ( this.socket?.connected ) return this.constructor.ONLINE;
} catch (e) {
console.error('could not get peer state', e);
}
return this.constructor.OFFLINE;
}

connect () {
const address = this.config.address;
const socket = this.modules.sioclient(address, {
transports: ['websocket'],
path: '/wssinternal',
reconnection: true,
extraHeaders: {
...(this.config.host ? {
Host: this.config.host,
} : {})
}
});
socket.on('connect', () => {
this.log.info(`connected`, {
address: this.config.address
});
});
socket.on('disconnect', () => {
this.log.info(`disconnected`, {
address: this.config.address
});
});
socket.on('connect_error', e => {
this.log.info(`connection error`, {
address: this.config.address,
message: e.message,
});
console.log(e);
});
socket.on('error', e => {
this.log.info('error', {
message: e.message,
});
});

this.socket = socket;
}
}

class BroadcastService extends BaseService {
static MODULES = {
express: require('express'),
// ['socket.io']: require('socket.io'),
};

_construct () {
this.peers_ = [];
}

async _init () {
for ( const peer_config of this.config.peers ) {
const peer = new Peer(this, peer_config);
this.peers_.push(peer);
peer.connect();
}

const svc_event = this.services.get('event');
svc_event.on('outer.*', this.on_event.bind(this));
}

async on_event (key, data, meta) {
if ( meta.from_outside ) return;

for ( const peer of this.peers_ ) {
if ( peer.state !== Peer.ONLINE ) continue;
peer.send({ key, data, meta });
}
}

async ['__on_install.websockets'] (_, { server }) {
const svc_event = this.services.get('event');

const io = require('socket.io')(server, {
cors: { origin: '*' },
path: '/wssinternal',
});

io.on('connection', async socket => {
socket.on('message', ({ key, data, meta }) => {
if ( meta.from_outside ) {
this.log.noticeme('possible over-sending');
return;
}

meta.from_outside = true;
svc_event.emit(key, data, meta);
});
});


this.log.noticeme(
require('node:util').inspect(this.config)
);
}
}

module.exports = { BroadcastService };
26 changes: 24 additions & 2 deletions packages/backend/src/services/EventService.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ class ScopedEventBus {
class EventService extends BaseService {
async _construct () {
this.listeners_ = {};
this.global_listeners_ = [];
}

emit (key, data) {
emit (key, data, meta) {
meta = meta ?? {};
const parts = key.split('.');
for ( let i = 0; i < parts.length; i++ ) {
const part = i === parts.length - 1
Expand All @@ -55,7 +57,7 @@ class EventService extends BaseService {
// event dispatch.
(async () => {
try {
await callback(key, data);
await callback(key, data, meta);
} catch (e) {
this.errors.report('event-service.emit', {
source: e,
Expand All @@ -66,6 +68,22 @@ class EventService extends BaseService {
})();
}
}

for ( const callback of this.global_listeners_ ) {
// IIAFE wrapper to catch errors without blocking
// event dispatch.
(async () => {
try {
await callback(key, data, meta);
} catch (e) {
this.errors.report('event-service.emit', {
source: e,
trace: true,
alarm: true,
});
}
})();
}

}

Expand All @@ -86,6 +104,10 @@ class EventService extends BaseService {

return det;
}

on_all (callback) {
this.global_listeners_.push(callback);
}

get_scoped (scope) {
return new ScopedEventBus(this, scope);
Expand Down
34 changes: 34 additions & 0 deletions packages/backend/src/services/WSPushService.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class WSPushService extends AdvancedBase {
this._on_upload_progress.bind(this));
this.svc_event.on('fs.storage.progress.*',
this._on_upload_progress.bind(this));
this.svc_event.on('outer.gui.*',
this._on_outer_gui.bind(this));
}

async _on_fs_create (key, data) {
Expand Down Expand Up @@ -70,6 +72,11 @@ class WSPushService extends AdvancedBase {
for ( const user_id of user_id_list ) {
io.to(user_id).emit('item.added', response);
}

this.svc_event.emit('outer.gui.item.added', {
user_id_list,
response,
});
}

async _on_fs_update (key, data) {
Expand Down Expand Up @@ -104,6 +111,11 @@ class WSPushService extends AdvancedBase {
for ( const user_id of user_id_list ) {
io.to(user_id).emit('item.updated', response);
}

this.svc_event.emit('outer.gui.item.updated', {
user_id_list,
response,
});
}

async _on_fs_move (key, data) {
Expand Down Expand Up @@ -139,6 +151,11 @@ class WSPushService extends AdvancedBase {
for ( const user_id of user_id_list ) {
io.to(user_id).emit('item.moved', response);
}

this.svc_event.emit('outer.gui.item.moved', {
user_id_list,
response,
});
}

async _on_fs_pending (key, data) {
Expand Down Expand Up @@ -172,6 +189,10 @@ class WSPushService extends AdvancedBase {
for ( const user_id of user_id_list ) {
io.to(user_id).emit('item.pending', response);
}
this.svc_event.emit('outer.gui.item.pending', {
user_id_list,
response,
});
}

async _on_upload_progress (key, data) {
Expand Down Expand Up @@ -225,6 +246,19 @@ class WSPushService extends AdvancedBase {
})
})
}

async _on_outer_gui (key, { user_id_list, response }, meta) {
if ( ! meta.from_outside ) return;

key = key.slice('outer.gui.'.length);

const { socketio } = this.modules;

const io = socketio.getio();
for ( const user_id of user_id_list ) {
io.to(user_id).emit(key, response);
}
}
}

module.exports = {
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/src/services/WebServerService.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ class WebServerService extends BaseService {
socket.broadcast.to(socket.user.id).emit('trash.is_empty', msg);
});
});

await this.services.emit('install.websockets', { server });
}

async _init () {
Expand Down

0 comments on commit 1207a15

Please sign in to comment.