diff --git a/packages/react-sdk/package.json b/packages/react-sdk/package.json index c677e92e..87be1f8e 100644 --- a/packages/react-sdk/package.json +++ b/packages/react-sdk/package.json @@ -77,7 +77,6 @@ "@xmtp/content-type-remote-attachment": "^1.0.7", "@xmtp/content-type-reply": "^1.0.0", "@xmtp/xmtp-js": "^10.2.0", - "async-mutex": "^0.4.0", "date-fns": "^2.30.0", "dexie": "^3.2.4", "dexie-react-hooks": "^1.1.6", diff --git a/packages/react-sdk/src/helpers/caching/conversations.ts b/packages/react-sdk/src/helpers/caching/conversations.ts index 08d07dff..0a343586 100644 --- a/packages/react-sdk/src/helpers/caching/conversations.ts +++ b/packages/react-sdk/src/helpers/caching/conversations.ts @@ -1,7 +1,6 @@ import type { Conversation, Client, InvitationContext } from "@xmtp/xmtp-js"; import type { Table } from "dexie"; import type Dexie from "dexie"; -import { Mutex } from "async-mutex"; import type { CachedMetadata, CachedMetadataValues } from "./db"; export type CachedConversation = { @@ -93,8 +92,6 @@ export const getConversationByTopic = async ( return conversation; }; -const updateConversationMutex = new Mutex(); - /** * Update properties of a cached conversation */ @@ -104,19 +101,18 @@ export const updateConversation = async ( Pick >, db: Dexie, -) => - updateConversationMutex.runExclusive(async () => { - const conversationsTable = db.table( - "conversations", - ) as CachedConversationsTable; - const existing = await conversationsTable - .where("topic") - .equals(topic) - .first(); - if (existing) { - await conversationsTable.update(existing, update); - } - }); +) => { + const conversationsTable = db.table( + "conversations", + ) as CachedConversationsTable; + const existing = await conversationsTable + .where("topic") + .equals(topic) + .first(); + if (existing) { + await conversationsTable.update(existing, update); + } +}; /** * Update metadata of a cached conversation using the specified namespace @@ -174,8 +170,6 @@ export const toCachedConversation = ( walletAddress, }); -const saveConversationMutex = new Mutex(); - /** * Save a conversation to the cache * @@ -184,21 +178,20 @@ const saveConversationMutex = new Mutex(); export const saveConversation = async ( conversation: CachedConversation, db: Dexie, -) => - saveConversationMutex.runExclusive(async () => { - const conversations = db.table("conversations") as CachedConversationsTable; +) => { + const conversations = db.table("conversations") as CachedConversationsTable; - const existing = await conversations - .where("topic") - .equals(conversation.topic) - .first(); + const existing = await conversations + .where("topic") + .equals(conversation.topic) + .first(); - if (existing) { - return existing as CachedConversationWithId; - } + if (existing) { + return existing as CachedConversationWithId; + } - // eslint-disable-next-line no-param-reassign - conversation.id = await conversations.add(conversation); + // eslint-disable-next-line no-param-reassign + conversation.id = await conversations.add(conversation); - return conversation as CachedConversationWithId; - }); + return conversation as CachedConversationWithId; +}; diff --git a/packages/react-sdk/src/helpers/caching/messages.ts b/packages/react-sdk/src/helpers/caching/messages.ts index 11c25a68..128c6088 100644 --- a/packages/react-sdk/src/helpers/caching/messages.ts +++ b/packages/react-sdk/src/helpers/caching/messages.ts @@ -3,7 +3,6 @@ import { ContentTypeText, decodeContent } from "@xmtp/xmtp-js"; import type { Table } from "dexie"; import type Dexie from "dexie"; import { isAfter } from "date-fns"; -import { Mutex } from "async-mutex"; import { v4 } from "uuid"; import type { CachedMessageProcessors, @@ -135,8 +134,6 @@ export const deleteMessage = async ( } }; -const updateMessageMutex = new Mutex(); - /** * Update properties of a cached message */ @@ -156,14 +153,10 @@ export const updateMessage = async ( >, db: Dexie, ) => { - await updateMessageMutex.runExclusive(async () => { - const messagesTable = db.table("messages") as CachedMessagesTable; - await messagesTable.update(message, update); - }); + const messagesTable = db.table("messages") as CachedMessagesTable; + await messagesTable.update(message, update); }; -const updateMessageMetadataMutex = new Mutex(); - /** * Update metadata of a cached message using the specified namespace * @@ -174,12 +167,11 @@ export const updateMessageMetadata = async ( namespace: string, data: CachedMetadataValues, db: Dexie, -) => - updateMessageMetadataMutex.runExclusive(async () => { - const metadata = message.metadata || {}; - metadata[namespace] = data; - return updateMessage(message, { metadata }, db); - }); +) => { + const metadata = message.metadata || {}; + metadata[namespace] = data; + return updateMessage(message, { metadata }, db); +}; export type PrepareMessageOptions = Pick & Pick, "contentType"> & { @@ -218,8 +210,6 @@ export const prepareMessageForSending = ({ }; }; -const updateMessageAfterSendingMutex = new Mutex(); - /** * Update some message properties after it's successfully sent */ @@ -229,22 +219,18 @@ export const updateMessageAfterSending = async ( xmtpID: string, db: Dexie, ) => - updateMessageAfterSendingMutex.runExclusive(async () => - updateMessage( - message, - { - hasSendError: false, - isSending: false, - sendOptions: undefined, - sentAt, - xmtpID, - }, - db, - ), + updateMessage( + message, + { + hasSendError: false, + isSending: false, + sendOptions: undefined, + sentAt, + xmtpID, + }, + db, ); -const processMessageMutex = new Mutex(); - export type ProcessMessageOptions = { client: Client; conversation: CachedConversation; @@ -283,104 +269,95 @@ export const processMessage = async ( validators, }: ProcessMessageOptions, removeExisting = false, -) => - processMessageMutex.runExclusive(async () => { - const existingMessage = await getMessageByXmtpID(message.xmtpID, db); - // don't re-process an existing message - if (existingMessage && existingMessage.status === "processed") { - return message; - } - - // don't process invalid message content - const isContentValid = validators[message.contentType]; - if (isContentValid && !isContentValid(message.content)) { - return message; - } - - let persistedMessage: CachedMessageWithId | undefined; - const namespace = namespaces[message.contentType]; - - // internal persist function with preset namespace - const persist: InternalPersistMessage = async ({ - metadata, - update, - } = {}) => { - const updatedMetadata = { ...message.metadata }; - if (metadata && namespace) { - updatedMetadata[namespace] = metadata; - } - const updatedMessage = { - ...message, - ...update, - }; +) => { + const existingMessage = await getMessageByXmtpID(message.xmtpID, db); + // don't re-process an existing message + if (existingMessage && existingMessage.status === "processed") { + return message; + } - if (Object.keys(updatedMetadata).length > 0) { - updatedMessage.metadata = updatedMetadata; - } + // don't process invalid message content + const isContentValid = validators[message.contentType]; + if (isContentValid && !isContentValid(message.content)) { + return message; + } - persistedMessage = await saveMessage(updatedMessage, db); - return persistedMessage; - }; + let persistedMessage: CachedMessageWithId | undefined; + const namespace = namespaces[message.contentType]; - // internal updater function with preset namespace - const updateConversationMetadata = async (data: CachedMetadataValues) => { - await _updateConversationMetadata( - conversation.topic, - namespace, - data, - db, - ); + // internal persist function with preset namespace + const persist: InternalPersistMessage = async ({ metadata, update } = {}) => { + const updatedMetadata = { ...message.metadata }; + if (metadata && namespace) { + updatedMetadata[namespace] = metadata; + } + const updatedMessage = { + ...message, + ...update, }; - // message content type is not supported, skip processing - if (message.content === undefined) { - // don't persist the message if it already exists in the cache - if (!(await getMessageByXmtpID(message.xmtpID, db))) { - // persist the message to cache so that it can be processed later - const savedMessage = await saveMessage(message, db); - return savedMessage; - } - return message; + if (Object.keys(updatedMetadata).length > 0) { + updatedMessage.metadata = updatedMetadata; } - // remove existing message if requested - if ( - removeExisting && - message.id && - (await getMessageByXmtpID(message.xmtpID, db)) - ) { - await deleteMessage(message as CachedMessageWithId, db); - } + persistedMessage = await saveMessage(updatedMessage, db); + return persistedMessage; + }; - if (processors[message.contentType]) { - // run all content processors for this content type - await Promise.all( - processors[message.contentType].map((processor) => - processor({ - client, - conversation, - db, - message: message as CachedMessageWithId, - processors, - persist, - updateConversationMetadata, - }), - ), - ); - } + // internal updater function with preset namespace + const updateConversationMetadata = async (data: CachedMetadataValues) => { + await _updateConversationMetadata(conversation.topic, namespace, data, db); + }; - // update conversation's last message time - if (isAfter(message.sentAt, conversation.updatedAt)) { - await setConversationUpdatedAt(conversation.topic, message.sentAt, db); + // message content type is not supported, skip processing + if (message.content === undefined) { + // don't persist the message if it already exists in the cache + if (!(await getMessageByXmtpID(message.xmtpID, db))) { + // persist the message to cache so that it can be processed later + const savedMessage = await saveMessage(message, db); + return savedMessage; } + return message; + } - // if the message was cached, update its `status` to `processed` - if (persistedMessage) { - await updateMessage(persistedMessage, { status: "processed" }, db); - } + // remove existing message if requested + if ( + removeExisting && + message.id && + (await getMessageByXmtpID(message.xmtpID, db)) + ) { + await deleteMessage(message as CachedMessageWithId, db); + } + + if (processors[message.contentType]) { + // run all content processors for this content type + await Promise.all( + processors[message.contentType].map((processor) => + processor({ + client, + conversation, + db, + message: message as CachedMessageWithId, + processors, + persist, + updateConversationMetadata, + }), + ), + ); + } + + // update conversation's last message time + if (isAfter(message.sentAt, conversation.updatedAt)) { + await setConversationUpdatedAt(conversation.topic, message.sentAt, db); + } - return persistedMessage ?? message; - }); + // if the message was cached, update its `status` to `processed` + if (persistedMessage) { + await updateMessage(persistedMessage, { status: "processed" }, db); + } + + return persistedMessage ?? message; +}; /** * Reprocessing a message if it has the following requirements: @@ -480,8 +457,6 @@ export type ProcessUnprocessedMessagesOptions = Omit< reprocess?: typeof reprocessMessage; }; -const processUnprocessedMessagesMutex = new Mutex(); - /** * Process all unprocessed messages in the cache */ @@ -493,28 +468,26 @@ export const processUnprocessedMessages = async ({ validators, reprocess = reprocessMessage, }: ProcessUnprocessedMessagesOptions) => { - await processUnprocessedMessagesMutex.runExclusive(async () => { - const unprocessed = await getUnprocessedMessages(db); - await Promise.all( - unprocessed.map(async (unprocessedMessage) => { - // get message's conversation from cache - const conversation = await getCachedConversationByTopic( - unprocessedMessage.conversationTopic, + const unprocessed = await getUnprocessedMessages(db); + await Promise.all( + unprocessed.map(async (unprocessedMessage) => { + // get message's conversation from cache + const conversation = await getCachedConversationByTopic( + unprocessedMessage.conversationTopic, + db, + ); + // must have a conversation already in the cache + if (conversation) { + await reprocess({ + conversation, + client, db, - ); - // must have a conversation already in the cache - if (conversation) { - await reprocess({ - conversation, - client, - db, - message: unprocessedMessage, - namespaces, - processors, - validators, - }); - } - }), - ); - }); + message: unprocessedMessage, + namespaces, + processors, + validators, + }); + } + }), + ); }; diff --git a/yarn.lock b/yarn.lock index 3ed4329c..4ec6f011 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7572,7 +7572,6 @@ __metadata: "@xmtp/content-type-reply": ^1.0.0 "@xmtp/tsconfig": "workspace:*" "@xmtp/xmtp-js": ^10.2.0 - async-mutex: ^0.4.0 date-fns: ^2.30.0 dexie: ^3.2.4 dexie-react-hooks: ^1.1.6