Skip to content

Commit

Permalink
🌟 #1958 Bootstrap Knowledge graph service in backend (#2081)
Browse files Browse the repository at this point in the history
* #1958 Bootstrap Knowledge graph service in backend

* #1958 Implement subscribe & publish events for workspace and channel creation

* #1958 Implement subscribe & publish events for user, message and company creation

* #1958 Fix error when send data to the Knowledge graph api and add active option in the default.json knowledge graph object

* #1958 Last improvements

* #1958 Rename company_idx to forwarded_companies in default.json

* #1958 Fix tests and error related to configuration
  • Loading branch information
stephanevieira75 authored May 4, 2022
1 parent 456b4d5 commit bc01338
Show file tree
Hide file tree
Showing 15 changed files with 509 additions and 16 deletions.
10 changes: 9 additions & 1 deletion twake/backend/node/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@
"plugins": {
"server": "plugins:3100"
},
"knowledge-graph": {
"endpoint": "http://host-gateway:8888",
"use": false,
"sensitive_data": false,
"forwarded_companies": []
},
"services": [
"auth",
"push",
Expand All @@ -171,6 +177,8 @@
"previews",
"counter",
"statistics",
"online"
"cron",
"online",
"knowledge-graph"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export abstract class TwakeService<T extends TwakeServiceProvider>

constructor(protected options?: TwakeServiceOptions<TwakeServiceConfiguration>) {
this.state = new BehaviorSubject<TwakeServiceState>(TwakeServiceState.Ready);
// REMOVE ME, we should import config from framework folder instead
this.configuration = options?.configuration;
this.logger = getLogger(`core.platform.services.${this.name}Service`);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import axios, { AxiosInstance } from "axios";

import {
KnowledgeGraphCreateBodyRequest,
KnowledgeGraphCreateCompanyObjectData,
KnowledgeGraphCreateWorkspaceObjectData,
KnowledgeGraphCreateUserObjectData,
KnowledgeGraphCreateChannelObjectData,
KnowledgeGraphCreateMessageObjectData,
} from "./types";

import Workspace from "../../../../services/workspaces/entities/workspace";
import Company from "../../../../services/user/entities/company";
import User from "../../../../services/user/entities/user";
import { Channel } from "../../../../services/channels/entities";
import { Message } from "../../../../services/messages/entities/messages";
import { getLogger, TwakeLogger } from "../../framework";

export default class KnowledgeGraphAPIClient {
protected readonly version = "1.0.0";
protected readonly axiosInstance: AxiosInstance = axios.create();
readonly apiUrl: string;
readonly logger: TwakeLogger = getLogger("knowledge-graph-api-client");

constructor(apiUrl: string) {
this.apiUrl = apiUrl;
}

public onCompanyCreated(company: Partial<Company>): void {
this.axiosInstance.post<
KnowledgeGraphCreateBodyRequest<KnowledgeGraphCreateCompanyObjectData[]>
>(`${this.apiUrl}/graph/create/company`, {
records: [
{
key: "null",
value: {
id: "Company",
properties: {
company_id: company.id,
company_name: company.name,
},
},
},
],
});
}

public async onWorkspaceCreated(workspace: Partial<Workspace>): Promise<void> {
const response = await this.axiosInstance.post<
KnowledgeGraphCreateBodyRequest<KnowledgeGraphCreateWorkspaceObjectData[]>
>(`${this.apiUrl}/graph/create/workspace`, {
records: [
{
key: "null",
value: {
id: "Workspace",
properties: {
company_id: workspace.company_id,
workspace_name: workspace.name,
workspace_id: workspace.id,
},
},
},
],
});

if (response.statusText === "OK") {
this.logger.info("onWorkspaceCreated %o", response.config.data);
}
}

public async onUserCreated(companyId: string, user: Partial<User>): Promise<void> {
const response = await this.axiosInstance.post<
KnowledgeGraphCreateBodyRequest<KnowledgeGraphCreateUserObjectData[]>
>(`${this.apiUrl}/graph/create/user`, {
records: [
{
key: "null",
value: {
id: "User",
properties: {
user_id: user.id,
email: user.email_canonical,
username: user.username_canonical,
user_last_activity: user.last_activity.toLocaleString(),
first_name: user.first_name,
user_created_at: user.creation_date.toLocaleString(),
last_name: user.last_name,
company_id: companyId,
},
},
},
],
});

if (response.statusText === "OK") {
this.logger.info("onUserCreated %o", response.config.data);
}
}

public async onChannelCreated(channel: Partial<Channel>): Promise<void> {
const response = await this.axiosInstance.post<
KnowledgeGraphCreateBodyRequest<KnowledgeGraphCreateChannelObjectData[]>
>(`${this.apiUrl}/graph/create/channel`, {
records: [
{
key: "null",
value: {
id: "Channel",
properties: {
channel_id: channel.id,
channel_name: channel.name,
channel_owner: channel.owner,
workspace_id: channel.workspace_id,
},
},
},
],
});

if (response.statusText === "OK") {
this.logger.info("onChannelCreated %o", response.config.data);
}
}

public async onMessageCreated(
channelId: string,
message: Partial<Message>,
sensitiveData: boolean,
): Promise<void> {
const response = await this.axiosInstance.post<
KnowledgeGraphCreateBodyRequest<KnowledgeGraphCreateMessageObjectData[]>
>(`${this.apiUrl}/graph/create/message`, {
records: [
{
key: "null",
value: {
id: "Message",
properties: {
message_thread_id: message.thread_id,
message_created_at: message.created_at.toLocaleString(),
message_content: sensitiveData ? message.text : "secret",
type_message: message.type,
message_updated_at: message.updated_at.toLocaleString(),
user_id: message.user_id,
channel_id: channelId,
},
},
},
],
});

if (response.statusText === "OK") {
this.logger.info("onMessageCreated %o", response.config.data);
}
}
}
145 changes: 145 additions & 0 deletions twake/backend/node/src/core/platform/services/knowledge-graph/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import { Configuration, Consumes, getLogger, TwakeLogger, TwakeService } from "../../framework";
import { localEventBus } from "../../framework/pubsub";
import KnowledgeGraphAPI from "./provider";
import Workspace from "../../../../services/workspaces/entities/workspace";
import Company from "../../../../services/user/entities/company";
import User from "../../../../services/user/entities/user";
import { Channel } from "../../../../services/channels/entities";
import { Message } from "../../../../services/messages/entities/messages";
import { KnowledgeGraphGenericEventPayload, KnowledgeGraphEvents } from "./types";
import KnowledgeGraphAPIClient from "./api-client";

@Consumes([])
export default class KnowledgeGraphService
extends TwakeService<KnowledgeGraphAPI>
implements KnowledgeGraphAPI
{
readonly name = "knowledge-graph";
readonly version = "1.0.0";
protected kgAPIClient: KnowledgeGraphAPIClient = this.getKnowledgeGraphApiClient();
logger: TwakeLogger = getLogger("knowledge-graph-service");

async doInit(): Promise<this> {
const use = this.getConfigurationEntry<boolean>("use");

if (!use) {
this.logger.warn("Knowledge graph is not used");

return this;
}

localEventBus.subscribe<KnowledgeGraphGenericEventPayload<Company>>(
KnowledgeGraphEvents.COMPANY_CREATED,
this.onCompanyCreated.bind(this),
);

localEventBus.subscribe<KnowledgeGraphGenericEventPayload<Workspace>>(
KnowledgeGraphEvents.WORKSPACE_CREATED,
this.onWorkspaceCreated.bind(this),
);

localEventBus.subscribe<KnowledgeGraphGenericEventPayload<Channel>>(
KnowledgeGraphEvents.CHANNEL_CREATED,
this.onChannelCreated.bind(this),
);

localEventBus.subscribe<KnowledgeGraphGenericEventPayload<Message>>(
KnowledgeGraphEvents.MESSAGE_CREATED,
this.onMessageCreated.bind(this),
);

localEventBus.subscribe<KnowledgeGraphGenericEventPayload<User>>(
KnowledgeGraphEvents.USER_CREATED,
this.onUserCreated.bind(this),
);

return this;
}

async onCompanyCreated(data: KnowledgeGraphGenericEventPayload<Company>): Promise<void> {
const forwardedCompanies = this.getConfigurationEntry<string[]>("forwarded_companies");
this.logger.info(`${KnowledgeGraphEvents.COMPANY_CREATED} %o`, data);

if (
this.kgAPIClient &&
(forwardedCompanies.includes(data.resource.id) || forwardedCompanies.length === 0)
) {
this.kgAPIClient.onCompanyCreated(data.resource);
}
}

async onWorkspaceCreated(data: KnowledgeGraphGenericEventPayload<Workspace>): Promise<void> {
const forwardedCompanies = this.getConfigurationEntry<string[]>("forwarded_companies");
this.logger.info(`${KnowledgeGraphEvents.WORKSPACE_CREATED} %o`, data);

if (
this.kgAPIClient &&
(forwardedCompanies.includes(data.resource.company_id) || forwardedCompanies.length === 0)
) {
this.kgAPIClient.onWorkspaceCreated(data.resource);
}
}

async onChannelCreated(data: KnowledgeGraphGenericEventPayload<Channel>): Promise<void> {
const forwardedCompanies = this.getConfigurationEntry<string[]>("forwarded_companies");
this.logger.info(`${KnowledgeGraphEvents.CHANNEL_CREATED} %o`, data);

if (
this.kgAPIClient &&
(forwardedCompanies.includes(data.resource.company_id) || forwardedCompanies.length === 0)
) {
this.kgAPIClient.onChannelCreated(data.resource);
}
}

async onMessageCreated(data: KnowledgeGraphGenericEventPayload<Message>): Promise<void> {
const forwardedCompanies = this.getConfigurationEntry<string[]>("forwarded_companies");
const sensitiveData = this.getConfigurationEntry<boolean>("sensitive_data");

this.logger.debug(`${KnowledgeGraphEvents.MESSAGE_CREATED} %o`, data);

if (
this.kgAPIClient &&
(forwardedCompanies.includes(data.resource.cache.company_id) ||
forwardedCompanies.length === 0)
) {
this.kgAPIClient.onMessageCreated(
data.resource.cache.company_id,
data.resource,
sensitiveData,
);
}
}

async onUserCreated(data: KnowledgeGraphGenericEventPayload<User>): Promise<void> {
const forwardedCompanies = this.getConfigurationEntry<string[]>("forwarded_companies");
this.logger.info(`${KnowledgeGraphEvents.USER_CREATED} %o`, data);

const companyId = data.resource.cache.companies.find(v => forwardedCompanies.includes(v));

if (this.kgAPIClient && (companyId || forwardedCompanies.length === 0)) {
this.kgAPIClient.onUserCreated(companyId, data.resource);
}
}

private getConfigurationEntry<T>(key: string): T {
const configuration = new Configuration("knowledge-graph");
return configuration.get(key);
}

private getKnowledgeGraphApiClient(): KnowledgeGraphAPIClient {
const endpoint = this.getConfigurationEntry<string>("endpoint");

if (endpoint && endpoint.length) {
this.kgAPIClient = new KnowledgeGraphAPIClient(endpoint);
} else {
this.logger.info("KnowledgeGraph - No endpoint defined in default.json");
}

return this.kgAPIClient;
}

api(): KnowledgeGraphAPI {
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import Company from "../../../../services/user/entities/company";
import { Channel } from "../../../../services/channels/entities";
import { TwakeServiceProvider } from "../../framework";
import { KnowledgeGraphGenericEventPayload } from "./types";
import Workspace from "../../../../services/workspaces/entities/workspace";
import { Message } from "../../../../services/messages/entities/messages";
import User from "../../../../services/user/entities/user";

export default interface KnowledgeGraphAPI extends TwakeServiceProvider {
onCompanyCreated(data: KnowledgeGraphGenericEventPayload<Company>): void;
onWorkspaceCreated(data: KnowledgeGraphGenericEventPayload<Workspace>): void;
onChannelCreated(data: KnowledgeGraphGenericEventPayload<Channel>): void;
onMessageCreated(data: KnowledgeGraphGenericEventPayload<Message>): void;
onUserCreated(data: KnowledgeGraphGenericEventPayload<User>): void;
}
Loading

0 comments on commit bc01338

Please sign in to comment.