diff --git a/src/channel.ts b/src/channel.ts index 98181682b..47be99ab1 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -1,4 +1,6 @@ import { ChannelState } from './channel_state'; +import { MessageComposer } from './messageComposer'; +import { MessageReceiptsTracker } from './messageDelivery'; import { generateChannelTempCid, logChatPromiseExecution, @@ -74,7 +76,6 @@ import type { } from './types'; import type { Role } from './permissions'; import type { CustomChannelData } from './custom_types'; -import { MessageComposer } from './messageComposer'; /** * Channel - The Channel class manages it's own state. @@ -110,6 +111,7 @@ export class Channel { disconnected: boolean; push_preferences?: PushPreference; public readonly messageComposer: MessageComposer; + public readonly messageReceiptsTracker: MessageReceiptsTracker; /** * constructor - Create a channel @@ -158,6 +160,13 @@ export class Channel { client: this._client, compositionContext: this, }); + + this.messageReceiptsTracker = new MessageReceiptsTracker({ + locateMessage: (timestampMs) => { + const msg = this.state.findMessageByTimestamp(timestampMs); + return msg && { timestampMs, msgId: msg.id }; + }, + }); } /** @@ -1131,16 +1140,26 @@ export class Channel { } /** - * markRead - Send the mark read event for this user, only works if the `read_events` setting is enabled + * markRead - Send the mark read event for this user, only works if the `read_events` setting is enabled. Syncs the message delivery report candidates local state. * * @param {MarkReadOptions} data * @return {Promise} Description */ async markRead(data: MarkReadOptions = {}) { + return await this.getClient().messageDeliveryReporter.markRead(this, data); + } + + /** + * markReadRequest - Send the mark read event for this user, only works if the `read_events` setting is enabled + * + * @param {MarkReadOptions} data + * @return {Promise} Description + */ + async markAsReadRequest(data: MarkReadOptions = {}) { this._checkInitialized(); if (!this.getConfig()?.read_events && !this.getClient()._isUsingServerAuth()) { - return Promise.resolve(null); + return null; } return await this.getClient().post(this._channelURL() + '/read', { @@ -1554,6 +1573,7 @@ export class Channel { { method: 'upsertChannels' }, ); + this.getClient().syncDeliveredCandidates([this]); return state; } @@ -1874,18 +1894,50 @@ export class Channel { break; case 'message.read': if (event.user?.id && event.created_at) { + const previousReadState = channelState.read[event.user.id]; channelState.read[event.user.id] = { + // in case we already have delivery information + ...previousReadState, last_read: new Date(event.created_at), last_read_message_id: event.last_read_message_id, user: event.user, unread_messages: 0, }; + this.messageReceiptsTracker.onMessageRead({ + user: event.user, + readAt: event.created_at, + lastReadMessageId: event.last_read_message_id, + }); + const client = this.getClient(); - if (event.user?.id === this.getClient().user?.id) { + const isOwnEvent = event.user?.id === client.user?.id; + + if (isOwnEvent) { channelState.unreadCount = 0; + client.syncDeliveredCandidates([this]); } } break; + case 'message.delivered': + // todo: update also on thread + if (event.user?.id && event.created_at) { + const previousReadState = channelState.read[event.user.id]; + channelState.read[event.user.id] = { + ...previousReadState, + last_delivered_at: event.last_delivered_at + ? new Date(event.last_delivered_at) + : undefined, + last_delivered_message_id: event.last_delivered_message_id, + user: event.user, + }; + + this.messageReceiptsTracker.onMessageDelivered({ + user: event.user, + deliveredAt: event.created_at, + lastDeliveredMessageId: event.last_delivered_message_id, + }); + } + break; case 'user.watching.start': case 'user.updated': if (event.user?.id) { @@ -1921,8 +1973,9 @@ export class Channel { break; case 'message.new': if (event.message) { + const client = this.getClient(); /* if message belongs to current user, always assume timestamp is changed to filter it out and add again to avoid duplication */ - const ownMessage = event.user?.id === this.getClient().user?.id; + const ownMessage = event.user?.id === client.user?.id; const isThreadMessage = event.message.parent_id && !event.message.show_in_channel; @@ -1947,6 +2000,8 @@ export class Channel { last_read: new Date(event.created_at as string), user: event.user, unread_messages: 0, + last_delivered_at: new Date(event.created_at as string), + last_delivered_message_id: event.message.id, }; } else { channelState.read[userId].unread_messages += 1; @@ -1957,6 +2012,8 @@ export class Channel { if (this._countMessageAsUnread(event.message)) { channelState.unreadCount = channelState.unreadCount + 1; } + + client.syncDeliveredCandidates([this]); } break; case 'message.updated': @@ -2057,11 +2114,13 @@ export class Channel { break; case 'notification.mark_unread': { const ownMessage = event.user?.id === this.getClient().user?.id; - if (!(ownMessage && event.user)) break; + if (!ownMessage || !event.user) break; const unreadCount = event.unread_messages ?? 0; - + const currentState = channelState.read[event.user.id]; channelState.read[event.user.id] = { + // keep the message delivery info + ...currentState, first_unread_message_id: event.first_unread_message_id, last_read: new Date(event.last_read_at as string), last_read_message_id: event.last_read_message_id, @@ -2070,6 +2129,11 @@ export class Channel { }; channelState.unreadCount = unreadCount; + this.messageReceiptsTracker.onNotificationMarkUnread({ + user: event.user, + lastReadAt: event.last_read_at, + lastReadMessageId: event.last_read_message_id, + }); break; } case 'channel.updated': @@ -2286,6 +2350,8 @@ export class Channel { this.state.unreadCount = this.state.read[read.user.id].unread_messages; } } + + this.messageReceiptsTracker.ingestInitial(state.read); } return { diff --git a/src/channel_state.ts b/src/channel_state.ts index 070a7883f..e49f719c4 100644 --- a/src/channel_state.ts +++ b/src/channel_state.ts @@ -31,6 +31,38 @@ type ChannelReadStatus = Record< } >; +const messageSetBounds = ( + a: LocalMessage[] | MessageResponse[], + b: LocalMessage[] | MessageResponse[], +) => ({ + newestMessageA: new Date(a[0]?.created_at ?? 0), + oldestMessageA: new Date(a.slice(-1)[0]?.created_at ?? 0), + newestMessageB: new Date(b[0]?.created_at ?? 0), + oldestMessageB: new Date(b.slice(-1)[0]?.created_at ?? 0), +}); + +const aContainsOrEqualsB = (a: LocalMessage[], b: LocalMessage[]) => { + const { newestMessageA, newestMessageB, oldestMessageA, oldestMessageB } = + messageSetBounds(a, b); + return newestMessageA >= newestMessageB && oldestMessageB >= oldestMessageA; +}; + +const aOverlapsB = (a: LocalMessage[], b: LocalMessage[]) => { + const { newestMessageA, newestMessageB, oldestMessageA, oldestMessageB } = + messageSetBounds(a, b); + return ( + oldestMessageA < oldestMessageB && + oldestMessageB < newestMessageA && + newestMessageA < newestMessageB + ); +}; + +const messageSetsOverlapByTimestamp = (a: LocalMessage[], b: LocalMessage[]) => + aContainsOrEqualsB(a, b) || + aContainsOrEqualsB(b, a) || + aOverlapsB(a, b) || + aOverlapsB(b, a); + /** * ChannelState - A container class for the channel state. */ @@ -867,6 +899,41 @@ export class ChannelState { return this.messageSets[messageSetIndex].messages.find((m) => m.id === messageId); } + findMessageByTimestamp( + timestampMs: number, + parentMessageId?: string, + exactTsMatch: boolean = false, + ): LocalMessage | null { + if ( + (parentMessageId && !this.threads[parentMessageId]) || + this.messageSets.length === 0 + ) + return null; + const setIndex = this.findMessageSetByOldestTimestamp(timestampMs); + const targetMsgSet = this.messageSets[setIndex]?.messages; + if (!targetMsgSet?.length) return null; + const firstMsgTimestamp = targetMsgSet[0].created_at.getTime(); + const lastMsgTimestamp = targetMsgSet.slice(-1)[0].created_at.getTime(); + const isOutOfBound = + timestampMs < firstMsgTimestamp || lastMsgTimestamp < timestampMs; + if (isOutOfBound && exactTsMatch) return null; + + let msgIndex = 0, + hi = targetMsgSet.length - 1; + while (msgIndex < hi) { + const mid = (msgIndex + hi) >>> 1; + if (timestampMs <= targetMsgSet[mid].created_at.getTime()) hi = mid; + else msgIndex = mid + 1; + } + + const foundMessage = targetMsgSet[msgIndex]; + return !exactTsMatch + ? foundMessage + : foundMessage.created_at.getTime() === timestampMs + ? foundMessage + : null; + } + private switchToMessageSet(index: number) { const currentMessages = this.messageSets.find((s) => s.isCurrent); if (!currentMessages) { @@ -889,6 +956,26 @@ export class ChannelState { ); } + /** + * Identifies the set index into which a message set would pertain if its first item's creation date corresponded to oldestTimestampMs. + * @param oldestTimestampMs + */ + private findMessageSetByOldestTimestamp = (oldestTimestampMs: number): number => { + let lo = 0, + hi = this.messageSets.length; + while (lo < hi) { + const mid = (lo + hi) >>> 1; + const msgSet = this.messageSets[mid]; + // should not happen + if (msgSet.messages.length === 0) return -1; + + const oldestMessageTimestampInSet = msgSet.messages[0].created_at.getTime(); + if (oldestMessageTimestampInSet <= oldestTimestampMs) hi = mid; + else lo = mid + 1; + } + return lo; + }; + private findTargetMessageSet( newMessages: (MessageResponse | LocalMessage)[], addIfDoesNotExist = true, @@ -896,39 +983,85 @@ export class ChannelState { ) { let messagesToAdd: (MessageResponse | LocalMessage)[] = newMessages; let targetMessageSetIndex!: number; + if (newMessages.length === 0) + return { targetMessageSetIndex: 0, messagesToAdd: newMessages }; if (addIfDoesNotExist) { - const overlappingMessageSetIndices = this.messageSets + const overlappingMessageSetIndicesByMsgIds = this.messageSets .map((_, i) => i) .filter((i) => this.areMessageSetsOverlap(this.messageSets[i].messages, newMessages), ); + const overlappingMessageSetIndicesByTimestamp = this.messageSets + .map((_, i) => i) + .filter((i) => + messageSetsOverlapByTimestamp( + this.messageSets[i].messages, + newMessages.map(formatMessage), + ), + ); switch (messageSetToAddToIfDoesNotExist) { case 'new': - if (overlappingMessageSetIndices.length > 0) { - targetMessageSetIndex = overlappingMessageSetIndices[0]; + if (overlappingMessageSetIndicesByMsgIds.length > 0) { + targetMessageSetIndex = overlappingMessageSetIndicesByMsgIds[0]; + } else if (overlappingMessageSetIndicesByTimestamp.length > 0) { + targetMessageSetIndex = overlappingMessageSetIndicesByTimestamp[0]; // No new message set is created if newMessages only contains thread replies } else if (newMessages.some((m) => !m.parent_id)) { - this.messageSets.push({ - messages: [], - isCurrent: false, - isLatest: false, - pagination: DEFAULT_MESSAGE_SET_PAGINATION, - }); - targetMessageSetIndex = this.messageSets.length - 1; + // find the index to insert the set + const setIngestIndex = this.findMessageSetByOldestTimestamp( + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + new Date(newMessages[0].created_at!).getTime(), + ); + if (setIngestIndex === -1) { + this.messageSets.push({ + messages: [], + isCurrent: false, + isLatest: false, + pagination: DEFAULT_MESSAGE_SET_PAGINATION, + }); + targetMessageSetIndex = this.messageSets.length - 1; + } else { + const isLatest = setIngestIndex === 0; + this.messageSets.splice(setIngestIndex, 0, { + messages: [], + isCurrent: false, + isLatest, + pagination: DEFAULT_MESSAGE_SET_PAGINATION, // fixme: it is problematic decide about pagination without having data + }); + if (isLatest) { + this.messageSets.slice(1).forEach((set) => { + set.isLatest = false; + }); + } + targetMessageSetIndex = setIngestIndex; + } } break; case 'current': - targetMessageSetIndex = this.messageSets.findIndex((s) => s.isCurrent); + // determine if there is another set to which it would match taken into consideration the timestamp + if (overlappingMessageSetIndicesByTimestamp.length > 0) { + targetMessageSetIndex = overlappingMessageSetIndicesByTimestamp[0]; + } else { + targetMessageSetIndex = this.messageSets.findIndex((s) => s.isCurrent); + } break; case 'latest': - targetMessageSetIndex = this.messageSets.findIndex((s) => s.isLatest); + // determine if there is another set to which it would match taken into consideration the timestamp + if (overlappingMessageSetIndicesByTimestamp.length > 0) { + targetMessageSetIndex = overlappingMessageSetIndicesByTimestamp[0]; + } else { + targetMessageSetIndex = this.messageSets.findIndex((s) => s.isLatest); + } break; default: targetMessageSetIndex = -1; } // when merging the target set will be the first one from the overlapping message sets - const mergeTargetMessageSetIndex = overlappingMessageSetIndices.splice(0, 1)[0]; - const mergeSourceMessageSetIndices = [...overlappingMessageSetIndices]; + const mergeTargetMessageSetIndex = overlappingMessageSetIndicesByMsgIds.splice( + 0, + 1, + )[0]; + const mergeSourceMessageSetIndices = [...overlappingMessageSetIndicesByMsgIds]; if ( mergeTargetMessageSetIndex !== undefined && mergeTargetMessageSetIndex !== targetMessageSetIndex diff --git a/src/client.ts b/src/client.ts index 2b03e66e5..794ca2d56 100644 --- a/src/client.ts +++ b/src/client.ts @@ -240,6 +240,7 @@ import type { QueryChannelsRequestType, } from './channel_manager'; import { ChannelManager } from './channel_manager'; +import { MessageDeliveryReporter } from './messageDelivery'; import { NotificationManager } from './notifications'; import { ReminderManager } from './reminders'; import { StateStore } from './store'; @@ -272,7 +273,7 @@ export type MessageComposerSetupState = { export class StreamChat { private static _instance?: unknown | StreamChat; // type is undefined|StreamChat, unknown is due to TS limitations with statics - + messageDeliveryReporter: MessageDeliveryReporter; _user?: OwnUserResponse | UserResponse; appSettingsPromise?: Promise; activeChannels: { @@ -503,6 +504,7 @@ export class StreamChat { this.threads = new ThreadManager({ client: this }); this.polls = new PollManager({ client: this }); this.reminders = new ReminderManager({ client: this }); + this.messageDeliveryReporter = new MessageDeliveryReporter({ client: this }); } /** @@ -2010,7 +2012,7 @@ export class StreamChat { channels.push(c); } - + this.syncDeliveredCandidates(channels); return channels; } @@ -4701,19 +4703,20 @@ export class StreamChat { } /** - * Send the mark delivered event for this user, only works if the `delivery_receipts` setting is enabled + * Send the mark delivered event for this user * * @param {MarkDeliveredOptions} data * @return {Promise} Description */ - async markChannelsDelivered(data?: MarkDeliveredOptions) { - const deliveryReceiptsEnabled = - this.user?.privacy_settings?.delivery_receipts?.enabled; - if (!deliveryReceiptsEnabled) return; - + async markChannelsDelivered(data: MarkDeliveredOptions) { + if (!data?.latest_delivered_messages?.length) return; return await this.post( this.baseURL + '/channels/delivered', data ?? {}, ); } + + syncDeliveredCandidates(collections: Channel[]) { + this.messageDeliveryReporter.syncDeliveredCandidates(collections); + } } diff --git a/src/events.ts b/src/events.ts index 94993dc20..3d293d3d6 100644 --- a/src/events.ts +++ b/src/events.ts @@ -31,6 +31,7 @@ export const EVENT_MAP = { 'notification.mark_unread': true, 'notification.message_new': true, 'notification.mutes_updated': true, + 'notification.reminder_due': true, 'notification.removed_from_channel': true, 'notification.thread_message_new': true, 'poll.closed': true, @@ -41,6 +42,9 @@ export const EVENT_MAP = { 'reaction.deleted': true, 'reaction.new': true, 'reaction.updated': true, + 'reminder.created': true, + 'reminder.deleted': true, + 'reminder.updated': true, 'thread.updated': true, 'typing.start': true, 'typing.stop': true, @@ -67,10 +71,4 @@ export const EVENT_MAP = { 'capabilities.changed': true, 'live_location_sharing.started': true, 'live_location_sharing.stopped': true, - - // Reminder events - 'reminder.created': true, - 'reminder.updated': true, - 'reminder.deleted': true, - 'notification.reminder_due': true, }; diff --git a/src/index.ts b/src/index.ts index 202df56fb..5f5daf375 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,6 +8,7 @@ export * from './connection'; export * from './events'; export * from './insights'; export * from './messageComposer'; +export * from './messageDelivery'; export * from './middleware'; export * from './moderation'; export * from './notifications'; diff --git a/src/messageDelivery/MessageDeliveryReporter.ts b/src/messageDelivery/MessageDeliveryReporter.ts new file mode 100644 index 000000000..b0b39d2c9 --- /dev/null +++ b/src/messageDelivery/MessageDeliveryReporter.ts @@ -0,0 +1,259 @@ +import type { StreamChat } from '../client'; +import { Channel } from '../channel'; +import type { ThreadUserReadState } from '../thread'; +import { Thread } from '../thread'; +import type { + EventAPIResponse, + LocalMessage, + MarkDeliveredOptions, + MarkReadOptions, +} from '../types'; +import { throttle } from '../utils'; + +const MAX_DELIVERED_MESSAGE_COUNT_IN_PAYLOAD = 100 as const; +const MARK_AS_DELIVERED_BUFFER_TIMEOUT = 1000 as const; +const MARK_AS_READ_THROTTLE_TIMEOUT = 1000 as const; + +const isChannel = (item: Channel | Thread): item is Channel => item instanceof Channel; +const isThread = (item: Channel | Thread): item is Thread => item instanceof Thread; + +type MessageId = string; +type ChannelThreadCompositeId = string; + +export type AnnounceDeliveryOptions = Omit< + MarkDeliveredOptions, + 'latest_delivered_messages' +>; + +export type MessageDeliveryReporterOptions = { + client: StreamChat; +}; + +export class MessageDeliveryReporter { + protected client: StreamChat; + + protected deliveryReportCandidates: Map = + new Map(); + protected nextDeliveryReportCandidates: Map = + new Map(); + + protected markDeliveredRequestPromise: Promise | null = null; + protected markDeliveredTimeout: ReturnType | null = null; + + constructor({ client }: MessageDeliveryReporterOptions) { + this.client = client; + } + + private get markDeliveredRequestInFlight() { + return this.markDeliveredRequestPromise !== null; + } + private get hasTimer() { + return this.markDeliveredTimeout !== null; + } + private get hasDeliveryCandidates() { + return this.deliveryReportCandidates.size > 0; + } + + /** + * Build latest_delivered_messages payload from an arbitrary buffer (deliveryReportCandidates / nextDeliveryReportCandidates) + */ + private confirmationsFrom(map: Map) { + return Array.from(map.entries()).map(([key, messageId]) => { + const [type, id, parent_id] = key.split(':'); + return parent_id + ? { cid: `${type}:${id}`, id: messageId, parent_id } + : { cid: key, id: messageId }; + }); + } + + private confirmationsFromDeliveryReportCandidates() { + const entries = Array.from(this.deliveryReportCandidates); + const sendBuffer = new Map(entries.slice(0, MAX_DELIVERED_MESSAGE_COUNT_IN_PAYLOAD)); + this.deliveryReportCandidates = new Map( + entries.slice(MAX_DELIVERED_MESSAGE_COUNT_IN_PAYLOAD), + ); + + return { latest_delivered_messages: this.confirmationsFrom(sendBuffer), sendBuffer }; + } + + /** + * Generate candidate key for storing in the candidates buffer + * @param collection + * @private + */ + private candidateKeyFor( + collection: Channel | Thread, + ): ChannelThreadCompositeId | undefined { + if (isChannel(collection)) return collection.cid; + if (isThread(collection)) return `${collection.channel.cid}:${collection.id}`; + } + + /** + * Retrieve the reference to the latest message in the state that is nor read neither reported as delivered + * @param collection + */ + private getNextDeliveryReportCandidate = ( + collection: Channel | Thread, + ): { key: ChannelThreadCompositeId; id: MessageId | null } | undefined => { + const ownUserId = this.client.user?.id; + if (!ownUserId) return; + + let latestMessages: LocalMessage[] = []; + let lastDeliveredAt: Date | undefined; + let lastReadAt: Date | undefined; + let key: string | undefined = undefined; + + if (isChannel(collection)) { + latestMessages = collection.state.latestMessages; + const ownReadState = collection.state.read[ownUserId] ?? {}; + lastReadAt = ownReadState?.last_read; + lastDeliveredAt = ownReadState?.last_delivered_at; + key = collection.cid; + } else if (isThread(collection)) { + latestMessages = collection.state.getLatestValue().replies; + const ownReadState = + collection.state.getLatestValue().read[ownUserId] ?? ({} as ThreadUserReadState); + lastReadAt = ownReadState?.lastReadAt; + // @ts-expect-error lastDeliveredAt is not defined yet on ThreadUserReadState + lastDeliveredAt = ownReadState?.lastDeliveredAt; + key = `${collection.channel.cid}:${collection.id}`; + // todo: remove return statement once marking messages as delivered in thread is supported + return; + } else { + return; + } + + if (!key) return; + + const [latestMessage] = latestMessages.slice(-1); + + const wholeCollectionIsRead = + !latestMessage || lastReadAt >= latestMessage.created_at; + if (wholeCollectionIsRead) return { key, id: null }; + const wholeCollectionIsMarkedDelivered = + !latestMessage || (lastDeliveredAt ?? 0) >= latestMessage.created_at; + if (wholeCollectionIsMarkedDelivered) return { key, id: null }; + + return { key, id: latestMessage.id || null }; + }; + + /** + * Updates the delivery candidates buffer with the latest delivery candidates + * @param collection + */ + private trackDeliveredCandidate(collection: Channel | Thread) { + if (isChannel(collection) && !collection.getConfig()?.read_events) return; + if (isThread(collection) && !collection.channel.getConfig()?.read_events) return; + const candidate = this.getNextDeliveryReportCandidate(collection); + if (!candidate?.key) return; + const buffer = this.markDeliveredRequestInFlight + ? this.nextDeliveryReportCandidates + : this.deliveryReportCandidates; + if (candidate.id === null) buffer.delete(candidate.key); + else buffer.set(candidate.key, candidate.id); + } + + /** + * Removes candidate from the delivery report buffer + * @param collection + * @private + */ + private removeCandidateFor(collection: Channel | Thread) { + const candidateKey = this.candidateKeyFor(collection); + if (!candidateKey) return; + this.deliveryReportCandidates.delete(candidateKey); + this.nextDeliveryReportCandidates.delete(candidateKey); + } + + /** + * Records the latest message delivered for Channel or Thread instances and schedules the next report + * if not already scheduled and candidates exist. + * Should be used for WS handling (message.new) as well as for ingesting HTTP channel query results. + * @param collections + */ + public syncDeliveredCandidates(collections: (Channel | Thread)[]) { + if (this.client.user?.privacy_settings?.delivery_receipts?.enabled === false) return; + for (const c of collections) this.trackDeliveredCandidate(c); + this.announceDeliveryBuffered(); + } + + /** + * Fires delivery announcement request followed by immediate delivery candidate buffer reset. + * @param options + */ + public announceDelivery = (options?: AnnounceDeliveryOptions) => { + if (this.markDeliveredRequestInFlight || !this.hasDeliveryCandidates) return; + + const { latest_delivered_messages, sendBuffer } = + this.confirmationsFromDeliveryReportCandidates(); + if (!latest_delivered_messages.length) return; + + const payload = { ...options, latest_delivered_messages }; + + const postFlightReconcile = () => { + this.markDeliveredRequestPromise = null; + + // promote anything that arrived during request + for (const [k, v] of this.nextDeliveryReportCandidates.entries()) { + this.deliveryReportCandidates.set(k, v); + } + this.nextDeliveryReportCandidates = new Map(); + + // checks internally whether there are candidates to announce + this.announceDeliveryBuffered(options); + }; + + const handleError = () => { + // repopulate relevant candidates for the next report + for (const [k, v] of Object.entries(sendBuffer)) { + if (!this.deliveryReportCandidates.has(k)) { + this.deliveryReportCandidates.set(k, v); + } + } + postFlightReconcile(); + }; + + this.markDeliveredRequestPromise = this.client + .markChannelsDelivered(payload) + .then(postFlightReconcile, handleError); + }; + + public announceDeliveryBuffered = (options?: AnnounceDeliveryOptions) => { + if (this.hasTimer || this.markDeliveredRequestInFlight || !this.hasDeliveryCandidates) + return; + this.markDeliveredTimeout = setTimeout(() => { + this.markDeliveredTimeout = null; + this.announceDelivery(options); + }, MARK_AS_DELIVERED_BUFFER_TIMEOUT); + }; + + /** + * Delegates the mark-read call to the Channel or Thread instance + * @param collection + * @param options + */ + public markRead = async (collection: Channel | Thread, options?: MarkReadOptions) => { + let result: EventAPIResponse | null = null; + if (isChannel(collection)) { + result = await collection.markAsReadRequest(options); + } else if (isThread(collection)) { + result = await collection.channel.markAsReadRequest({ + ...options, + thread_id: collection.id, + }); + } + + this.removeCandidateFor(collection); + return result; + }; + + /** + * Throttles the MessageDeliveryReporter.markRead call + * @param collection + * @param options + */ + public throttledMarkRead = throttle(this.markRead, MARK_AS_READ_THROTTLE_TIMEOUT, { + leading: false, + trailing: true, + }); +} diff --git a/src/messageDelivery/MessageReceiptsTracker.ts b/src/messageDelivery/MessageReceiptsTracker.ts new file mode 100644 index 000000000..06860314b --- /dev/null +++ b/src/messageDelivery/MessageReceiptsTracker.ts @@ -0,0 +1,417 @@ +import type { ReadResponse, UserResponse } from '../types'; + +type UserId = string; +type MessageId = string; +export type MsgRef = { timestampMs: number; msgId: MessageId }; +export type OwnMessageReceiptsTrackerMessageLocator = ( + timestampMs: number, +) => MsgRef | null; +export type UserProgress = { + user: UserResponse; + lastReadRef: MsgRef; // MIN_REF if none + lastDeliveredRef: MsgRef; // MIN_REF if none; always >= readRef +}; + +// ---------- ordering utilities ---------- + +const MIN_REF: MsgRef = { timestampMs: Number.NEGATIVE_INFINITY, msgId: '' } as const; + +const compareRefsAsc = (a: MsgRef, b: MsgRef) => + a.timestampMs !== b.timestampMs ? a.timestampMs - b.timestampMs : 0; + +const findIndex = (arr: T[], target: MsgRef, keyOf: (x: T) => MsgRef): number => { + let lo = 0, + hi = arr.length; + while (lo < hi) { + const mid = (lo + hi) >>> 1; + if (compareRefsAsc(keyOf(arr[mid]), target) >= 0) hi = mid; + else lo = mid + 1; + } + return lo; +}; + +/** + * For insertion after the last equal item. E.g. array [a] exists and b is being inserted -> we want [a,b], not [b,a]. + * @param arr + * @param target + * @param keyOf + */ +const findUpperIndex = (arr: T[], target: MsgRef, keyOf: (x: T) => MsgRef): number => { + let lo = 0, + hi = arr.length; + while (lo < hi) { + const mid = (lo + hi) >>> 1; + if (compareRefsAsc(keyOf(arr[mid]), target) > 0) hi = mid; + else lo = mid + 1; + } + return lo; +}; + +const insertByKey = ( + arr: UserProgress[], + item: UserProgress, + keyOf: (x: UserProgress) => MsgRef, +) => arr.splice(findUpperIndex(arr, keyOf(item), keyOf), 0, item); + +const removeByOldKey = ( + arr: UserProgress[], + item: UserProgress, + oldKey: MsgRef, + keyOf: (x: UserProgress) => MsgRef, +) => { + // Find the plateau for oldKey, scan to match by user id + let i = findIndex(arr, oldKey, keyOf); + while (i < arr.length && compareRefsAsc(keyOf(arr[i]), oldKey) === 0) { + if (arr[i].user.id === item.user.id) { + arr.splice(i, 1); + return; + } + i++; + } +}; + +export type OwnMessageReceiptsTrackerOptions = { + locateMessage: OwnMessageReceiptsTrackerMessageLocator; +}; + +/** + * MessageReceiptsTracker + * -------------------------------- + * Tracks **other participants’** delivery/read progress toward **own (outgoing) messages** + * within a **single timeline** (one channel/thread). + * + * How it works + * ------------ + * - Each user has a compact progress record: + * - `lastReadRef`: latest message they have **read** + * - `lastDeliveredRef`: latest message they have **received** (always `>= lastReadRef`) + * - Internally keeps two arrays sorted **ascending by timestamp**: + * - `readSorted` (by `lastReadRef`) + * - `deliveredSorted` (by `lastDeliveredRef`) + * - Queries like “who read message M?” become a **binary search + suffix slice**. + * + * Construction + * ------------ + * `new MessageReceiptsTracker({locateMessage})` + * - `locateMessage(timestamp) => MsgRef | null` must resolve a message ref representation - `{ timestamp, msgId }`. + * - If `locateMessage` returns `null`, the event is ignored (message unknown locally). + * + * Event ingestion + * --------------- + * - `ingestInitial(rows: ReadResponse[])`: Builds initial state from server snapshot. + * If a user’s `last_read` is ahead of `last_delivered_at`, the tracker enforces + * the invariant `lastDeliveredRef >= lastReadRef`. + * - `onMessageRead(user, readAtISO)`: + * Advances the user’s read; also bumps delivered to match if needed. + * - `onMessageDelivered(user, deliveredAtISO)`: + * Advances the user’s delivered to `max(currentRead, deliveredAt)`. + * + * Queries + * ------- + * - `readersForMessage(msgRef) : UserResponse[]` → users with `lastReadRef >= msgRef` + * - `deliveredForMessage(msgRef) : UserResponse[]` → users with `lastDeliveredRef >= msgRef` + * - `deliveredNotReadForMessage(msgRef): UserResponse[]` → delivered but `lastReadRef < msgRef` + * - `usersWhoseLastReadIs : UserResponse[]` → users for whom `msgRef` is their *last read* (exact match) + * - `usersWhoseLastDeliveredIs : UserResponse[]` → users for whom `msgRef` is their *last delivered* (exact match) + * - `groupUsersByLastReadMessage : Record → mapping of messages to their readers + * - `groupUsersByLastDeliveredMessage : Record → mapping of messages to their receivers + * - `hasUserRead(msgRef, userId) : boolean` + * - `hasUserDelivered(msgRef, userId) : boolean` + * + * Complexity + * ---------- + * - Update on read/delivered: **O(log U)** (binary search + one splice) per event, where U is count of users stored by tracker. + * - Query lists: **O(log U + K)** where `K` is the number of returned users (suffix length). + * - Memory: **O(U)** - tracker’s memory grows linearly with the number of users in the channel/thread and does not depend on the number of messages. + * + * Scope & notes + * ------------- + * - One tracker instance is **scoped to a single timeline**. Instantiate per channel/thread. + * - Ordering is by **ascending timestamp**; ties are kept stable by inserting at the end of the + * equal-timestamp plateau (upper-bound insertion), preserving intuitive arrival order. + * - This tracker models **others’ progress toward own messages**; + */ +export class MessageReceiptsTracker { + private byUser = new Map(); + private readSorted: UserProgress[] = []; // asc by lastReadRef + private deliveredSorted: UserProgress[] = []; // asc by lastDeliveredRef + private locateMessage: OwnMessageReceiptsTrackerMessageLocator; + + constructor({ locateMessage }: OwnMessageReceiptsTrackerOptions) { + this.locateMessage = locateMessage; + } + + /** Build initial state from server snapshots (single pass + sort). */ + ingestInitial(responses: ReadResponse[]) { + this.byUser.clear(); + this.readSorted = []; + this.deliveredSorted = []; + for (const r of responses) { + const lastReadTimestamp = r.last_read ? new Date(r.last_read).getTime() : null; + const lastDeliveredTimestamp = r.last_delivered_at + ? new Date(r.last_delivered_at).getTime() + : null; + const lastReadRef = lastReadTimestamp + ? (this.locateMessage(lastReadTimestamp) ?? MIN_REF) + : MIN_REF; + let lastDeliveredRef = lastDeliveredTimestamp + ? (this.locateMessage(lastDeliveredTimestamp) ?? MIN_REF) + : MIN_REF; + const isReadAfterDelivered = compareRefsAsc(lastDeliveredRef, lastReadRef) < 0; + if (isReadAfterDelivered) lastDeliveredRef = lastReadRef; + + const userProgress: UserProgress = { user: r.user, lastReadRef, lastDeliveredRef }; + this.byUser.set(r.user.id, userProgress); + this.readSorted.splice( + findIndex(this.readSorted, lastReadRef, (up) => up.lastReadRef), + 0, + userProgress, + ); + this.deliveredSorted.splice( + findIndex(this.deliveredSorted, lastDeliveredRef, (up) => up.lastDeliveredRef), + 0, + userProgress, + ); + } + } + + /** message.delivered — user device confirmed delivery up to and including messageId. */ + onMessageDelivered({ + user, + deliveredAt, + lastDeliveredMessageId, + }: { + user: UserResponse; + deliveredAt: string; + lastDeliveredMessageId?: string; + }) { + const timestampMs = new Date(deliveredAt).getTime(); + const msgRef = lastDeliveredMessageId + ? { timestampMs, msgId: lastDeliveredMessageId } + : this.locateMessage(new Date(deliveredAt).getTime()); + if (!msgRef) return; + const userProgress = this.ensureUser(user); + + const newDelivered = + compareRefsAsc(msgRef, userProgress.lastReadRef) < 0 + ? userProgress.lastReadRef + : msgRef; // max(read, loc) + // newly announced delivered is older than or equal what is already registered + if (compareRefsAsc(newDelivered, userProgress.lastDeliveredRef) <= 0) return; + + removeByOldKey( + this.deliveredSorted, + userProgress, + userProgress.lastDeliveredRef, + (x) => x.lastDeliveredRef, + ); + userProgress.lastDeliveredRef = newDelivered; + insertByKey(this.deliveredSorted, userProgress, (x) => x.lastDeliveredRef); + } + + /** message.read — user read up to and including messageId. */ + onMessageRead({ + user, + readAt, + lastReadMessageId, + }: { + user: UserResponse; + readAt: string; + lastReadMessageId?: string; + }) { + const timestampMs = new Date(readAt).getTime(); + const msgRef = lastReadMessageId + ? { timestampMs, msgId: lastReadMessageId } + : this.locateMessage(timestampMs); + if (!msgRef) return; + const userProgress = this.ensureUser(user); + // newly announced read message is older than or equal the already recorded last read message + if (compareRefsAsc(msgRef, userProgress.lastReadRef) <= 0) return; + + // move in readSorted + removeByOldKey( + this.readSorted, + userProgress, + userProgress.lastReadRef, + (x) => x.lastReadRef, + ); + userProgress.lastReadRef = msgRef; + insertByKey(this.readSorted, userProgress, (x) => x.lastReadRef); + + // keep delivered >= read + if (compareRefsAsc(userProgress.lastDeliveredRef, userProgress.lastReadRef) < 0) { + removeByOldKey( + this.deliveredSorted, + userProgress, + userProgress.lastDeliveredRef, + (x) => x.lastDeliveredRef, + ); + userProgress.lastDeliveredRef = userProgress.lastReadRef; + insertByKey(this.deliveredSorted, userProgress, (x) => x.lastDeliveredRef); + } + } + + /** notification.mark_unread — user marked messages unread starting at `first_unread_message_id`. + * Sets lastReadRef to the event’s last_read_* values. Delivery never moves backward. + * The event is sent only to the user that triggered the action (own user), so we will never adjust read ref + * for other users - we will not see changes in the UI for other users. However, this implementation does not + * take into consideration this fact and is ready to handle the mark-unread event for any user. + */ + onNotificationMarkUnread({ + user, + lastReadAt, + lastReadMessageId, + }: { + user: UserResponse; + lastReadAt?: string; + lastReadMessageId?: string; + }) { + const userProgress = this.ensureUser(user); + + const newReadRef: MsgRef = lastReadAt + ? { timestampMs: new Date(lastReadAt).getTime(), msgId: lastReadMessageId ?? '' } + : { ...MIN_REF }; + + // If no change, exit early. + if ( + compareRefsAsc(newReadRef, userProgress.lastReadRef) === 0 && + newReadRef.msgId === userProgress.lastReadRef.msgId + ) { + return; + } + + removeByOldKey( + this.readSorted, + userProgress, + userProgress.lastReadRef, + (x) => x.lastReadRef, + ); + userProgress.lastReadRef = newReadRef; + insertByKey(this.readSorted, userProgress, (x) => x.lastReadRef); + + // Maintain invariant delivered >= read. + if (compareRefsAsc(userProgress.lastDeliveredRef, userProgress.lastReadRef) < 0) { + removeByOldKey( + this.deliveredSorted, + userProgress, + userProgress.lastDeliveredRef, + (x) => x.lastDeliveredRef, + ); + userProgress.lastDeliveredRef = userProgress.lastReadRef; + insertByKey(this.deliveredSorted, userProgress, (x) => x.lastDeliveredRef); + } + } + + /** All users who READ this message. */ + readersForMessage(msgRef: MsgRef): UserResponse[] { + const index = findIndex(this.readSorted, msgRef, ({ lastReadRef }) => lastReadRef); + return this.readSorted.slice(index).map((x) => x.user); + } + + /** All users who have it DELIVERED (includes readers). */ + deliveredForMessage(msgRef: MsgRef): UserResponse[] { + const pos = findIndex( + this.deliveredSorted, + msgRef, + ({ lastDeliveredRef }) => lastDeliveredRef, + ); + return this.deliveredSorted.slice(pos).map((x) => x.user); + } + + /** Users who delivered but have NOT read. */ + deliveredNotReadForMessage(msgRef: MsgRef): UserResponse[] { + const pos = findIndex( + this.deliveredSorted, + msgRef, + ({ lastDeliveredRef }) => lastDeliveredRef, + ); + const usersDeliveredNotRead: UserResponse[] = []; + for (let i = pos; i < this.deliveredSorted.length; i++) { + const userProgress = this.deliveredSorted[i]; + if (compareRefsAsc(userProgress.lastReadRef, msgRef) < 0) + usersDeliveredNotRead.push(userProgress.user); + } + return usersDeliveredNotRead; + } + + /** Users for whom `msgRef` is their *last read* (exact match). */ + usersWhoseLastReadIs(msgRef: MsgRef): UserResponse[] { + if (!msgRef.msgId) return []; + const start = findIndex(this.readSorted, msgRef, (x) => x.lastReadRef); + const end = findUpperIndex(this.readSorted, msgRef, (x) => x.lastReadRef); + const users: UserResponse[] = []; + for (let i = start; i < end; i++) { + const up = this.readSorted[i]; + if (up.lastReadRef.msgId === msgRef.msgId) users.push(up.user); + } + return users; + } + + /** Users for whom `msgRef` is their *last delivered* (exact match). */ + usersWhoseLastDeliveredIs(msgRef: MsgRef): UserResponse[] { + if (!msgRef.msgId) return []; + const start = findIndex(this.deliveredSorted, msgRef, (x) => x.lastDeliveredRef); + const end = findUpperIndex(this.deliveredSorted, msgRef, (x) => x.lastDeliveredRef); + const users: UserResponse[] = []; + for (let i = start; i < end; i++) { + const up = this.deliveredSorted[i]; + if (up.lastDeliveredRef.msgId === msgRef.msgId) users.push(up.user); + } + return users; + } + + // ---- queries: per-user status ---- + + hasUserRead(msgRef: MsgRef, userId: string): boolean { + const up = this.byUser.get(userId); + return !!up && compareRefsAsc(up.lastReadRef, msgRef) >= 0; + } + + hasUserDelivered(msgRef: MsgRef, userId: string): boolean { + const up = this.byUser.get(userId); + return !!up && compareRefsAsc(up.lastDeliveredRef, msgRef) >= 0; + } + + getUserProgress(userId: string): UserProgress | null { + const userProgress = this.byUser.get(userId); + if (!userProgress) return null; + return userProgress; + } + + groupUsersByLastReadMessage(): Record { + return Array.from(this.byUser.values()).reduce>( + (acc, userProgress) => { + const msgId = userProgress.lastReadRef.msgId; + if (!msgId) return acc; + if (!acc[msgId]) acc[msgId] = []; + acc[msgId].push(userProgress.user); + return acc; + }, + {}, + ); + } + + groupUsersByLastDeliveredMessage(): Record { + return Array.from(this.byUser.values()).reduce>( + (acc, userProgress) => { + const msgId = userProgress.lastDeliveredRef.msgId; + if (!msgId) return acc; + if (!acc[msgId]) acc[msgId] = []; + acc[msgId].push(userProgress.user); + return acc; + }, + {}, + ); + } + + private ensureUser(user: UserResponse): UserProgress { + let up = this.byUser.get(user.id); + if (!up) { + up = { user, lastReadRef: MIN_REF, lastDeliveredRef: MIN_REF }; + this.byUser.set(user.id, up); + insertByKey(this.readSorted, up, (x) => x.lastReadRef); + insertByKey(this.deliveredSorted, up, (x) => x.lastDeliveredRef); + } + return up; + } +} diff --git a/src/messageDelivery/index.ts b/src/messageDelivery/index.ts new file mode 100644 index 000000000..46907ca04 --- /dev/null +++ b/src/messageDelivery/index.ts @@ -0,0 +1,2 @@ +export * from './MessageDeliveryReporter'; +export * from './MessageReceiptsTracker'; diff --git a/src/thread.ts b/src/thread.ts index 9e30c226b..bf6f77812 100644 --- a/src/thread.ts +++ b/src/thread.ts @@ -534,7 +534,7 @@ export class Thread extends WithSubscriptions { return null; } - return await this.channel.markRead({ thread_id: this.id }); + return await this.client.messageDeliveryReporter.markRead(this); }; private throttledMarkAsRead = throttle( diff --git a/src/types.ts b/src/types.ts index 83f547082..c595813e3 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1468,8 +1468,8 @@ export type Event = CustomEventData & { ai_state?: AIState; channel?: ChannelResponse; channel_custom?: CustomChannelData; - channel_member_count?: number; channel_id?: string; + channel_member_count?: number; channel_type?: string; cid?: string; clear_history?: boolean; diff --git a/src/utils.ts b/src/utils.ts index 0aae6ce8d..ef255ccb7 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -748,7 +748,8 @@ export const debounce = any>( }; // works exactly the same as lodash.throttle -export const throttle = unknown>( +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const throttle = any>( fn: T, timeout = 200, { leading = true, trailing = false }: { leading?: boolean; trailing?: boolean } = {}, diff --git a/test/unit/channel.test.js b/test/unit/channel.test.js index e0bace4c8..87bbb5492 100644 --- a/test/unit/channel.test.js +++ b/test/unit/channel.test.js @@ -124,11 +124,11 @@ describe('Channel count unread', function () { it('countUnread should return correct count when multiple message sets are loaded into state', () => { expect(channel.countUnread(lastRead)).to.be.equal(0); channel.state.addMessagesSorted([ - generateMsg({ date: '2021-01-01T00:00:00' }), - generateMsg({ date: '2022-01-01T00:00:00' }), + generateMsg({ date: '2026-01-01T00:00:00' }), + generateMsg({ date: '2026-02-01T00:00:00' }), ]); channel.state.addMessagesSorted( - [generateMsg({ date: '2020-01-01T00:00:00' })], + [generateMsg({ date: '2006-01-01T00:00:00' })], false, true, true, @@ -165,7 +165,7 @@ describe('Channel count unread', function () { generateMsg({ date: '2022-01-01T00:00:00' }), ]); channel.state.addMessagesSorted( - [generateMsg({ date: '2020-01-01T00:00:00' })], + [generateMsg({ date: '2010-01-01T00:00:00' })], false, true, true, @@ -355,6 +355,8 @@ describe('Channel _handleChannelEvent', function () { expect(channel.state.unreadCount).to.be.equal(30); }); + it('does not override the delivery information in the read status', () => {}); + it('message.truncate removes all messages if "truncated_at" is "now"', function () { const messages = [ { created_at: '2021-01-01T00:01:00' }, @@ -652,6 +654,8 @@ describe('Channel _handleChannelEvent', function () { last_read_message_id: '6', user, unread_messages: initialCountUnread, + last_delivered_at: new Date(1000).toISOString(), + last_delivered_message_id: 'delivered-msg-id', }; notificationMarkUnreadEvent = { type: 'notification.mark_unread', @@ -690,6 +694,12 @@ describe('Channel _handleChannelEvent', function () { expect(channel.state.read[user.id].unread_messages).to.be.equal( event.unread_messages, ); + expect(channel.state.read[user.id].last_delivered_at).toBe( + initialReadState.last_delivered_at, + ); + expect(channel.state.read[user.id].last_delivered_message_id).toBe( + initialReadState.last_delivered_message_id, + ); }); it('should not update channel read state produced for another user or user is missing', () => { @@ -718,6 +728,160 @@ describe('Channel _handleChannelEvent', function () { }); }); + describe('message.read', () => { + let initialCountUnread; + let initialReadState; + let messageReadEvent; + + beforeEach(() => { + initialCountUnread = 100; + initialReadState = { + last_read: new Date(1500).toISOString(), + last_read_message_id: '6', + user, + unread_messages: initialCountUnread, + last_delivered_at: new Date(1000).toISOString(), + last_delivered_message_id: 'delivered-msg-id', + }; + messageReadEvent = { + type: 'message.read', + created_at: new Date(2000).toISOString(), + cid: channel.cid, + channel_member_count: 100, + channel_type: channel.type, + channel_id: channel.id, + user, + last_read_message_id: '6b1006ad-7a6d-49d1-82d9-5ee5e8167e49', + }; + }); + + it('should update channel read state produced for current user', () => { + channel.state.unreadCount = initialCountUnread; + channel.state.read[user.id] = initialReadState; + const event = messageReadEvent; + + channel._handleChannelEvent(event); + + expect(channel.state.unreadCount).toBe(0); + expect(new Date(channel.state.read[user.id].last_read).getTime()).toBe( + new Date(messageReadEvent.created_at).getTime(), + ); + expect(channel.state.read[user.id].last_read_message_id).toBe( + event.last_read_message_id, + ); + expect(channel.state.read[user.id].unread_messages).toBe(0); + expect(channel.state.read[user.id].last_delivered_at).toBe( + initialReadState.last_delivered_at, + ); + expect(channel.state.read[user.id].last_delivered_message_id).toBe( + initialReadState.last_delivered_message_id, + ); + }); + + it('should update channel read state produced for another user', () => { + const anotherUser = { id: 'another-user' }; + channel.state.unreadCount = initialCountUnread; + channel.state.read[anotherUser.id] = initialReadState; + const event = { ...messageReadEvent, user: anotherUser }; + + channel._handleChannelEvent(event); + + expect(channel.state.unreadCount).toBe(initialCountUnread); + expect(new Date(channel.state.read[anotherUser.id].last_read).getTime()).toBe( + new Date(messageReadEvent.created_at).getTime(), + ); + expect(channel.state.read[anotherUser.id].last_read_message_id).toBe( + event.last_read_message_id, + ); + expect(channel.state.read[anotherUser.id].unread_messages).toBe(0); + expect(channel.state.read[anotherUser.id].last_delivered_at).toBe( + initialReadState.last_delivered_at, + ); + expect(channel.state.read[anotherUser.id].last_delivered_message_id).toBe( + initialReadState.last_delivered_message_id, + ); + }); + }); + + describe('message.delivered', () => { + let initialCountUnread; + let initialReadState; + let messageDeliveredEvent; + + beforeEach(() => { + initialCountUnread = 100; + initialReadState = { + last_read: new Date(1500).toISOString(), + last_read_message_id: '6', + user, + unread_messages: initialCountUnread, + last_delivered_at: new Date(1000).toISOString(), + last_delivered_message_id: 'delivered-msg-id', + }; + messageDeliveredEvent = { + type: 'message.delivered', + created_at: new Date(2000).toISOString(), + cid: channel.cid, + channel_member_count: 100, + channel_type: channel.type, + channel_id: channel.id, + user, + last_delivered_message_id: 'fd403be5-9207-48db-8bd7-13bd65ffbea6', + last_delivered_at: new Date(2000).toISOString(), + }; + }); + + it('should update channel read state produced for current user', () => { + channel.state.unreadCount = initialCountUnread; + channel.state.read[user.id] = initialReadState; + + channel._handleChannelEvent(messageDeliveredEvent); + + expect(channel.state.unreadCount).toBe(initialReadState.unread_messages); + expect(new Date(channel.state.read[user.id].last_read).getTime()).toBe( + new Date(initialReadState.last_read).getTime(), + ); + expect(channel.state.read[user.id].last_read_message_id).toBe( + initialReadState.last_read_message_id, + ); + expect(channel.state.read[user.id].unread_messages).toBe( + initialReadState.unread_messages, + ); + expect(new Date(channel.state.read[user.id].last_delivered_at).getTime()).toBe( + new Date(messageDeliveredEvent.last_delivered_at).getTime(), + ); + expect(channel.state.read[user.id].last_delivered_message_id).toBe( + messageDeliveredEvent.last_delivered_message_id, + ); + }); + + it('should update channel read state produced for another user', () => { + const anotherUser = { id: 'another-user' }; + channel.state.unreadCount = initialCountUnread; + channel.state.read[anotherUser.id] = initialReadState; + const event = { ...messageDeliveredEvent, user: anotherUser }; + + channel._handleChannelEvent(event); + + expect(channel.state.unreadCount).toBe(initialCountUnread); + expect(new Date(channel.state.read[anotherUser.id].last_read).getTime()).toBe( + new Date(initialReadState.last_read).getTime(), + ); + expect(channel.state.read[anotherUser.id].last_read_message_id).toBe( + initialReadState.last_read_message_id, + ); + expect(channel.state.read[anotherUser.id].unread_messages).toBe( + initialReadState.unread_messages, + ); + expect( + new Date(channel.state.read[anotherUser.id].last_delivered_at).getTime(), + ).toBe(new Date(event.last_delivered_at).getTime()); + expect(channel.state.read[anotherUser.id].last_delivered_message_id).toBe( + event.last_delivered_message_id, + ); + }); + }); + it('should include unread_messages for message events from another user', () => { channel.state.read['id'] = { unread_messages: 2, diff --git a/test/unit/channel_state.test.js b/test/unit/channel_state.test.js index 5a2e9f4aa..165b8c8ca 100644 --- a/test/unit/channel_state.test.js +++ b/test/unit/channel_state.test.js @@ -10,6 +10,8 @@ import { generateUUIDv4 as uuidv4 } from '../../src/utils'; import { vi, describe, beforeEach, afterEach, it, expect } from 'vitest'; +const toISOString = (timestampMs) => new Date(timestampMs).toISOString(); + describe('ChannelState addMessagesSorted', function () { it('empty state add single messages', async function () { const state = new ChannelState(); @@ -219,12 +221,15 @@ describe('ChannelState addMessagesSorted', function () { it('should add messages to new message set', () => { const state = new ChannelState(); state.addMessagesSorted([ - generateMsg({ id: '12' }), - generateMsg({ id: '13' }), - generateMsg({ id: '14' }), + generateMsg({ id: '12', date: toISOString(100) }), + generateMsg({ id: '13', date: toISOString(200) }), + generateMsg({ id: '14', date: toISOString(300) }), ]); state.addMessagesSorted( - [generateMsg({ id: '0' }), generateMsg({ id: '1' })], + [ + generateMsg({ id: '0', date: toISOString(1000) }), + generateMsg({ id: '1', date: toISOString(1100) }), + ], false, false, true, @@ -235,9 +240,13 @@ describe('ChannelState addMessagesSorted', function () { expect(state.messages[0].id).to.be.equal('12'); expect(state.messages[1].id).to.be.equal('13'); expect(state.messages[2].id).to.be.equal('14'); - expect(state.messageSets[1].messages.length).to.be.equal(2); - expect(state.messageSets[1].messages[0].id).to.be.equal('0'); - expect(state.messageSets[1].messages[1].id).to.be.equal('1'); + // set with ids 0,1 is added at the beginning as the newest set is inserted earlier + expect(state.messageSets[0].messages.map((m) => m.id)).toStrictEqual(['0', '1']); + expect(state.messageSets[1].messages.map((m) => m.id)).toStrictEqual([ + '12', + '13', + '14', + ]); }); it('should add messages to current message set', () => { @@ -276,17 +285,165 @@ describe('ChannelState addMessagesSorted', function () { expect(state.latestMessages[2].id).to.be.equal('14'); }); + it('adds message page sorted', () => { + const state = new ChannelState(); + + // load first page + state.addMessagesSorted( + [ + generateMsg({ id: '12', date: toISOString(1200) }), + generateMsg({ id: '13', date: toISOString(1300) }), + generateMsg({ id: '14', date: toISOString(1400) }), + ], + false, + false, + true, + 'latest', + ); + + // jump to a start + state.addMessagesSorted( + [ + generateMsg({ id: '1', date: toISOString(100) }), + generateMsg({ id: '2', date: toISOString(200) }), + ], + false, + false, + true, + 'new', + ); + state.messageSets[0].isCurrent = false; + state.messageSets[1].isCurrent = true; + // jump to a end + + state.addMessagesSorted( + [generateMsg({ id: '10', date: toISOString(1000) })], + false, + false, + true, + 'new', + ); + + state.addMessagesSorted( + [ + generateMsg({ id: '8', date: toISOString(800) }), + generateMsg({ id: '9', date: toISOString(900) }), + ], + false, + false, + true, + 'new', + ); + + state.addMessagesSorted( + [ + generateMsg({ id: '4', date: toISOString(400) }), + generateMsg({ id: '5', date: toISOString(500) }), + generateMsg({ id: '6', date: toISOString(600) }), + ], + false, + false, + true, + 'new', + ); + + state.addMessagesSorted( + [generateMsg({ id: '1500', date: toISOString(1500) })], + false, + false, + true, + 'new', + ); + + const toTimestamp = (msg) => new Date(msg.created_at).getTime(); + expect(state.messageSets.length).to.eql(6); + expect(state.messageSets[0].messages.map(toTimestamp)).toStrictEqual([1500]); + expect(state.messageSets[1].messages.map(toTimestamp)).toStrictEqual([ + 1200, 1300, 1400, + ]); + expect(state.messageSets[2].messages.map(toTimestamp)).toStrictEqual([1000]); + expect(state.messageSets[3].messages.map(toTimestamp)).toStrictEqual([800, 900]); + expect(state.messageSets[4].messages.map(toTimestamp)).toStrictEqual([400, 500, 600]); + expect(state.messageSets[5].messages.map(toTimestamp)).toStrictEqual([100, 200]); + }); + + it('inputs messages pertaining to different sets into corresponding message set and breaks the state', () => { + const state = new ChannelState(); + + // load first page + state.addMessagesSorted( + [ + generateMsg({ id: '12', date: toISOString(1200) }), + generateMsg({ id: '14', date: toISOString(1400) }), + ], + false, + false, + true, + 'latest', + ); + + state.addMessagesSorted( + [ + generateMsg({ id: '6', date: toISOString(600) }), + generateMsg({ id: '8', date: toISOString(800) }), + ], + false, + false, + true, + 'new', + ); + + state.addMessagesSorted( + [ + generateMsg({ id: '1', date: toISOString(100) }), + generateMsg({ id: '3', date: toISOString(300) }), + ], + false, + false, + true, + 'new', + ); + + state.addMessagesSorted( + [ + generateMsg({ id: '7', date: 700 }), + generateMsg({ id: '2', date: 200 }), + generateMsg({ id: '13', date: toISOString(1300) }), + ], + false, + false, + true, + 'new', + ); + + const toTimestamp = (msg) => new Date(msg.created_at).getTime(); + expect(state.messageSets.length).to.eql(4); + expect(state.messageSets[0].messages.map(toTimestamp)).toStrictEqual([1200, 1400]); + expect(state.messageSets[1].messages.map(toTimestamp)).toStrictEqual([ + 200, 700, 1300, + ]); + expect(state.messageSets[2].messages.map(toTimestamp)).toStrictEqual([600, 800]); + expect(state.messageSets[3].messages.map(toTimestamp)).toStrictEqual([100, 300]); + }); + it(`should add messages to latest message set when it's not currently active`, () => { const state = new ChannelState(); state.addMessagesSorted( - [generateMsg({ id: '12' }), generateMsg({ id: '13' }), generateMsg({ id: '14' })], + [ + generateMsg({ id: '12', date: toISOString(1200) }), + generateMsg({ id: '13', date: toISOString(1300) }), + generateMsg({ id: '14', date: toISOString(1400) }), + ], false, false, true, 'latest', ); state.addMessagesSorted( - [generateMsg({ id: '0' }), generateMsg({ id: '1' })], + [ + generateMsg({ id: '1', date: toISOString(100) }), + generateMsg({ id: '2', date: toISOString(200) }), + ], false, false, true, @@ -294,10 +451,91 @@ describe('ChannelState addMessagesSorted', function () { ); state.messageSets[0].isCurrent = false; state.messageSets[1].isCurrent = true; - state.addMessagesSorted([generateMsg({ id: '15' })], false, false, true, 'latest'); + state.addMessagesSorted( + [generateMsg({ id: '15', date: toISOString(1500) })], + false, + false, + true, + 'latest', + ); + + expect(state.latestMessages.map((m) => m.id)).toStrictEqual(['12', '13', '14', '15']); + }); + + it('adjusts the latest set flag according to actual message creation date', () => { + const state = new ChannelState(); + state.addMessagesSorted( + [ + generateMsg({ id: '1', date: toISOString(100) }), + generateMsg({ id: '2', date: toISOString(200) }), + ], + false, + false, + true, + 'latest', + ); + expect(state.latestMessages.map((m) => m.id)).toStrictEqual(['1', '2']); + + state.addMessagesSorted( + [ + generateMsg({ id: '12', date: toISOString(1200) }), + generateMsg({ id: '13', date: toISOString(1300) }), + generateMsg({ id: '14', date: toISOString(1400) }), + ], + false, + false, + true, + 'new', + ); + expect(state.latestMessages.map((m) => m.id)).toStrictEqual(['12', '13', '14']); + expect(state.messageSets.filter((s) => s.isLatest).length).toBe(1); + }); + + it("the messageSetToAddToIfDoesNotExist: 'latest' should be ignored if the messages do not belong to the latest set based on their creation timestamp", () => { + const state = new ChannelState(); + state.addMessagesSorted( + [ + generateMsg({ id: '12', date: toISOString(1200) }), + generateMsg({ id: '13', date: toISOString(1300) }), + generateMsg({ id: '14', date: toISOString(1400) }), + ], + false, + false, + true, + 'latest', + ); + state.addMessagesSorted( + [ + generateMsg({ id: '1', date: toISOString(100) }), + generateMsg({ id: '2', date: toISOString(200) }), + ], + false, + false, + true, + 'new', + ); + expect(state.messageSets[0].isCurrent).toBeTruthy(); + expect(state.messageSets[1].isCurrent).toBeFalsy(); - expect(state.latestMessages.length).to.be.equal(4); - expect(state.latestMessages[3].id).to.be.equal('15'); + state.addMessagesSorted( + [generateMsg({ id: '15', date: toISOString(150) })], + false, + false, + true, + 'latest', + ); + + expect(state.messageSets[0].messages.map((m) => m.id)).toStrictEqual([ + '12', + '13', + '14', + ]); + expect(state.latestMessages.map((m) => m.id)).toStrictEqual(['12', '13', '14']); + expect(state.messageSets[1].messages.map((m) => m.id)).toStrictEqual([ + '1', + '15', + '2', + ]); }); it(`shouldn't create new message set for thread replies`, () => { @@ -407,19 +645,33 @@ describe('ChannelState addMessagesSorted', function () { it(`should do nothing if message is not available locally`, () => { const state = new ChannelState(); state.addMessagesSorted([ - generateMsg({ id: '12' }), - generateMsg({ id: '13' }), - generateMsg({ id: '14' }), + generateMsg({ id: '12', date: toISOString(1200) }), + generateMsg({ id: '13', date: toISOString(1300) }), + generateMsg({ id: '14', date: toISOString(1400) }), ]); - state.addMessagesSorted([generateMsg({ id: '5' })], false, false, true, 'new'); state.addMessagesSorted( - [generateMsg({ id: '1' }), generateMsg({ id: '2' })], + [generateMsg({ id: '5', date: toISOString(500) })], + false, + false, + true, + 'new', + ); + state.addMessagesSorted( + [ + generateMsg({ id: '1', date: toISOString(100) }), + generateMsg({ id: '2', date: toISOString(200) }), + ], false, false, true, 'new', ); - state.addMessagesSorted([generateMsg({ id: '8' })], false, false, false); + state.addMessagesSorted( + [generateMsg({ id: '8', date: toISOString(800) })], + false, + false, + false, + ); expect(state.latestMessages.length).to.be.equal(3); expect(state.messages.length).to.be.equal(3); @@ -501,18 +753,18 @@ describe('ChannelState addMessagesSorted', function () { it('when new messages overlap with latest messages', () => { const state = new ChannelState(); const overlap = [ - generateMsg({ id: '11', date: '2020-01-01T00:00:10.001Z' }), - generateMsg({ id: '12', date: '2020-01-01T00:00:21.002Z' }), - generateMsg({ id: '13', date: '2020-01-01T00:00:24.003Z' }), + generateMsg({ id: '11', date: toISOString(1100) }), + generateMsg({ id: '12', date: toISOString(1200) }), + generateMsg({ id: '13', date: toISOString(1300) }), ]; const messages = [ ...overlap, - generateMsg({ id: '14', date: '2020-01-01T00:00:33.000Z' }), - generateMsg({ id: '15', date: '2020-01-01T00:00:43.000Z' }), + generateMsg({ id: '14', date: toISOString(1400) }), + generateMsg({ id: '15', date: toISOString(1500) }), ]; state.addMessagesSorted(messages); const newMessages = [ - generateMsg({ id: '10', date: '2020-01-01T00:00:03.000Z' }), + generateMsg({ id: '10', date: toISOString(1000) }), ...overlap, ]; state.addMessagesSorted(newMessages, false, true, true, 'new'); @@ -559,39 +811,32 @@ describe('ChannelState addMessagesSorted', function () { it('when new messages overlap with messages, but not current or latest messages', () => { const state = new ChannelState(); - const overlap = [generateMsg({ id: '11', date: '2020-01-01T00:00:10.001Z' })]; - const latestMessages = [ - generateMsg({ id: '20', date: '2020-01-01T00:10:10.001Z' }), - ]; + const overlap = [generateMsg({ id: '11', date: toISOString(1100) })]; + const latestMessages = [generateMsg({ id: '20', date: toISOString(2000) })]; state.addMessagesSorted(latestMessages); - const currentMessages = [ - generateMsg({ id: '8', date: '2020-01-01T00:00:03.001Z' }), - ]; + const currentMessages = [generateMsg({ id: '8', date: toISOString(800) })]; state.addMessagesSorted(currentMessages, false, true, true, 'new'); state.messageSets[0].isCurrent = false; state.messageSets[1].isCurrent = true; const otherMessages = [ - generateMsg({ id: '10', date: '2020-01-01T00:00:09.001Z' }), + generateMsg({ id: '10', date: toISOString(1000) }), ...overlap, ]; state.addMessagesSorted(otherMessages, false, true, true, 'new'); const newMessages = [ ...overlap, - generateMsg({ id: '12', date: '2020-01-01T00:00:11.001Z' }), + generateMsg({ id: '12', date: toISOString(1200) }), ]; state.addMessagesSorted(newMessages, false, true, true, 'new'); - expect(state.latestMessages.length).to.be.equal(1); - expect(state.latestMessages[0].id).to.be.equal('20'); - expect(state.messages.length).to.be.equal(1); - expect(state.messages[0].id).to.be.equal('8'); expect(state.messageSets.length).to.be.equal(3); - expect(state.messageSets[0].messages).to.be.equal(state.latestMessages); - expect(state.messageSets[1].messages).to.be.equal(state.messages); - expect(state.messageSets[2].messages.length).to.be.equal(3); - expect(state.messageSets[2].messages[0].id).to.be.equal('10'); - expect(state.messageSets[2].messages[1].id).to.be.equal('11'); - expect(state.messageSets[2].messages[2].id).to.be.equal('12'); + expect(state.latestMessages.map(({ id }) => id)).toStrictEqual(['20']); + expect(state.messages.map(({ id }) => id)).toStrictEqual(['8']); + expect(state.messageSets.map((s) => s.messages.map(({ id }) => id))).toStrictEqual([ + ['20'], + ['10', '11', '12'], + ['8'], + ]); }); it('when current messages overlap with latest', () => { @@ -1099,7 +1344,7 @@ describe('ChannelState clean', () => { cid: channel.cid, type: 'typing.start', user: { id: 'other' }, - received_at: new Date(Date.now() - 10000).toISOString(), + received_at: toISOString(Date.now() - 10000), }); expect(channel.state.typing['other']).not.to.be.undefined; @@ -1243,12 +1488,12 @@ describe('latestMessages', () => { it('should return latest messages - if they are not the current message set', () => { const state = new ChannelState(); const latestMessages = [ - generateMsg({ id: '1' }), - generateMsg({ id: '2' }), - generateMsg({ id: '3' }), + generateMsg({ id: '2', date: toISOString(200) }), + generateMsg({ id: '3', date: toISOString(300) }), + generateMsg({ id: '4', date: toISOString(400) }), ]; state.addMessagesSorted(latestMessages); - const newMessages = [generateMsg({ id: '0' })]; + const newMessages = [generateMsg({ id: '1', date: toISOString(100) })]; state.addMessagesSorted(newMessages, false, true, true, 'new'); state.messageSets[0].isCurrent = false; state.messageSets[1].isCurrent = true; @@ -1262,16 +1507,16 @@ describe('latestMessages', () => { it('should return latest messages - if they are not the current message set and new messages received', () => { const state = new ChannelState(); const latestMessages = [ - generateMsg({ id: '1' }), - generateMsg({ id: '2' }), - generateMsg({ id: '3' }), + generateMsg({ id: '2', date: toISOString(200) }), + generateMsg({ id: '3', date: toISOString(300) }), + generateMsg({ id: '4', date: toISOString(400) }), ]; state.addMessagesSorted(latestMessages); - const newMessages = [generateMsg({ id: '0' })]; + const newMessages = [generateMsg({ id: '1', date: toISOString(100) })]; state.addMessagesSorted(newMessages, false, true, true, 'new'); state.messageSets[0].isCurrent = false; state.messageSets[1].isCurrent = true; - const latestMessage = generateMsg({ id: '4' }); + const latestMessage = generateMsg({ id: '5', date: toISOString(500) }); state.addMessagesSorted([latestMessage], false, true, true, 'latest'); expect(state.latestMessages.length).to.be.equal(latestMessages.length + 1); @@ -1307,8 +1552,20 @@ describe('loadMessageIntoState', () => { it('should switch message sets if message is available locally, but in a different set', async () => { const state = new ChannelState(); - state.addMessagesSorted([generateMsg({ id: '8' })], false, true, true, 'latest'); - state.addMessagesSorted([generateMsg({ id: '5' })], false, true, true, 'new'); + state.addMessagesSorted( + [generateMsg({ id: '8', date: toISOString(800) })], + false, + true, + true, + 'latest', + ); + state.addMessagesSorted( + [generateMsg({ id: '5', date: toISOString(500) })], + false, + true, + true, + 'new', + ); await state.loadMessageIntoState('5'); expect(state.messageSets[0].isCurrent).to.be.equal(false); @@ -1317,8 +1574,20 @@ describe('loadMessageIntoState', () => { it('should switch to latest message set', async () => { const state = new ChannelState(); - state.addMessagesSorted([generateMsg({ id: '8' })], false, true, true, 'latest'); - state.addMessagesSorted([generateMsg({ id: '5' })], false, true, true, 'new'); + state.addMessagesSorted( + [generateMsg({ id: '8', date: toISOString(800) })], + false, + true, + true, + 'latest', + ); + state.addMessagesSorted( + [generateMsg({ id: '5', date: toISOString(500) })], + false, + true, + true, + 'new', + ); state.messageSets[0].isCurrent = false; state.messageSets[1].isCurrent = true; await state.loadMessageIntoState('latest'); @@ -1328,8 +1597,11 @@ describe('loadMessageIntoState', () => { it('should load message from backend and switch to the new message set', async () => { const state = new ChannelState(); - state.addMessagesSorted([generateMsg({ id: '5' }), generateMsg({ id: '6' })]); - const newMessages = [generateMsg({ id: '8' })]; + state.addMessagesSorted([ + generateMsg({ id: '5', date: toISOString(500) }), + generateMsg({ id: '6', date: toISOString(600) }), + ]); + const newMessages = [generateMsg({ id: '8', date: toISOString(800) })]; state._channel = { query: () => { state.addMessagesSorted(newMessages, false, true, true, 'new'); @@ -1344,8 +1616,8 @@ describe('loadMessageIntoState', () => { describe('if message is a thread reply', () => { it('should do nothing if parent message and reply are available locally in the current set', async () => { const state = new ChannelState(); - const parentMessage = generateMsg({ id: '5' }); - const reply = generateMsg({ id: '8', parent_id: '5' }); + const parentMessage = generateMsg({ id: '5', date: toISOString(500) }); + const reply = generateMsg({ id: '8', date: toISOString(800), parent_id: '5' }); state.addMessagesSorted([parentMessage]); state.addMessagesSorted([reply]); @@ -1357,8 +1629,8 @@ describe('loadMessageIntoState', () => { it('should change message set if parent message and reply are available locally', async () => { const state = new ChannelState(); - const parentMessage = generateMsg({ id: '5' }); - const reply = generateMsg({ id: '8', parent_id: '5' }); + const parentMessage = generateMsg({ id: '5', date: toISOString(500) }); + const reply = generateMsg({ id: '8', date: toISOString(800), parent_id: '5' }); state.addMessagesSorted([parentMessage]); state.addMessagesSorted([reply]); const otherMessages = [generateMsg(), generateMsg()]; @@ -1389,8 +1661,8 @@ describe('loadMessageIntoState', () => { it('should load parent message and reply from backend, and switch to new message set', async () => { const state = new ChannelState(); - const parentMessage = generateMsg({ id: '5' }); - const reply = generateMsg({ id: '8', parent_id: '5' }); + const parentMessage = generateMsg({ id: '5', date: toISOString(500) }); + const reply = generateMsg({ id: '8', date: toISOString(800), parent_id: '5' }); state._channel = { getReplies: () => state.addMessagesSorted([reply], false, false, true, 'current'), query: () => state.addMessagesSorted([parentMessage], false, true, true, 'new'), @@ -1463,3 +1735,98 @@ describe('findMessage', () => { }); }); }); + +describe('find message by timestamp', () => { + it('finds the message with matching timestamp', () => { + const state = new ChannelState(); + const expectedFoundMsg = generateMsg({ + id: '2', + created_at: toISOString(200), + }); + state.addMessagesSorted([ + generateMsg({ id: '12', created_at: toISOString(1200) }), + generateMsg({ id: '13', created_at: toISOString(1300) }), + generateMsg({ id: '14', created_at: toISOString(1400) }), + ]); + state.addMessagesSorted( + [ + generateMsg({ id: '1', created_at: toISOString(100) }), + expectedFoundMsg, + generateMsg({ id: '3', created_at: toISOString(300) }), + generateMsg({ id: '4', created_at: toISOString(400) }), + ], + false, + false, + true, + 'new', + ); + state.addMessagesSorted( + [ + generateMsg({ id: '6', created_at: toISOString(600) }), + generateMsg({ id: '7', created_at: toISOString(700) }), + ], + false, + false, + true, + 'new', + ); + + const foundMessage = state.findMessageByTimestamp( + new Date(expectedFoundMsg.created_at).getTime(), + ); + expect(foundMessage.id).toBe(expectedFoundMsg.id); + }); + + it('finds the first message if multiple messages with the same timestamp', () => { + const state = new ChannelState(); + const expectedFoundMessage = generateMsg({ + id: '2', + created_at: toISOString(200), + }); + const msgWithSameTimestamp = { ...expectedFoundMessage, id: '3' }; + state.addMessagesSorted([ + generateMsg({ id: '12', created_at: toISOString(1200) }), + generateMsg({ id: '13', created_at: toISOString(1300) }), + generateMsg({ id: '14', created_at: toISOString(1400) }), + ]); + state.addMessagesSorted( + [ + generateMsg({ id: '1', created_at: toISOString(100) }), + expectedFoundMessage, + msgWithSameTimestamp, + generateMsg({ id: '3.5', created_at: toISOString(300) }), + generateMsg({ id: '4', created_at: toISOString(400) }), + ], + false, + false, + true, + 'new', + ); + state.addMessagesSorted( + [ + generateMsg({ id: '6', created_at: toISOString(600) }), + generateMsg({ id: '7', created_at: toISOString(700) }), + ], + false, + false, + true, + 'new', + ); + + const foundMessage = state.findMessageByTimestamp( + new Date(msgWithSameTimestamp.created_at).getTime(), + ); + expect(foundMessage.id).toBe(expectedFoundMessage.id); + }); + + it('returns null if the message is not found', () => { + const state = new ChannelState(); + state.addMessagesSorted([ + generateMsg({ id: '12', created_at: toISOString(1200) }), + generateMsg({ id: '13', created_at: toISOString(1300) }), + generateMsg({ id: '14', created_at: toISOString(1400) }), + ]); + const foundMessage = state.findMessageByTimestamp(200); + expect(foundMessage).toBeNull(); + }); +}); diff --git a/test/unit/client.test.js b/test/unit/client.test.js index 849fc8d6a..60d92534d 100644 --- a/test/unit/client.test.js +++ b/test/unit/client.test.js @@ -1432,3 +1432,43 @@ describe('X-Stream-Client header', () => { }); }); }); + +describe('markChannelsDelivered', () => { + let client; + const user = { id: 'user' }; + + beforeEach(() => { + client = new StreamChat('', ''); + + vi.spyOn(client, 'post').mockResolvedValue({ + ok: true, + }); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('prevents triggering the request with empty payload', async () => { + await client.markChannelsDelivered(); + expect(client.post).not.toHaveBeenCalled(); + + await client.markChannelsDelivered({}); + expect(client.post).not.toHaveBeenCalled(); + + await client.markChannelsDelivered({ latest_delivered_messages: [] }); + expect(client.post).not.toHaveBeenCalled(); + + await client.markChannelsDelivered({ user, user_id: user.id }); + expect(client.post).not.toHaveBeenCalled(); + }); + + it('triggers the request with at least on channel to report', async () => { + const delivered = [{ cid: 'cid', id: 'message-id' }]; + await client.markChannelsDelivered({ latest_delivered_messages: delivered }); + expect(client.post).toHaveBeenCalledWith( + 'https://chat.stream-io-api.com/channels/delivered', + { latest_delivered_messages: delivered }, + ); + }); +}); diff --git a/test/unit/messageDelivery/MessageDeliveryReporter.test.ts b/test/unit/messageDelivery/MessageDeliveryReporter.test.ts new file mode 100644 index 000000000..a3681381c --- /dev/null +++ b/test/unit/messageDelivery/MessageDeliveryReporter.test.ts @@ -0,0 +1,448 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { getClientWithUser } from '../test-utils/getClient'; +import type { Channel, Event, EventAPIResponse, StreamChat } from '../../../src'; + +const channelType = 'messaging'; +const channelId = 'channelId'; +const ownUser = { + id: 'me', + privacy_settings: { delivery_receipts: { enabled: true } }, +}; + +const otherUser = { + id: 'otherUser', +}; +const mkMsg = (id: string, at: string | number | Date) => + ({ id, created_at: new Date(at) }) as any; + +describe('MessageDeliveryReporter', () => { + let client: StreamChat; + let channel: Channel; + + beforeEach(async () => { + vi.useFakeTimers(); + client = getClientWithUser(ownUser); + (client as any).user.privacy_settings.delivery_receipts.enabled = undefined; + + channel = client.channel(channelType, channelId); + channel.initialized = true; + client.configs[channel.cid] = { + created_at: '', + read_events: true, + reminders: false, + updated_at: '', + }; + }); + + afterEach(() => { + vi.runOnlyPendingTimers(); + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it('announces delivery after the buffer window', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({ ok: true } as any); + + // last_read < last message + channel.state.latestMessages = [mkMsg('m1', '2025-01-01T10:00:00Z')]; + (channel.state as any).read['me'] = { last_read: new Date('2025-01-01T09:00:00Z') }; + + client.syncDeliveredCandidates([channel]); + expect(markChannelsDeliveredSpy).not.toHaveBeenCalled(); + + // throttle window (MessageDeliveryReporter uses 1000ms) + vi.advanceTimersByTime(1000); + // trailing request is not triggered as there are no delivery candidates to report + expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1); + expect(markChannelsDeliveredSpy).toHaveBeenCalledWith({ + latest_delivered_messages: [ + { + cid: channel.cid, + id: 'm1', + }, + ], + }); + }); + + it('announces at max 100 candidates per request', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({ ok: true } as any); + + // last_read < last message + const channels = Array.from({ length: 110 }, (_, i) => { + const channel = client.channel(channelType, i.toString()); + channel.initialized = true; + channel.state.latestMessages = [mkMsg('m1', '2025-01-01T10:00:00Z')]; + (channel.state as any).read['me'] = { last_read: new Date('2025-01-01T09:00:00Z') }; + return channel; + }); + channels.forEach((ch) => { + client.configs[ch.cid] = { + created_at: '', + read_events: true, + reminders: false, + updated_at: '', + }; + }); + + client.syncDeliveredCandidates(channels); + vi.advanceTimersByTime(1000); + // trailing request is not triggered as there are no delivery candidates to report + expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1); + expect( + markChannelsDeliveredSpy.mock.calls[0][0].latest_delivered_messages.length, + ).toBe(100); + // @ts-expect-error accessing protected property deliveryReportCandidates + expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(10); + expect( + // @ts-expect-error accessing protected property deliveryReportCandidates + Array.from(client.messageDeliveryReporter.deliveryReportCandidates.keys()), + ).toEqual(channels.slice(100).map((channel) => channel.cid)); + + await Promise.resolve(); + vi.advanceTimersByTime(1000); + expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(2); + expect( + markChannelsDeliveredSpy.mock.calls[1][0].latest_delivered_messages.length, + ).toBe(10); + // @ts-expect-error accessing protected property deliveryReportCandidates + expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(0); + }); + + it('does nothing when delivery receipts are disabled', async () => { + (client as any).user.privacy_settings.delivery_receipts.enabled = false; + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({ ok: true } as any); + + channel.state.latestMessages = [mkMsg('m1', '2025-01-01T10:00:00Z')]; + (channel.state as any).read['me'] = { last_read: new Date('2025-01-01T09:00:00Z') }; + + client.syncDeliveredCandidates([channel]); + vi.advanceTimersByTime(1000); + + expect(markChannelsDeliveredSpy).not.toHaveBeenCalled(); + }); + + it('does nothing when read events are disabled in channel config', async () => { + client.configs[channel.cid] = { + created_at: '', + read_events: false, + reminders: false, + updated_at: '', + }; + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({ ok: true } as any); + + channel.state.latestMessages = [mkMsg('m1', '2025-01-01T10:00:00Z')]; + (channel.state as any).read['me'] = { last_read: new Date('2025-01-01T09:00:00Z') }; + + client.syncDeliveredCandidates([channel]); + vi.advanceTimersByTime(1000); + + expect(markChannelsDeliveredSpy).not.toHaveBeenCalled(); + }); + + it('does not report if latest message is older than last_delivered_at in read state', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({ ok: true } as any); + + (channel.state as any).latestMessages = [mkMsg('m1', '2025-01-01T10:00:00Z')]; + (channel.state as any).read['me'] = { + last_read: new Date('2025-01-01T09:00:00Z'), + last_delivered_at: new Date('2025-01-01T11:00:00Z'), + }; + + client.syncDeliveredCandidates([channel]); + + vi.advanceTimersByTime(1000); + expect(markChannelsDeliveredSpy).not.toHaveBeenCalled(); + }); + + it('coalesces multiple announceDeliveryBuffered calls into a single request', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({} as any); + + channel.state.latestMessages = [mkMsg('m1', 1000)]; + (channel.state as any).read['me'] = { last_read: new Date(0) }; + + client.syncDeliveredCandidates([channel]); + + client.messageDeliveryReporter.announceDeliveryBuffered(); + client.messageDeliveryReporter.announceDeliveryBuffered(); + client.messageDeliveryReporter.announceDeliveryBuffered(); + + vi.advanceTimersByTime(1000); + expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1); + }); + + it('updates the candidate to the newest message before the throttle fires', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({} as any); + + (channel.state as any).read['me'] = { last_read: new Date('2025-01-01T09:00:00Z') }; + channel.state.latestMessages = [mkMsg('m1', '2025-01-01T10:00:00Z')]; + + client.syncDeliveredCandidates([channel]); + + // newer message arrives before throttle fires + channel.state.latestMessages.push(mkMsg('m2', '2025-01-01T10:05:00Z')); + client.syncDeliveredCandidates([channel]); + + vi.advanceTimersByTime(1000); + + expect(markChannelsDeliveredSpy).toHaveBeenCalledWith({ + latest_delivered_messages: [ + { + cid: channel.cid, + id: 'm2', + }, + ], + }); + }); + + it('does not start a second request while one is in-flight; queues new candidate for after', async () => { + // first call stays in-flight until we resolve it + let resolveFirstMarkDelivered!: ( + value: EventAPIResponse | PromiseLike | undefined, + ) => void; + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockImplementationOnce(() => new Promise((r) => (resolveFirstMarkDelivered = r))) + .mockResolvedValueOnce({ ok: true } as any); // second request + + const ch1 = client.channel('messaging', 'ch1'); + ch1.initialized = true; + (ch1.state as any).read['me'] = { last_read: new Date(0) }; + (ch1.state as any).latestMessages = [mkMsg('m1', 1000)]; + + const ch2 = client.channel('messaging', 'ch2'); + ch2.initialized = true; + + client.configs[ch1.cid] = { + created_at: '', + read_events: true, + reminders: false, + updated_at: '', + }; + + client.configs[ch2.cid] = { + created_at: '', + read_events: true, + reminders: false, + updated_at: '', + }; + client.syncDeliveredCandidates([ch1]); + vi.advanceTimersByTime(1000); + + expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1); + expect(markChannelsDeliveredSpy).toHaveBeenCalledWith({ + latest_delivered_messages: [ + { + cid: 'messaging:ch1', + id: 'm1', + }, + ], + }); + + // While request is in-flight, a new candidate (different channel) arrives. + (ch2.state as any).read['me'] = { last_read: new Date(0) }; + (ch2.state as any).latestMessages = [mkMsg('n1', 2000)]; + client.syncDeliveredCandidates([ch2]); + + // Trying to announce during in-flight should be a no-op for sending + vi.advanceTimersByTime(1000); + expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1); + + // Settle the first request + resolveFirstMarkDelivered({ ok: true } as any); + await Promise.resolve(); + + // Now announce again; the queued candidate should be sent + client.messageDeliveryReporter.announceDeliveryBuffered(); + vi.advanceTimersByTime(1000); + + expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(2); + expect(markChannelsDeliveredSpy).toHaveBeenCalledWith({ + latest_delivered_messages: [ + { + cid: 'messaging:ch2', + id: 'n1', + }, + ], + }); + }); + + it('removes the pending delivery candidate upon channel.markRead', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({} as any); + vi.spyOn(channel, 'markAsReadRequest').mockResolvedValue({} as any); + + (channel.state as any).read['me'] = { last_read: new Date(0) }; + channel.state.latestMessages = [mkMsg('m1', 1000)]; + + client.syncDeliveredCandidates([channel]); + + await channel.markRead(); + + vi.advanceTimersByTime(1000); + expect(markChannelsDeliveredSpy).not.toHaveBeenCalled(); + }); + + it('does not remove the pending delivery candidate after failed markRead request', async () => { + const markChannelsDeliveredSpy = vi.spyOn(client, 'markChannelsDelivered'); + vi.spyOn(channel, 'markAsReadRequest').mockRejectedValue({} as any); + + (channel.state as any).read['me'] = { last_read: new Date(0) }; + channel.state.latestMessages = [mkMsg('m1', 1000)]; + + client.syncDeliveredCandidates([channel]); + + try { + await channel.markRead(); + } catch (error) {} + + vi.advanceTimersByTime(1000); + expect(markChannelsDeliveredSpy).toHaveBeenCalledWith({ + latest_delivered_messages: [ + { + cid: channel.cid, + id: 'm1', + }, + ], + }); + }); + + it('handles message.new via channel event: schedules and sends delivered for newest', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({} as any); + + (channel.state as any).read['me'] = { last_read: new Date(0) }; + channel.state.latestMessages = []; + + // simulate incoming message.new event + const ev: Event = { + type: 'message.new', + created_at: new Date('2025-01-01T10:00:00Z').toISOString(), + user: otherUser, + message: mkMsg('m1', '2025-01-01T10:00:00Z') as any, + }; + + channel._handleChannelEvent(ev); + + vi.advanceTimersByTime(1000); + + expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1); + expect(markChannelsDeliveredSpy).toHaveBeenCalledWith({ + latest_delivered_messages: [ + { + cid: channel.cid, + id: 'm1', + }, + ], + }); + }); + + it('prevents tracking own new messages', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({} as any); + + (channel.state as any).read['me'] = { last_read: new Date(0) }; + channel.state.latestMessages = []; + + // simulate incoming message.new event + const ev: Event = { + type: 'message.new', + created_at: new Date('2025-01-01T10:00:00Z').toISOString(), + user: ownUser, + message: mkMsg('m1', '2025-01-01T10:00:00Z') as any, + }; + + channel._handleChannelEvent(ev); + + vi.advanceTimersByTime(1000); + + expect(markChannelsDeliveredSpy).not.toHaveBeenCalled(); + }); + + it('syncs delivery candidates upon own message.read event and prevents reporting delivery', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({} as any); + + (channel.state as any).read['me'] = { last_read: new Date(0) }; + channel.state.latestMessages = [mkMsg('m1', '2025-01-01T10:00:00Z') as any]; + + client.syncDeliveredCandidates([channel]); + + const ev: Event = { + type: 'message.read', + created_at: new Date('2025-01-01T10:00:00Z').toISOString(), + last_read_message_id: 'm1', + message: mkMsg('m1', '2025-01-01T10:00:00Z') as any, + user: ownUser, + }; + + channel._handleChannelEvent(ev); + + vi.advanceTimersByTime(1000); + + expect(markChannelsDeliveredSpy).not.toHaveBeenCalled(); + }); + + it('does not sync delivery candidates upon other user message.read event and reports delivery', async () => { + const markChannelsDeliveredSpy = vi + .spyOn(client, 'markChannelsDelivered') + .mockResolvedValue({} as any); + + (channel.state as any).read['me'] = { last_read: new Date(0) }; + channel.state.latestMessages = [mkMsg('m1', '2025-01-01T10:00:00Z') as any]; + + client.syncDeliveredCandidates([channel]); + + const ev: Event = { + type: 'message.read', + created_at: new Date('2025-01-01T10:00:00Z').toISOString(), + last_read_message_id: 'm1', + message: mkMsg('m1', '2025-01-01T10:00:00Z') as any, + user: otherUser, + }; + + channel._handleChannelEvent(ev); + + vi.advanceTimersByTime(1000); + + expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1); + expect(markChannelsDeliveredSpy).toHaveBeenCalledWith({ + latest_delivered_messages: [ + { + cid: channel.cid, + id: 'm1', + }, + ], + }); + }); + + it('throttles markRead (burst collapses to one underlying request)', async () => { + const spy = vi.spyOn(channel, 'markAsReadRequest').mockResolvedValue({} as any); + + // burst + client.messageDeliveryReporter.throttledMarkRead(channel); + client.messageDeliveryReporter.throttledMarkRead(channel); + client.messageDeliveryReporter.throttledMarkRead(channel); + + expect(spy).not.toHaveBeenCalled(); + vi.advanceTimersByTime(1000); + expect(spy).toHaveBeenCalledTimes(1); + }); +}); diff --git a/test/unit/messageDelivery/MessageReceiptsTracker.test.ts b/test/unit/messageDelivery/MessageReceiptsTracker.test.ts new file mode 100644 index 000000000..380aad2d2 --- /dev/null +++ b/test/unit/messageDelivery/MessageReceiptsTracker.test.ts @@ -0,0 +1,464 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { + MessageReceiptsTracker, + type MsgRef, + ReadResponse, + UserResponse, +} from '../../../src'; + +const ownUserId = 'author'; +const U = (id: string): UserResponse => ({ id, name: id }); // matches UserResponse shape for the service + +// Timeline: 4 messages with ascending timestamps +const msgs = [ + { id: 'm1', ts: 1000 }, + { id: 'm2', ts: 2000 }, + { id: 'm3', ts: 3000 }, + { id: 'm4', ts: 4000 }, +] as const; + +const byTs = new Map(msgs.map((m) => [m.ts, m])); +const ref = (ts: number): MsgRef => ({ timestampMs: ts, msgId: byTs.get(ts)!.id }); + +// Message locator used by the service (O(1) lookup by exact timestamp) +const makeLocator = () => (timestampMs?: number) => { + if (!timestampMs) return null; + const m = byTs.get(timestampMs); + return m ? { timestampMs: m.ts, msgId: m.id } : null; +}; + +// ISO builders (service parses Date strings) +const iso = (ts: number) => new Date(ts).toISOString(); + +// Extract ids from user arrays for easier assertions +const ids = (users: any[]) => users.map((u) => u.id); + +// ---------------------------------------------------------------- + +describe('MessageDeliveryReadTracker', () => { + let tracker: MessageReceiptsTracker; + + beforeEach(() => { + tracker = new MessageReceiptsTracker({ locateMessage: makeLocator() }); + }); + + describe('ingestInitial', () => { + it('builds initial state and enforces delivered >= read', () => { + const alice = U('alice'); + const bob = U('bob'); + + // Alice read m2, delivered m1 -> delivered must be bumped to m2 + // Bob delivered m3, haven't read any message -> read stays MIN, delivered m3 + const snapshot: ReadResponse[] = [ + { + user: alice, + last_read: iso(2000), + last_delivered_at: iso(1000), + }, + { + user: bob, + last_read: iso(500), + last_delivered_at: iso(3000), + }, + ]; + + tracker.ingestInitial(snapshot); + + const pAlice = tracker.getUserProgress('alice')!; + const pBob = tracker.getUserProgress('bob')!; + + expect(pAlice.lastReadRef).toEqual(ref(2000)); + expect(pAlice.lastDeliveredRef).toEqual(ref(2000)); // bumped up + + expect(pBob.lastReadRef.timestampMs).toBe(Number.NEGATIVE_INFINITY); + expect(pBob.lastDeliveredRef).toEqual(ref(3000)); + + // Readers of m2: Alice only + expect(ids(tracker.readersForMessage(ref(2000)))).toEqual(['alice']); + // Delivered for m2: Alice (m2) and Bob (m3) + expect(ids(tracker.deliveredForMessage(ref(2000)))).toEqual(['alice', 'bob']); + // Delivered-not-read for m2: Bob only + expect(ids(tracker.deliveredNotReadForMessage(ref(2000)))).toEqual(['bob']); + }); + + it('includes own read state', () => { + const ownUser = U(ownUserId); + + const snapshot: ReadResponse[] = [ + { + user: ownUser, + last_read: iso(2000), + last_delivered_at: iso(1000), + }, + ]; + + tracker.ingestInitial(snapshot); + + expect(tracker.getUserProgress(ownUserId)!.user).toStrictEqual(ownUser); + }); + }); + + describe('onMessageRead', () => { + it('creates user on first read and keeps delivered >= read', () => { + const carol = U('carol'); + const p0 = tracker.getUserProgress('carol'); + expect(p0).toBeNull(); + + // first read at m3 + tracker.onMessageRead({ user: carol, readAt: iso(3000) }); + + const p1 = tracker.getUserProgress('carol')!; + expect(p1.lastReadRef).toEqual(ref(3000)); + expect(p1.lastDeliveredRef).toEqual(ref(3000)); // bumped + + // older/equal reads are no-ops + tracker.onMessageRead({ user: carol, readAt: iso(2000) }); + tracker.onMessageRead({ user: carol, readAt: iso(3000) }); + const p2 = tracker.getUserProgress('carol')!; + expect(p2.lastReadRef).toEqual(ref(3000)); + expect(p2.lastDeliveredRef).toEqual(ref(3000)); + + // later read moves forward and bumps delivered + tracker.onMessageRead({ user: carol, readAt: iso(4000) }); + const p3 = tracker.getUserProgress('carol')!; + expect(p3.lastReadRef).toEqual(ref(4000)); + expect(p3.lastDeliveredRef).toEqual(ref(4000)); + }); + + it('ignores read events with unknown timestamps (locator returns null)', () => { + // re-init with a locator that knows only m1..m3 (m4 is unknown) + const locator = (ts?: number) => + ts && ts <= 3000 ? { timestampMs: ts, msgId: byTs.get(ts)!.id } : null; + tracker = new MessageReceiptsTracker({ locateMessage: locator }); + + const dave = U('dave'); + tracker.onMessageRead({ user: dave, readAt: iso(4000) }); // unknown -> ignored + expect(tracker.getUserProgress('dave')).toBeNull(); + + // but a known read creates progress + tracker.onMessageRead({ user: dave, readAt: iso(2000) }); + const pd = tracker.getUserProgress('dave')!; + expect(pd.lastReadRef).toEqual(ref(2000)); + expect(pd.lastDeliveredRef).toEqual(ref(2000)); + }); + + it('prevents search for message if last read message id is provided', () => { + const locator = vi.fn().mockImplementation(() => {}); + tracker = new MessageReceiptsTracker({ locateMessage: locator }); + const user = U('frank'); + tracker.onMessageRead({ user, readAt: iso(3000), lastReadMessageId: 'X' }); // unknown -> ignored + expect(locator).not.toHaveBeenCalled(); + expect(tracker.getUserProgress('frank')).toStrictEqual({ + lastDeliveredRef: { + msgId: 'X', + timestampMs: 3000, + }, + lastReadRef: { + msgId: 'X', + timestampMs: 3000, + }, + user: { + id: 'frank', + name: 'frank', + }, + }); + }); + + it('does not ignore own message.read events', () => { + const ownUser = U(ownUserId); + tracker.onMessageRead({ user: ownUser, readAt: iso(2000) }); + expect(tracker.getUserProgress(ownUserId)!.user).toStrictEqual(ownUser); + }); + }); + + describe('onMessageDelivered', () => { + it('creates user on first delivered; uses max(read, delivered)', () => { + const eve = U('eve'); + + tracker.onMessageDelivered({ user: eve, deliveredAt: iso(2000) }); + let progressEve = tracker.getUserProgress('eve')!; + expect(progressEve.lastDeliveredRef).toEqual(ref(2000)); + expect(progressEve.lastReadRef.timestampMs).toBe(Number.NEGATIVE_INFINITY); + + // deliver older/equal -> no-op + tracker.onMessageDelivered({ user: eve, deliveredAt: iso(1000) }); + tracker.onMessageDelivered({ user: eve, deliveredAt: iso(2000) }); + progressEve = tracker.getUserProgress('eve')!; + expect(progressEve.lastDeliveredRef).toEqual(ref(2000)); + + // if read goes ahead to m3, and a delivery arrives for m2, + // newDelivered = max(read, deliveredEvent) = read (m3) + tracker.onMessageRead({ user: eve, readAt: iso(3000) }); + progressEve = tracker.getUserProgress('eve')!; + expect(progressEve.lastReadRef).toEqual(ref(3000)); + expect(progressEve.lastDeliveredRef).toEqual(ref(3000)); // bumped by read + + // deliver at m4 -> moves forward + tracker.onMessageDelivered({ user: eve, deliveredAt: iso(4000) }); + progressEve = tracker.getUserProgress('eve')!; + expect(progressEve.lastDeliveredRef).toEqual(ref(4000)); + expect(progressEve.lastReadRef).toEqual(ref(3000)); + }); + + it('ignores delivered events with unknown timestamps (locator returns null)', () => { + const locator = (t?: number) => + t && t <= 2000 ? { timestampMs: t, msgId: byTs.get(t)!.id } : null; + tracker = new MessageReceiptsTracker({ locateMessage: locator }); + + const frank = U('frank'); + tracker.onMessageDelivered({ user: frank, deliveredAt: iso(3000) }); // unknown -> ignored + expect(tracker.getUserProgress('frank')).toBeNull(); + + tracker.onMessageDelivered({ user: frank, deliveredAt: iso(2000) }); // known -> creates + const pf = tracker.getUserProgress('frank')!; + expect(pf.lastDeliveredRef).toEqual(ref(2000)); + }); + + it('prevents search for message if last read message id is provided', () => { + const locator = vi.fn().mockImplementation(() => {}); + tracker = new MessageReceiptsTracker({ locateMessage: locator }); + const user = U('frank'); + tracker.onMessageDelivered({ + user, + deliveredAt: iso(3000), + lastDeliveredMessageId: 'X', + }); // unknown -> ignored + expect(locator).not.toHaveBeenCalled(); + expect(tracker.getUserProgress('frank')).toStrictEqual({ + lastDeliveredRef: { + msgId: 'X', + timestampMs: 3000, + }, + lastReadRef: { + msgId: '', + timestampMs: Number.NEGATIVE_INFINITY, + }, + user: { + id: 'frank', + name: 'frank', + }, + }); + }); + + it('does not ignore own message.delivered events', () => { + const ownUser = U(ownUserId); + tracker.onMessageDelivered({ user: ownUser, deliveredAt: iso(2000) }); + expect(tracker.getUserProgress(ownUserId)!.user).toStrictEqual(ownUser); + }); + }); + + describe('onNotificationMarkUnread', () => { + const user = U('u'); + it('moves lastRead backward to the event boundary and keeps delivered unchanged (no backward move)', () => { + tracker.onMessageRead({ user, readAt: iso(3000), lastReadMessageId: 'm3' }); + + tracker.onNotificationMarkUnread({ + user, + lastReadAt: iso(2000), + lastReadMessageId: 'm2', + }); + + const userProgress = tracker.getUserProgress(user.id)!; + // read moved back to m2 + expect(userProgress.lastReadRef).toEqual(ref(2000)); + // delivered did NOT move backward (stays at m3) + expect(userProgress.lastDeliveredRef).toEqual(ref(3000)); + + // sanity checks in queries + expect(tracker.hasUserRead(ref(2000), 'u')).toBe(true); + expect(tracker.hasUserRead(ref(3000), 'u')).toBe(false); + expect(tracker.hasUserDelivered(ref(3000), 'u')).toBe(true); + }); + + it('supports unread to MIN when lastReadAt is not provided', () => { + // v delivered m4 and read m2 + tracker.onMessageDelivered({ + user, + deliveredAt: iso(4000), + lastDeliveredMessageId: 'm4', + }); + tracker.onMessageRead({ user, readAt: iso(2000), lastReadMessageId: 'm2' }); + + let userProgress = tracker.getUserProgress(user.id)!; + expect(userProgress.lastReadRef).toEqual(ref(2000)); + expect(userProgress.lastDeliveredRef).toEqual(ref(4000)); + + // Unread everything (no lastReadAt) -> lastRead becomes MIN_REF; delivered stays at m4 + tracker.onNotificationMarkUnread({ + user, + }); + + userProgress = tracker.getUserProgress(user.id)!; + expect(userProgress.lastReadRef.timestampMs).toBe(Number.NEGATIVE_INFINITY); + expect(userProgress.lastReadRef.msgId).toBe(''); + // delivered remains ahead (not decreased) + expect(userProgress.lastDeliveredRef).toEqual(ref(4000)); + }); + + it('is a no-op when the provided last_read equals current lastReadRef', () => { + tracker.onMessageRead({ user, readAt: iso(3000) }); + const before = structuredClone(tracker.getUserProgress(user.id)!); + + tracker.onNotificationMarkUnread({ + user, + lastReadAt: iso(3000), + lastReadMessageId: 'm3', + }); + + const after = tracker.getUserProgress(user.id)!; + expect(after.lastReadRef).toEqual(before.lastReadRef); + expect(after.lastDeliveredRef).toEqual(before.lastDeliveredRef); + }); + + it('does not call locateMessage when lastReadMessageId is provided', () => { + const locator = vi.fn().mockImplementation(makeLocator()); + tracker = new MessageReceiptsTracker({ locateMessage: locator }); + + tracker.onNotificationMarkUnread({ + user, + lastReadAt: iso(2000), + lastReadMessageId: 'm2', + }); + + // new read state applied + const userProgress = tracker.getUserProgress(user.id)!; + expect(userProgress.lastReadRef).toEqual(ref(2000)); + + // ensure locator wasn’t used to derive the read ref + expect(locator).not.toHaveBeenCalled(); + }); + }); + + describe('queries', () => { + it('readersForMessage / deliveredForMessage / deliveredNotReadForMessage', () => { + const a = U('a'); + const b = U('b'); + const c = U('c'); + + // a: read m3, delivered m3 + tracker.onMessageRead({ user: a, readAt: iso(3000) }); + // b: delivered m3 only (not read) + tracker.onMessageDelivered({ user: b, deliveredAt: iso(3000) }); + // c: read m4, delivered m4 + tracker.onMessageRead({ user: c, readAt: iso(4000) }); + + // Readers of m2 => a, c + expect(ids(tracker.readersForMessage(ref(2000)))).toEqual(['a', 'c']); + + // Delivered for m2 => a, b, c + expect(ids(tracker.deliveredForMessage(ref(2000)))).toEqual(['a', 'b', 'c']); + + // Delivered-not-read for m3 => b only + expect(ids(tracker.deliveredNotReadForMessage(ref(3000)))).toEqual(['b']); + }); + + it('hasUserRead / hasUserDelivered flags reflect progress', () => { + const u1 = U('u1'); + const u2 = U('u2'); + + tracker.onMessageDelivered({ user: u1, deliveredAt: iso(2000) }); // delivered m2 + tracker.onMessageRead({ user: u2, readAt: iso(3000) }); // read m3 (delivered m3) + + // For m2: + expect(tracker.hasUserDelivered(ref(2000), 'u1')).toBe(true); + expect(tracker.hasUserRead(ref(2000), 'u1')).toBe(false); + + expect(tracker.hasUserDelivered(ref(2000), 'u2')).toBe(true); + expect(tracker.hasUserRead(ref(2000), 'u2')).toBe(true); + + // For m3: + expect(tracker.hasUserDelivered(ref(3000), 'u1')).toBe(false); + expect(tracker.hasUserRead(ref(3000), 'u1')).toBe(false); + + expect(tracker.hasUserDelivered(ref(3000), 'u2')).toBe(true); + expect(tracker.hasUserRead(ref(3000), 'u2')).toBe(true); + }); + + describe('usersWhoseLastReadIs / usersWhoseLastDeliveredIs', () => { + it('returns users for whom the given message is their exact *last* read/delivered', () => { + const a = U('a'); + const b = U('b'); + const c = U('c'); + const d = U('d'); // will share timestamp with m3 but different msgId via direct id override + const e = U('e'); // same for delivered side + + // a: read m2 -> delivered m2 + tracker.onMessageRead({ user: a, readAt: iso(2000) }); + + // b: read m3 -> delivered m3 + tracker.onMessageRead({ user: b, readAt: iso(3000) }); + + // c: delivered m3 only + tracker.onMessageDelivered({ user: c, deliveredAt: iso(3000) }); + + // d: read at ts=3000 but with a different msgId "X" (tests plateau filtering by msgId) + tracker.onMessageRead({ user: d, readAt: iso(3000), lastReadMessageId: 'X' }); + + // e: delivered at ts=3000 but with a different msgId "X" + tracker.onMessageDelivered({ + user: e, + deliveredAt: iso(3000), + lastDeliveredMessageId: 'X', + }); + + // Last READ is m2: only a + expect(ids(tracker.usersWhoseLastReadIs(ref(2000)))).toEqual(['a']); + + // Last READ is m3: only b (d is same timestamp but different msgId) + expect(ids(tracker.usersWhoseLastReadIs(ref(3000)))).toEqual(['b']); + + // Last DELIVERED is m2: only a + expect(ids(tracker.usersWhoseLastDeliveredIs(ref(2000)))).toEqual(['a']); + + // Last DELIVERED is m3: b (read bumps delivered) and c (delivered-only); e excluded (msgId "X") + expect(ids(tracker.usersWhoseLastDeliveredIs(ref(3000)))).toEqual(['b', 'c']); + }); + + it('updates membership when a user advances beyond the message', () => { + const user = U('x'); + + // x reads m2 -> last read m2 (and delivered m2) + tracker.onMessageRead({ user, readAt: iso(2000) }); + expect(ids(tracker.usersWhoseLastReadIs(ref(2000)))).toEqual(['x']); + expect(ids(tracker.usersWhoseLastDeliveredIs(ref(2000)))).toEqual(['x']); + + // x later reads m4 -> moves out of m2 group and into m4 group + tracker.onMessageRead({ user, readAt: iso(4000) }); + expect(ids(tracker.usersWhoseLastReadIs(ref(2000)))).toEqual([]); + expect(ids(tracker.usersWhoseLastReadIs(ref(4000)))).toEqual(['x']); + + // delivered follows read bump + expect(ids(tracker.usersWhoseLastDeliveredIs(ref(2000)))).toEqual([]); + expect(ids(tracker.usersWhoseLastDeliveredIs(ref(4000)))).toEqual(['x']); + }); + + it('returns empty array for empty message id', () => { + expect(tracker.usersWhoseLastReadIs({ timestampMs: 123, msgId: '' })).toEqual([]); + expect( + tracker.usersWhoseLastDeliveredIs({ timestampMs: 123, msgId: '' }), + ).toEqual([]); + }); + }); + }); + + describe('ordering & movement in sorted arrays', () => { + it('repositions users correctly when progress advances', () => { + const x = U('x'); + const y = U('y'); + + // x reads m2, y reads m3 + tracker.onMessageRead({ user: x, readAt: iso(2000) }); + tracker.onMessageRead({ user: y, readAt: iso(3000) }); + + // Readers of m2 -> x, y + expect(ids(tracker.readersForMessage(ref(2000)))).toEqual(['x', 'y']); + + // now x reads m4 (moves past y) + tracker.onMessageRead({ user: x, readAt: iso(4000) }); + // Readers of m3 -> x, y? Actually only x (m4) and y (m3) both >= m3 + expect(ids(tracker.readersForMessage(ref(3000)))).toEqual(['y', 'x']); + // and of m4 -> x only + expect(ids(tracker.readersForMessage(ref(4000)))).toEqual(['x']); + }); + }); +}); diff --git a/test/unit/threads.test.ts b/test/unit/threads.test.ts index 5c1081baa..121cfc40f 100644 --- a/test/unit/threads.test.ts +++ b/test/unit/threads.test.ts @@ -16,7 +16,6 @@ import { THREAD_MANAGER_INITIAL_STATE, ThreadFilters, ThreadSort, - QueryThreadsOptions, } from '../../src'; import { THREAD_RESPONSE_RESERVED_KEYS } from '../../src/thread'; @@ -56,6 +55,7 @@ describe('Threads 2.0', () => { channel: { id: uuidv4(), name: 'Test channel', members: [] }, }).channel as ChannelResponse; channel = client.channel(channelResponse.type, channelResponse.id); + channel.initialized = true; parentMessageResponse = generateMsg() as MessageResponse; threadManager = new ThreadManager({ client }); }); @@ -320,12 +320,12 @@ describe('Threads 2.0', () => { describe('markAsRead', () => { let stubbedChannelMarkRead: sinon.SinonStub< - Parameters, - ReturnType + Parameters, + ReturnType >; beforeEach(() => { - stubbedChannelMarkRead = sinon.stub(channel, 'markRead').resolves(); + stubbedChannelMarkRead = sinon.stub(channel, 'markAsReadRequest').resolves(); }); it('does nothing if unread count of the current user is zero', async () => { diff --git a/test/unit/utils.test.js b/test/unit/utils.test.js index 92744aa36..a56ea1cb6 100644 --- a/test/unit/utils.test.js +++ b/test/unit/utils.test.js @@ -2718,7 +2718,7 @@ describe('messageSetPagination', () => { }); }); -describe('', () => { +describe('binarySearchByDateEqualOrNearestGreater', () => { const messages = [ { created_at: '2024-08-05T08:55:00.199808Z', id: '0' }, { created_at: '2024-08-05T08:55:01.199808Z', id: '1' },