Skip to content

Commit

Permalink
feat: fetch and parse full gmail message (twentyhq#5160)
Browse files Browse the repository at this point in the history
first part of https://github.com/twentyhq/twenty/issues/4108
related PR twentyhq#5081

---------

Co-authored-by: Charles Bochet <charles@twenty.com>
  • Loading branch information
rostaklein and charlesBochet authored May 20, 2024
1 parent b5d3396 commit a981344
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 257 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
"@types/lodash.camelcase": "^4.3.7",
"@types/lodash.merge": "^4.6.7",
"@types/lodash.pick": "^4.3.7",
"@types/mailparser": "^3.4.4",
"@types/nodemailer": "^6.4.14",
"@types/passport-microsoft": "^1.0.3",
"add": "^2.0.6",
"addressparser": "^1.0.1",
"afterframe": "^1.0.2",
"apollo-server-express": "^3.12.0",
"apollo-upload-client": "^17.0.0",
Expand Down Expand Up @@ -127,7 +127,6 @@
"lodash.snakecase": "^4.1.1",
"lodash.upperfirst": "^4.3.1",
"luxon": "^3.3.0",
"mailparser": "^3.6.5",
"microdiff": "^1.3.2",
"nest-commander": "^3.12.0",
"next": "14.0.4",
Expand Down Expand Up @@ -233,6 +232,7 @@
"@swc/helpers": "~0.5.2",
"@testing-library/jest-dom": "^6.1.5",
"@testing-library/react": "14.0.0",
"@types/addressparser": "^1.0.3",
"@types/apollo-upload-client": "^17.0.2",
"@types/bcrypt": "^5.0.0",
"@types/better-sqlite3": "^7.6.8",
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const GMAIL_USERS_MESSAGES_GET_BATCH_SIZE = 10;
export const GMAIL_USERS_MESSAGES_GET_BATCH_SIZE = 20;
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { Injectable, Logger } from '@nestjs/common';

import { AxiosResponse } from 'axios';
import { simpleParser } from 'mailparser';
import planer from 'planer';
import addressparser from 'addressparser';
import { gmail_v1 } from 'googleapis';

import { GmailMessage } from 'src/modules/messaging/types/gmail-message';
import { MessageQuery } from 'src/modules/messaging/types/message-or-thread-query';
import { GmailMessageParsedResponse } from 'src/modules/messaging/types/gmail-message-parsed-response';
import { FetchByBatchesService } from 'src/modules/messaging/services/fetch-by-batch/fetch-by-batch.service';
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/services/utils/format-address-object-as-participants.util';
import { assert, assertNotNull } from 'src/utils/assert';

@Injectable()
export class FetchMessagesByBatchesService {
Expand All @@ -19,9 +20,9 @@ export class FetchMessagesByBatchesService {
async fetchAllMessages(
queries: MessageQuery[],
accessToken: string,
workspaceId?: string,
connectedAccountId?: string,
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
workspaceId: string,
connectedAccountId: string,
): Promise<GmailMessage[]> {
let startTime = Date.now();
const batchResponses = await this.fetchByBatchesService.fetchAllByBatches(
queries,
Expand All @@ -38,8 +39,11 @@ export class FetchMessagesByBatchesService {

startTime = Date.now();

const formattedResponse =
await this.formatBatchResponsesAsGmailMessages(batchResponses);
const formattedResponse = this.formatBatchResponsesAsGmailMessages(
batchResponses,
workspaceId,
connectedAccountId,
);

endTime = Date.now();

Expand All @@ -52,109 +56,172 @@ export class FetchMessagesByBatchesService {
return formattedResponse;
}

async formatBatchResponseAsGmailMessage(
private formatBatchResponseAsGmailMessage(
responseCollection: AxiosResponse<any, any>,
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
const parsedResponses = this.fetchByBatchesService.parseBatch(
responseCollection,
) as GmailMessageParsedResponse[];

const errors: any = [];
workspaceId: string,
connectedAccountId: string,
): GmailMessage[] {
const parsedResponses =
this.fetchByBatchesService.parseBatch(responseCollection);

const sanitizeString = (str: string) => {
return str.replace(/\0/g, '');
};

const formattedResponse = Promise.all(
parsedResponses.map(async (message: GmailMessageParsedResponse) => {
if (message.error) {
errors.push(message.error);
const formattedResponse = parsedResponses.map(
(response): GmailMessage | null => {
if ('error' in response) {
if (response.error.code === 404) {
return null;
}

throw response.error;
}

const {
historyId,
id,
threadId,
internalDate,
subject,
from,
to,
cc,
bcc,
headerMessageId,
text,
attachments,
deliveredTo,
} = this.parseGmailMessage(response);

if (!from) {
this.logger.log(
`From value is missing while importing message in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return null;
}

if (!to && !deliveredTo && !bcc && !cc) {
this.logger.log(
`To, Delivered-To, Bcc or Cc value is missing while importing message in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return;
return null;
}

const { historyId, id, threadId, internalDate, raw } = message;

const body = atob(raw?.replace(/-/g, '+').replace(/_/g, '/'));

try {
const parsed = await simpleParser(body, {
skipHtmlToText: true,
skipImageLinks: true,
skipTextToHtml: true,
maxHtmlLengthToParse: 0,
});

const { subject, messageId, from, to, cc, bcc, text, attachments } =
parsed;

if (!from) throw new Error('From value is missing');

const participants = [
...formatAddressObjectAsParticipants(from, 'from'),
...formatAddressObjectAsParticipants(to, 'to'),
...formatAddressObjectAsParticipants(cc, 'cc'),
...formatAddressObjectAsParticipants(bcc, 'bcc'),
];

let textWithoutReplyQuotations = text;

if (text)
try {
textWithoutReplyQuotations = planer.extractFrom(
text,
'text/plain',
);
} catch (error) {
console.log(
'Error while trying to remove reply quotations',
error,
);
}

const messageFromGmail: GmailMessage = {
historyId,
externalId: id,
headerMessageId: messageId || '',
subject: subject || '',
messageThreadExternalId: threadId,
internalDate,
fromHandle: from.value[0].address || '',
fromDisplayName: from.value[0].name || '',
participants,
text: sanitizeString(textWithoutReplyQuotations || ''),
attachments,
};

return messageFromGmail;
} catch (error) {
console.log('Error', error);

errors.push(error);
const participants = [
...formatAddressObjectAsParticipants(from, 'from'),
...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'),
...formatAddressObjectAsParticipants(cc, 'cc'),
...formatAddressObjectAsParticipants(bcc, 'bcc'),
];

let textWithoutReplyQuotations = text;

if (text) {
textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain');
}
}),

const messageFromGmail: GmailMessage = {
historyId,
externalId: id,
headerMessageId,
subject: subject || '',
messageThreadExternalId: threadId,
internalDate,
fromHandle: from[0].address || '',
fromDisplayName: from[0].name || '',
participants,
text: sanitizeString(textWithoutReplyQuotations || ''),
attachments,
};

return messageFromGmail;
},
);

const filteredMessages = (await formattedResponse).filter(
(message) => message,
const filteredMessages = formattedResponse.filter((message) =>
assertNotNull(message),
) as GmailMessage[];

return { messages: filteredMessages, errors };
return filteredMessages;
}

async formatBatchResponsesAsGmailMessages(
private formatBatchResponsesAsGmailMessages(
batchResponses: AxiosResponse<any, any>[],
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
const messagesAndErrors = await Promise.all(
batchResponses.map(async (response) => {
return this.formatBatchResponseAsGmailMessage(response);
}),
);
workspaceId: string,
connectedAccountId: string,
): GmailMessage[] {
const messageBatches = batchResponses.map((response) => {
return this.formatBatchResponseAsGmailMessage(
response,
workspaceId,
connectedAccountId,
);
});

return messageBatches.flat();
}

private parseGmailMessage(message: gmail_v1.Schema$Message) {
const subject = this.getPropertyFromHeaders(message, 'Subject');
const rawFrom = this.getPropertyFromHeaders(message, 'From');
const rawTo = this.getPropertyFromHeaders(message, 'To');
const rawDeliveredTo = this.getPropertyFromHeaders(message, 'Delivered-To');
const rawCc = this.getPropertyFromHeaders(message, 'Cc');
const rawBcc = this.getPropertyFromHeaders(message, 'Bcc');
const messageId = this.getPropertyFromHeaders(message, 'Message-ID');
const id = message.id;
const threadId = message.threadId;
const historyId = message.historyId;
const internalDate = message.internalDate;

assert(id);
assert(messageId);
assert(threadId);
assert(historyId);
assert(internalDate);

const bodyData = this.getBodyData(message);
const text = bodyData ? Buffer.from(bodyData, 'base64').toString() : '';

return {
id,
headerMessageId: messageId,
threadId,
historyId,
internalDate,
subject,
from: rawFrom ? addressparser(rawFrom) : undefined,
deliveredTo: rawDeliveredTo ? addressparser(rawDeliveredTo) : undefined,
to: rawTo ? addressparser(rawTo) : undefined,
cc: rawCc ? addressparser(rawCc) : undefined,
bcc: rawBcc ? addressparser(rawBcc) : undefined,
text,
attachments: [],
};
}

const messages = messagesAndErrors.map((item) => item.messages).flat();
private getBodyData(message: gmail_v1.Schema$Message) {
const firstPart = message.payload?.parts?.[0];

const errors = messagesAndErrors.map((item) => item.errors).flat();
if (firstPart?.mimeType === 'text/plain') {
return firstPart?.body?.data;
}

return firstPart?.parts?.find((part) => part.mimeType === 'text/plain')
?.body?.data;
}

private getPropertyFromHeaders(
message: gmail_v1.Schema$Message,
property: string,
) {
const header = message.payload?.headers?.find(
(header) => header.name?.toLowerCase() === property.toLowerCase(),
);

return { messages, errors };
return header?.value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export class GmailFetchMessageContentFromCacheService {
const messageQueries = createQueriesFromMessageIds(messageIdsToFetch);

try {
const { messages: messagesToSave, errors } =
const messagesToSave =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
Expand All @@ -194,22 +194,6 @@ export class GmailFetchMessageContentFromCacheService {
return [];
}

if (errors.length) {
const errorsCanBeIgnored = errors.every(
(error) => error.code === 404,
);

if (!errorsCanBeIgnored) {
throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${JSON.stringify(
errors,
null,
2,
)}`,
);
}
}

const messageExternalIdsAndIdsMap =
await this.messageService.saveMessagesWithinTransaction(
messagesToSave,
Expand Down Expand Up @@ -292,21 +276,19 @@ export class GmailFetchMessageContentFromCacheService {
messageIdsToFetch,
);

if (error?.message?.code === 429) {
this.logger.error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: Resource has been exhausted, locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
);
await this.messageChannelRepository.updateSyncStatus(
gmailMessageChannelId,
MessageChannelSyncStatus.FAILED,
workspaceId,
);

await this.messageChannelRepository.updateSyncStatus(
gmailMessageChannelId,
MessageChannelSyncStatus.FAILED,
workspaceId,
);
this.logger.error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
);

throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`,
);
}
throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`,
);
}
}

Expand Down
Loading

0 comments on commit a981344

Please sign in to comment.