Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: get reverse history message change. #805

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 52 additions & 33 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}
startTime = m.SendTime
} else {
c.messagePullMinSeqMap.Delete(conversationID)
// Clear both maps when the user enters the conversation
c.messagePullForwardEndSeqMap.Delete(conversationID)
c.messagePullReverseEndSeqMap.Delete(conversationID)
}
log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime)
Expand All @@ -75,41 +77,48 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
log.ZDebug(ctx, "pull message", "pull cost time", time.Since(t))
t = time.Now()

var thisMinSeq int64
thisMinSeq, messageList = c.LocalChatLog2MsgStruct(ctx, list)
var thisEndSeq int64
thisEndSeq, messageList = c.LocalChatLog2MsgStruct(list, isReverse)
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
t = time.Now()
if !isReverse {
sort.Sort(messageList)
if thisEndSeq != 0 {
c.messagePullForwardEndSeqMap.Store(conversationID, thisEndSeq)
}
} else {
if thisEndSeq != 0 {
c.messagePullReverseEndSeqMap.Store(conversationID, thisEndSeq)
}
}
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
messageListCallback.MessageList = messageList
if thisMinSeq != 0 {
c.messagePullMinSeqMap.Store(conversationID, thisMinSeq)
}
return &messageListCallback, nil

return &messageListCallback, nil
}

func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
count int, startTime int64, isReverse bool, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {

var list []*model_struct.LocalChatLog
var list, validMessages []*model_struct.LocalChatLog

// If all retrieved messages are either deleted or filtered out, continue fetching messages from an earlier point.
shouldFetchMoreMessages := func(messages []*model_struct.LocalChatLog) bool {
// Get the number of invalid messages in this batch to recursive fetching from earlier points.
shouldFetchMoreMessagesNum := func(messages []*model_struct.LocalChatLog) int {
if len(messages) == 0 {
return false
return count
}

allDeleted := true
// Represents the number of valid messages in the batch
validateMessageNum := 0
for _, msg := range messages {
if msg.Status < constant.MsgStatusHasDeleted {
allDeleted = false
break
validateMessageNum++
validMessages = append(validMessages, msg)
} else {
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", msg)
}
}
return allDeleted
return count - validateMessageNum
}
getNewStartTime := func(messages []*model_struct.LocalChatLog) int64 {
if len(messages) == 0 {
Expand All @@ -128,39 +137,49 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
return nil, err
}
t = time.Now()
maxSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
thisStartSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t))
t = time.Now()
c.validateAndFillInterBlockGaps(ctx, maxSeq, conversationID,
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
isReverse, count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t))
t = time.Now()
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t))
// If all retrieved messages are either deleted or filtered out,
//continue fetching recursively until either valid messages are found or all messages have been fetched.
if shouldFetchMoreMessages(list) && !messageListCallback.IsEnd {
return c.fetchMessagesWithGapCheck(ctx, conversationID, count, getNewStartTime(list), isReverse, messageListCallback)
// If the number of valid messages retrieved is less than the count,
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
missingCount := shouldFetchMoreMessagesNum(list)
if missingCount > 0 && !messageListCallback.IsEnd {
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID", conversationID)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, getNewStartTime(list), isReverse, messageListCallback)
if err != nil {
return nil, err
}
log.ZDebug(ctx, "fetch more messages", "missingMessages", missingMessages)
return append(validMessages, missingMessages...), nil
}

return list, nil
return validMessages, nil
}

func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model_struct.LocalChatLog) (int64, []*sdk_struct.MsgStruct) {
func (c *Conversation) LocalChatLog2MsgStruct(list []*model_struct.LocalChatLog, isReverse bool) (int64, []*sdk_struct.MsgStruct) {
messageList := make([]*sdk_struct.MsgStruct, 0, len(list))
var thisMinSeq int64
var thisEndSeq int64
for _, v := range list {
if v.Seq != 0 && thisMinSeq == 0 {
thisMinSeq = v.Seq
if v.Seq != 0 && thisEndSeq == 0 {
thisEndSeq = v.Seq
}
if v.Seq < thisMinSeq && v.Seq != 0 {
thisMinSeq = v.Seq
}
if v.Status >= constant.MsgStatusHasDeleted {
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", v)
continue
if isReverse {
if v.Seq > thisEndSeq && thisEndSeq != 0 {
thisEndSeq = v.Seq
}

} else {
if v.Seq < thisEndSeq && v.Seq != 0 {
thisEndSeq = v.Seq
}
}
temp := LocalChatLogToMsgStruct(v)

Expand All @@ -169,7 +188,7 @@ func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model
}
messageList = append(messageList, temp)
}
return thisMinSeq, messageList
return thisEndSeq, messageList
}

func (c *Conversation) typingStatusUpdate(ctx context.Context, recvID, msgTip string) error {
Expand Down
140 changes: 71 additions & 69 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,29 @@ var SearchContentType = []int{constant.Text, constant.AtText, constant.File}

type Conversation struct {
*interaction.LongConnMgr
conversationSyncer *syncer.Syncer[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string]
db db_interface.DataBase
ConversationListener func() open_im_sdk_callback.OnConversationListener
msgListener func() open_im_sdk_callback.OnAdvancedMsgListener
msgKvListener func() open_im_sdk_callback.OnMessageKvInfoListener
batchMsgListener func() open_im_sdk_callback.OnBatchMsgListener
businessListener func() open_im_sdk_callback.OnCustomBusinessListener
recvCH chan common.Cmd2Value
loginUserID string
platformID int32
DataDir string
relation *relation.Relation
group *group.Group
user *user.User
file *file.File
cache *cache.Cache[string, *model_struct.LocalConversation]
maxSeqRecorder MaxSeqRecorder
messagePullMinSeqMap *cache.Cache[string, int64]
IsExternalExtensions bool
msgOffset int
progress int
conversationSyncMutex sync.Mutex
conversationSyncer *syncer.Syncer[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string]
db db_interface.DataBase
ConversationListener func() open_im_sdk_callback.OnConversationListener
msgListener func() open_im_sdk_callback.OnAdvancedMsgListener
msgKvListener func() open_im_sdk_callback.OnMessageKvInfoListener
batchMsgListener func() open_im_sdk_callback.OnBatchMsgListener
businessListener func() open_im_sdk_callback.OnCustomBusinessListener
recvCH chan common.Cmd2Value
loginUserID string
platformID int32
DataDir string
relation *relation.Relation
group *group.Group
user *user.User
file *file.File
cache *cache.Cache[string, *model_struct.LocalConversation]
maxSeqRecorder MaxSeqRecorder
messagePullForwardEndSeqMap *cache.Cache[string, int64]
messagePullReverseEndSeqMap *cache.Cache[string, int64]
IsExternalExtensions bool
msgOffset int
progress int
conversationSyncMutex sync.Mutex

startTime time.Time

Expand Down Expand Up @@ -100,20 +101,21 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr,
file *file.File) *Conversation {
info := ccontext.Info(ctx)
n := &Conversation{db: db,
LongConnMgr: longConnMgr,
recvCH: ch,
loginUserID: info.UserID(),
platformID: info.PlatformID(),
DataDir: info.DataDir(),
relation: relation,
group: group,
user: user,
file: file,
IsExternalExtensions: info.IsExternalExtensions(),
maxSeqRecorder: NewMaxSeqRecorder(),
messagePullMinSeqMap: cache.NewCache[string, int64](),
msgOffset: 0,
progress: 0,
LongConnMgr: longConnMgr,
recvCH: ch,
loginUserID: info.UserID(),
platformID: info.PlatformID(),
DataDir: info.DataDir(),
relation: relation,
group: group,
user: user,
file: file,
IsExternalExtensions: info.IsExternalExtensions(),
maxSeqRecorder: NewMaxSeqRecorder(),
messagePullForwardEndSeqMap: cache.NewCache[string, int64](),
messagePullReverseEndSeqMap: cache.NewCache[string, int64](),
msgOffset: 0,
progress: 0,
}
n.typing = newTyping(n)
n.initSyncer()
Expand Down Expand Up @@ -835,8 +837,8 @@ func (c *Conversation) batchAddFaceURLAndName(ctx context.Context, conversations
conversation.FaceURL = v.FaceURL
conversation.ShowName = v.Nickname
} else {
log.ZWarn(ctx, "user info not found", errors.New("user not found"),"userID", conversation.UserID)
log.ZWarn(ctx, "user info not found", errors.New("user not found"), "userID", conversation.UserID)

conversation.FaceURL = ""
conversation.ShowName = "UserNotFound"
}
Expand Down Expand Up @@ -929,37 +931,37 @@ func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversatio
if len(res) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
_, msgList := c.LocalChatLog2MsgStruct(ctx, []*model_struct.LocalChatLog{res[0]})
if len(msgList) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
msg := msgList[0]
//_, msgList := c.LocalChatLog2MsgStruct []*model_struct.LocalChatLog{res[0]})
//if len(msgList) == 0 {
// return []*sdk_struct.MsgStruct{}, nil
//}
//msg := msgList[0]
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
if before > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: int(before),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
result = append(result, msg)
if after > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: int(after),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
sort.Sort(sdk_struct.NewMsgList(result))
//if before > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(before),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//result = append(result, msg)
//if after > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(after),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//sort.Sort(sdk_struct.NewMsgList(result))
return result, nil
}
Loading
Loading