Skip to content

Commit

Permalink
Merge branch 'develop' into feat/contacts-check-existence-endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
matheusbsilva137 authored Dec 17, 2024
2 parents 6f85339 + 0fcdc3e commit 3d4d404
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 183 deletions.
5 changes: 5 additions & 0 deletions .changeset/fifty-parrots-wonder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/meteor': patch
---

Fixes an issue preventing the creation of normal direct message rooms due to an invalid federation configuration, allowing proper room creation under standard settings.
70 changes: 16 additions & 54 deletions apps/meteor/ee/server/local-services/instance/service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os from 'os';

import { License, ServiceClassInternal } from '@rocket.chat/core-services';
import { InstanceStatus } from '@rocket.chat/instance-status';
import { InstanceStatus, defaultPingInterval, indexExpire } from '@rocket.chat/instance-status';
import { InstanceStatus as InstanceStatusRaw } from '@rocket.chat/models';
import EJSON from 'ejson';
import type { BrokerNode } from 'moleculer';
Expand Down Expand Up @@ -33,37 +33,13 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe

private transporter: Transporters.TCP | Transporters.NATS;

private isTransporterTCP = true;

private broker: ServiceBroker;

private troubleshootDisableInstanceBroadcast = false;

constructor() {
super();

const tx = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT, extra: process.env.TRANSPORTER_EXTRA });
if (typeof tx === 'string') {
this.transporter = new Transporters.NATS({ url: tx });
this.isTransporterTCP = false;
} else {
this.transporter = new Transporters.TCP(tx);
}

if (this.isTransporterTCP) {
this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise<void> => {
if (clientAction === 'removed') {
(this.broker.transit?.tx as any).nodes.disconnected(data?._id, false);
(this.broker.transit?.tx as any).nodes.nodes.delete(data?._id);
return;
}

if (clientAction === 'inserted' && data?.extraInformation?.tcpPort) {
this.connectNode(data);
}
});
}

this.onEvent('license.module', async ({ module, valid }) => {
if (module === 'scalability' && valid) {
await this.startBroadcast();
Expand Down Expand Up @@ -93,17 +69,28 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
}

async created() {
const transporter = getTransporter({
transporter: process.env.TRANSPORTER,
port: process.env.TCP_PORT,
extra: process.env.TRANSPORTER_EXTRA,
});

const activeInstances = InstanceStatusRaw.getActiveInstancesAddress();

this.transporter =
typeof transporter !== 'string'
? new Transporters.TCP({ ...transporter, urls: activeInstances })
: new Transporters.NATS({ url: transporter });

this.broker = new ServiceBroker({
nodeID: InstanceStatus.id(),
transporter: this.transporter,
serializer: new EJSONSerializer(),
heartbeatInterval: defaultPingInterval,
heartbeatTimeout: indexExpire,
...getLogger(process.env),
});

if ((this.broker.transit?.tx as any)?.nodes?.localNode) {
(this.broker.transit?.tx as any).nodes.localNode.ipList = [hostIP];
}

this.broker.createService({
name: 'matrix',
events: {
Expand Down Expand Up @@ -176,31 +163,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
this.broadcastStarted = true;

StreamerCentral.on('broadcast', this.sendBroadcast.bind(this));

if (this.isTransporterTCP) {
await InstanceStatusRaw.find(
{
'extraInformation.tcpPort': {
$exists: true,
},
},
{
sort: {
_createdAt: -1,
},
},
).forEach(this.connectNode.bind(this));
}
}

private connectNode(record: any) {
if (record._id === InstanceStatus.id()) {
return;
}

const { host, tcpPort } = record.extraInformation;

(this.broker?.transit?.tx as any).addOfflineNode(record._id, host, tcpPort);
}

private sendBroadcast(streamName: string, eventName: string, args: unknown[]) {
Expand Down
16 changes: 9 additions & 7 deletions apps/meteor/ee/server/local-services/voip-freeswitch/service.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
import { type IVoipFreeSwitchService, ServiceClassInternal } from '@rocket.chat/core-services';
import type { FreeSwitchExtension, ISetting, SettingValue } from '@rocket.chat/core-typings';
import type { FreeSwitchExtension } from '@rocket.chat/core-typings';
import { getDomain, getUserPassword, getExtensionList, getExtensionDetails } from '@rocket.chat/freeswitch';

import { settings } from '../../../../app/settings/server';

export class VoipFreeSwitchService extends ServiceClassInternal implements IVoipFreeSwitchService {
protected name = 'voip-freeswitch';

constructor(private getSetting: <T extends SettingValue = SettingValue>(id: ISetting['_id']) => T) {
constructor() {
super();
}

private getConnectionSettings(): { host: string; port: number; password: string; timeout: number } {
if (!this.getSetting('VoIP_TeamCollab_Enabled') && !process.env.FREESWITCHIP) {
if (!settings.get('VoIP_TeamCollab_Enabled') && !process.env.FREESWITCHIP) {
throw new Error('VoIP is disabled.');
}

const host = process.env.FREESWITCHIP || this.getSetting<string>('VoIP_TeamCollab_FreeSwitch_Host');
const host = process.env.FREESWITCHIP || settings.get<string>('VoIP_TeamCollab_FreeSwitch_Host');
if (!host) {
throw new Error('VoIP is not properly configured.');
}

const port = this.getSetting<number>('VoIP_TeamCollab_FreeSwitch_Port') || 8021;
const timeout = this.getSetting<number>('VoIP_TeamCollab_FreeSwitch_Timeout') || 3000;
const password = this.getSetting<string>('VoIP_TeamCollab_FreeSwitch_Password');
const port = settings.get<number>('VoIP_TeamCollab_FreeSwitch_Port') || 8021;
const timeout = settings.get<number>('VoIP_TeamCollab_FreeSwitch_Timeout') || 3000;
const password = settings.get<string>('VoIP_TeamCollab_FreeSwitch_Password');

return {
host,
Expand Down
3 changes: 1 addition & 2 deletions apps/meteor/ee/server/startup/services.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { api } from '@rocket.chat/core-services';
import { License } from '@rocket.chat/license';

import { settings } from '../../../app/settings/server/cached';
import { isRunningMs } from '../../../server/lib/isRunningMs';
import { FederationService } from '../../../server/services/federation/service';
import { LicenseService } from '../../app/license/server/license.internalService';
Expand All @@ -19,7 +18,7 @@ api.registerService(new LDAPEEService());
api.registerService(new LicenseService());
api.registerService(new MessageReadsService());
api.registerService(new OmnichannelEE());
api.registerService(new VoipFreeSwitchService((id) => settings.get(id)));
api.registerService(new VoipFreeSwitchService());

// when not running micro services we want to start up the instance intercom
if (!isRunningMs()) {
Expand Down
3 changes: 2 additions & 1 deletion apps/meteor/server/database/watchCollections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ const onlyCollections = DBWATCHER_ONLY_COLLECTIONS.split(',')
.filter(Boolean);

export function getWatchCollections(): string[] {
const collections = [InstanceStatus.getCollectionName()];
const collections = [];

// add back to the list of collections in case db watchers are enabled
if (!dbWatchersDisabled) {
collections.push(InstanceStatus.getCollectionName());
collections.push(Users.getCollectionName());
collections.push(Messages.getCollectionName());
collections.push(LivechatInquiry.getCollectionName());
Expand Down
47 changes: 46 additions & 1 deletion apps/meteor/server/models/raw/InstanceStatus.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { IInstanceStatus } from '@rocket.chat/core-typings';
import type { IInstanceStatusModel } from '@rocket.chat/model-typings';
import type { Db } from 'mongodb';
import type { Db, ModifyResult, UpdateResult, DeleteResult } from 'mongodb';

import { BaseRaw } from './BaseRaw';

Expand All @@ -17,4 +17,49 @@ export class InstanceStatusRaw extends BaseRaw<IInstanceStatus> implements IInst
async getActiveInstanceCount(): Promise<number> {
return this.col.countDocuments({ _updatedAt: { $gt: new Date(Date.now() - process.uptime() * 1000 - 2000) } });
}

async getActiveInstancesAddress(): Promise<string[]> {
const instances = await this.find({}, { projection: { _id: 1, extraInformation: { host: 1, tcpPort: 1 } } }).toArray();
return instances.map((instance) => `${instance.extraInformation.host}:${instance.extraInformation.tcpPort}/${instance._id}`);
}

async removeInstanceById(_id: IInstanceStatus['_id']): Promise<DeleteResult> {
return this.deleteOne({ _id });
}

async setDocumentHeartbeat(documentId: string): Promise<UpdateResult> {
return this.updateOne({ _id: documentId }, { $currentDate: { _updatedAt: true } });
}

async upsertInstance(instance: Partial<IInstanceStatus>): Promise<ModifyResult<IInstanceStatus>> {
return this.findOneAndUpdate(
{
_id: instance._id,
},
{
$set: instance,
$currentDate: {
_createdAt: true,
_updatedAt: true,
},
},
{
upsert: true,
returnDocument: 'after',
},
);
}

async updateConnections(_id: IInstanceStatus['_id'], conns: number) {
return this.updateOne(
{
_id,
},
{
$set: {
'extraInformation.conns': conns,
},
},
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,10 @@ export class FederationHooks {
callbacks.add(
'federation.beforeCreateDirectMessage',
async (members: IUser[]): Promise<void> => {
if (!members) {
if (!members || !isFederationEnabled()) {
return;
}

throwIfFederationNotEnabledOrNotReady();

await callback(members);
},
callbacks.priority.HIGH,
Expand Down
17 changes: 11 additions & 6 deletions apps/meteor/server/startup/watchDb.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { api } from '@rocket.chat/core-services';
import { api, dbWatchersDisabled } from '@rocket.chat/core-services';
import { Logger } from '@rocket.chat/logger';
import { MongoInternals } from 'meteor/mongo';

Expand All @@ -19,12 +19,17 @@ watcher.watch().catch((err: Error) => {
process.exit(1);
});

setInterval(function _checkDatabaseWatcher() {
if (watcher.isLastDocDelayed()) {
SystemLogger.error('No real time data received recently');
}
}, 20000);
if (!dbWatchersDisabled) {
setInterval(function _checkDatabaseWatcher() {
if (watcher.isLastDocDelayed()) {
SystemLogger.error('No real time data received recently');
}
}, 20000);
}

export function isLastDocDelayed(): boolean {
if (dbWatchersDisabled) {
return true;
}
return watcher.isLastDocDelayed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,18 @@ describe('Federation - Infrastructure - RocketChat - Hooks', () => {
it('should execute the callback when everything is correct', () => {
const stub = sinon.stub();
FederationHooks.canCreateDirectMessageFromUI(stub);
isFederationEnabled.returns(true);
hooks['federation-v2-can-create-direct-message-from-ui-ce']([]);
expect(stub.calledWith([])).to.be.true;
});

it('should not execute callback or throw error when federation is disabled', () => {
const stub = sinon.stub();
FederationHooks.canCreateDirectMessageFromUI(stub);
isFederationEnabled.returns(false);
hooks['federation-v2-can-create-direct-message-from-ui-ce']([]);
expect(stub.calledWith([])).to.be.false;
});
});

describe('#afterMessageReacted()', () => {
Expand Down
10 changes: 6 additions & 4 deletions apps/meteor/tests/unit/server/lib/freeswitch.tests.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { expect } from 'chai';
import { describe } from 'mocha';
import proxyquire from 'proxyquire';
import sinon from 'sinon';

import { settings } from '../../../../app/settings/server/cached';
import { VoipFreeSwitchService } from '../../../../ee/server/local-services/voip-freeswitch/service';

const VoipFreeSwitch = new VoipFreeSwitchService((id) => settings.get(id));
const { VoipFreeSwitchService } = proxyquire.noCallThru().load('../../../../ee/server/local-services/voip-freeswitch/service', {
'../../../../app/settings/server': { get: sinon.stub() },
});

const VoipFreeSwitch = new VoipFreeSwitchService();
// Those tests still need a proper freeswitch environment configured in order to run
// So for now they are being deliberately skipped on CI
describe.skip('VoIP', () => {
Expand Down
Loading

0 comments on commit 3d4d404

Please sign in to comment.