Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 76 additions & 6 deletions src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { ChannelState } from './channel_state';
import { MessageComposer } from './messageComposer';
import { OwnMessageReceiptsTracker } from './messageDelivery';
import {
generateChannelTempCid,
logChatPromiseExecution,
Expand Down Expand Up @@ -74,7 +76,6 @@ import type {
} from './types';
import type { Role } from './permissions';
import type { CustomChannelData } from './custom_types';
import { MessageComposer } from './messageComposer';

/**
* Channel - The Channel class manages it's own state.
Expand Down Expand Up @@ -110,6 +111,7 @@ export class Channel {
disconnected: boolean;
push_preferences?: PushPreference;
public readonly messageComposer: MessageComposer;
public readonly ownMessageReceiptsTracker: OwnMessageReceiptsTracker;

/**
* constructor - Create a channel
Expand Down Expand Up @@ -158,6 +160,13 @@ export class Channel {
client: this._client,
compositionContext: this,
});

this.ownMessageReceiptsTracker = new OwnMessageReceiptsTracker({
locateMessage: (timestampMs) => {
const msg = this.state.findMessageByTimestamp(timestampMs);
return msg && { timestampMs, msgId: msg.id };
},
});
}

/**
Expand Down Expand Up @@ -1131,16 +1140,26 @@ export class Channel {
}

/**
* markRead - Send the mark read event for this user, only works if the `read_events` setting is enabled
* markRead - Send the mark read event for this user, only works if the `read_events` setting is enabled. Syncs the message delivery report candidates local state.
*
* @param {MarkReadOptions} data
* @return {Promise<EventAPIResponse | null>} Description
*/
async markRead(data: MarkReadOptions = {}) {
return await this.getClient().messageDeliveryReporter.markRead(this, data);
}

/**
* markReadRequest - Send the mark read event for this user, only works if the `read_events` setting is enabled
*
* @param {MarkReadOptions} data
* @return {Promise<EventAPIResponse | null>} Description
*/
async markAsReadRequest(data: MarkReadOptions = {}) {
this._checkInitialized();

if (!this.getConfig()?.read_events && !this.getClient()._isUsingServerAuth()) {
return Promise.resolve(null);
return null;
}

return await this.getClient().post<EventAPIResponse>(this._channelURL() + '/read', {
Expand Down Expand Up @@ -1554,6 +1573,7 @@ export class Channel {
{ method: 'upsertChannels' },
);

this.getClient().syncDeliveredCandidates([this]);
return state;
}

Expand Down Expand Up @@ -1879,13 +1899,47 @@ export class Channel {
last_read_message_id: event.last_read_message_id,
user: event.user,
unread_messages: 0,
last_delivered_at: event.last_delivered_at
? new Date(event.last_delivered_at)
: undefined,
last_delivered_message_id: event.last_delivered_message_id,
};
this.ownMessageReceiptsTracker.onMessageRead({
user: event.user,
readAt: event.created_at,
lastReadMessageId: event.last_read_message_id,
});
const client = this.getClient();

if (event.user?.id === this.getClient().user?.id) {
const isOwnEvent = event.user?.id === client.user?.id;

if (isOwnEvent) {
channelState.unreadCount = 0;
client.syncDeliveredCandidates([this]);
}
}
break;
case 'message.delivered':
// todo: update also on thread
if (event.user?.id && event.created_at) {
channelState.read[event.user.id] = {
last_read: new Date(event.created_at),
last_read_message_id: event.last_read_message_id,
user: event.user,
unread_messages: event.unread_messages ?? 0,
last_delivered_at: event.last_delivered_at
? new Date(event.last_delivered_at)
: undefined,
last_delivered_message_id: event.last_delivered_message_id,
};

this.ownMessageReceiptsTracker.onMessageDelivered({
user: event.user,
deliveredAt: event.created_at,
lastDeliveredMessageId: event.last_delivered_message_id,
});
}
break;
case 'user.watching.start':
case 'user.updated':
if (event.user?.id) {
Expand Down Expand Up @@ -1921,8 +1975,9 @@ export class Channel {
break;
case 'message.new':
if (event.message) {
const client = this.getClient();
/* if message belongs to current user, always assume timestamp is changed to filter it out and add again to avoid duplication */
const ownMessage = event.user?.id === this.getClient().user?.id;
const ownMessage = event.user?.id === client.user?.id;
const isThreadMessage =
event.message.parent_id && !event.message.show_in_channel;

Expand All @@ -1943,10 +1998,17 @@ export class Channel {
if (event.user?.id) {
for (const userId in channelState.read) {
if (userId === event.user.id) {
const currentState = channelState.read[event.user.id];
channelState.read[event.user.id] = {
last_read: new Date(event.created_at as string),
user: event.user,
unread_messages: 0,
last_delivered_at: event.last_delivered_at
? new Date(event.last_delivered_at)
: currentState.last_delivered_at,
last_delivered_message_id:
event.last_delivered_message_id ??
currentState.last_delivered_message_id,
};
} else {
channelState.read[userId].unread_messages += 1;
Expand All @@ -1957,6 +2019,8 @@ export class Channel {
if (this._countMessageAsUnread(event.message)) {
channelState.unreadCount = channelState.unreadCount + 1;
}

client.syncDeliveredCandidates([this]);
}
break;
case 'message.updated':
Expand Down Expand Up @@ -2057,7 +2121,7 @@ export class Channel {
break;
case 'notification.mark_unread': {
const ownMessage = event.user?.id === this.getClient().user?.id;
if (!(ownMessage && event.user)) break;
if (!ownMessage || !event.user) break;

const unreadCount = event.unread_messages ?? 0;

Expand All @@ -2067,6 +2131,10 @@ export class Channel {
last_read_message_id: event.last_read_message_id,
user: event.user,
unread_messages: unreadCount,
last_delivered_at: event.last_delivered_at
? new Date(event.last_delivered_at)
: undefined,
last_delivered_message_id: event.last_delivered_message_id,
};

channelState.unreadCount = unreadCount;
Expand Down Expand Up @@ -2286,6 +2354,8 @@ export class Channel {
this.state.unreadCount = this.state.read[read.user.id].unread_messages;
}
}

this.ownMessageReceiptsTracker.ingestInitial(state.read);
}

return {
Expand Down
161 changes: 147 additions & 14 deletions src/channel_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,38 @@ type ChannelReadStatus = Record<
}
>;

const messageSetBounds = (
a: LocalMessage[] | MessageResponse[],
b: LocalMessage[] | MessageResponse[],
) => ({
newestMessageA: new Date(a[0]?.created_at ?? 0),
oldestMessageA: new Date(a.slice(-1)[0]?.created_at ?? 0),
newestMessageB: new Date(b[0]?.created_at ?? 0),
oldestMessageB: new Date(b.slice(-1)[0]?.created_at ?? 0),
});

const aContainsOrEqualsB = (a: LocalMessage[], b: LocalMessage[]) => {
const { newestMessageA, newestMessageB, oldestMessageA, oldestMessageB } =
messageSetBounds(a, b);
return newestMessageA >= newestMessageB && oldestMessageB >= oldestMessageA;
};

const aOverlapsB = (a: LocalMessage[], b: LocalMessage[]) => {
const { newestMessageA, newestMessageB, oldestMessageA, oldestMessageB } =
messageSetBounds(a, b);
return (
oldestMessageA < oldestMessageB &&
oldestMessageB < newestMessageA &&
newestMessageA < newestMessageB
);
};

const messageSetsOverlapByTimestamp = (a: LocalMessage[], b: LocalMessage[]) =>
aContainsOrEqualsB(a, b) ||
aContainsOrEqualsB(b, a) ||
aOverlapsB(a, b) ||
aOverlapsB(b, a);

/**
* ChannelState - A container class for the channel state.
*/
Expand Down Expand Up @@ -867,6 +899,41 @@ export class ChannelState {
return this.messageSets[messageSetIndex].messages.find((m) => m.id === messageId);
}

findMessageByTimestamp(
timestampMs: number,
parentMessageId?: string,
exactTsMatch: boolean = false,
): LocalMessage | null {
if (
(parentMessageId && !this.threads[parentMessageId]) ||
this.messageSets.length === 0
)
return null;
const setIndex = this.findMessageSetByOldestTimestamp(timestampMs);
const targetMsgSet = this.messageSets[setIndex]?.messages;
if (!targetMsgSet?.length) return null;
const firstMsgTimestamp = targetMsgSet[0].created_at.getTime();
const lastMsgTimestamp = targetMsgSet.slice(-1)[0].created_at.getTime();
const isOutOfBound =
timestampMs < firstMsgTimestamp || lastMsgTimestamp < timestampMs;
if (isOutOfBound && exactTsMatch) return null;

let msgIndex = 0,
hi = targetMsgSet.length - 1;
while (msgIndex < hi) {
const mid = (msgIndex + hi) >>> 1;
if (timestampMs <= targetMsgSet[mid].created_at.getTime()) hi = mid;
else msgIndex = mid + 1;
}

const foundMessage = targetMsgSet[msgIndex];
return !exactTsMatch
? foundMessage
: foundMessage.created_at.getTime() === timestampMs
? foundMessage
: null;
}

private switchToMessageSet(index: number) {
const currentMessages = this.messageSets.find((s) => s.isCurrent);
if (!currentMessages) {
Expand All @@ -889,46 +956,112 @@ export class ChannelState {
);
}

/**
* Identifies the set index into which a message set would pertain if its first item's creation date corresponded to oldestTimestampMs.
* @param oldestTimestampMs
*/
private findMessageSetByOldestTimestamp = (oldestTimestampMs: number): number => {
let lo = 0,
hi = this.messageSets.length;
while (lo < hi) {
const mid = (lo + hi) >>> 1;
const msgSet = this.messageSets[mid];
// should not happen
if (msgSet.messages.length === 0) return -1;

const oldestMessageTimestampInSet = msgSet.messages[0].created_at.getTime();
if (oldestMessageTimestampInSet <= oldestTimestampMs) hi = mid;
else lo = mid + 1;
}
return lo;
};

private findTargetMessageSet(
newMessages: (MessageResponse | LocalMessage)[],
addIfDoesNotExist = true,
messageSetToAddToIfDoesNotExist: MessageSetType = 'current',
) {
let messagesToAdd: (MessageResponse | LocalMessage)[] = newMessages;
let targetMessageSetIndex!: number;
if (newMessages.length === 0)
return { targetMessageSetIndex: 0, messagesToAdd: newMessages };
if (addIfDoesNotExist) {
const overlappingMessageSetIndices = this.messageSets
const overlappingMessageSetIndicesByMsgIds = this.messageSets
.map((_, i) => i)
.filter((i) =>
this.areMessageSetsOverlap(this.messageSets[i].messages, newMessages),
);
const overlappingMessageSetIndicesByTimestamp = this.messageSets
.map((_, i) => i)
.filter((i) =>
messageSetsOverlapByTimestamp(
this.messageSets[i].messages,
newMessages.map(formatMessage),
),
);
switch (messageSetToAddToIfDoesNotExist) {
case 'new':
if (overlappingMessageSetIndices.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndices[0];
if (overlappingMessageSetIndicesByMsgIds.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndicesByMsgIds[0];
} else if (overlappingMessageSetIndicesByTimestamp.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndicesByTimestamp[0];
// No new message set is created if newMessages only contains thread replies
} else if (newMessages.some((m) => !m.parent_id)) {
this.messageSets.push({
messages: [],
isCurrent: false,
isLatest: false,
pagination: DEFAULT_MESSAGE_SET_PAGINATION,
});
targetMessageSetIndex = this.messageSets.length - 1;
// find the index to insert the set
const setIngestIndex = this.findMessageSetByOldestTimestamp(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
new Date(newMessages[0].created_at!).getTime(),
);
if (setIngestIndex === -1) {
this.messageSets.push({
messages: [],
isCurrent: false,
isLatest: false,
pagination: DEFAULT_MESSAGE_SET_PAGINATION,
});
targetMessageSetIndex = this.messageSets.length - 1;
} else {
const isLatest = setIngestIndex === 0;
this.messageSets.splice(setIngestIndex, 0, {
messages: [],
isCurrent: false,
isLatest,
pagination: DEFAULT_MESSAGE_SET_PAGINATION, // fixme: it is problematic decide about pagination without having data
});
if (isLatest) {
this.messageSets.slice(1).forEach((set) => {
set.isLatest = false;
});
}
targetMessageSetIndex = setIngestIndex;
}
}
break;
case 'current':
targetMessageSetIndex = this.messageSets.findIndex((s) => s.isCurrent);
// determine if there is another set to which it would match taken into consideration the timestamp
if (overlappingMessageSetIndicesByTimestamp.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndicesByTimestamp[0];
} else {
targetMessageSetIndex = this.messageSets.findIndex((s) => s.isCurrent);
}
break;
case 'latest':
targetMessageSetIndex = this.messageSets.findIndex((s) => s.isLatest);
// determine if there is another set to which it would match taken into consideration the timestamp
if (overlappingMessageSetIndicesByTimestamp.length > 0) {
targetMessageSetIndex = overlappingMessageSetIndicesByTimestamp[0];
} else {
targetMessageSetIndex = this.messageSets.findIndex((s) => s.isLatest);
}
break;
default:
targetMessageSetIndex = -1;
}
// when merging the target set will be the first one from the overlapping message sets
const mergeTargetMessageSetIndex = overlappingMessageSetIndices.splice(0, 1)[0];
const mergeSourceMessageSetIndices = [...overlappingMessageSetIndices];
const mergeTargetMessageSetIndex = overlappingMessageSetIndicesByMsgIds.splice(
0,
1,
)[0];
const mergeSourceMessageSetIndices = [...overlappingMessageSetIndicesByMsgIds];
if (
mergeTargetMessageSetIndex !== undefined &&
mergeTargetMessageSetIndex !== targetMessageSetIndex
Expand Down
Loading