Skip to content

Commit

Permalink
feat: add test function about 1000 users join group send msg (openims…
Browse files Browse the repository at this point in the history
  • Loading branch information
BanTanger committed Jul 18, 2023
1 parent 5b838b1 commit f80f072
Show file tree
Hide file tree
Showing 31 changed files with 423 additions and 171 deletions.
2 changes: 1 addition & 1 deletion cmd/gordon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func gRunGetToken(strMyUid string) string {
return token
}
func main() {
uid := "7789"
uid := "7788"
//Gordon
//uid:="1554321956297519104"
//Gordon2
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ require (

require golang.org/x/net v0.11.0

require github.com/OpenIMSDK/Open-IM-Server v0.0.0-20230712062720-2e6ea7b193c3
require (
github.com/OpenIMSDK/Open-IM-Server v0.0.0-20230712062720-2e6ea7b193c3
github.com/google/go-cmp v0.5.9
)

require (
github.com/bwmarrin/snowflake v0.3.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
Expand Down
10 changes: 5 additions & 5 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,16 @@ func (c *Conversation) getAdvancedHistoryMessageList2(ctx context.Context, req s
rawMessageLength := len(list)
t = time.Now()
if rawMessageLength < req.Count {
maxSeq, minSeq, lostSeqListLength := c.messageBlocksInternalContinuityCheck(ctx, conversationID, notStartTime, isReverse, req.Count, sessionType, startTime, &list, &messageListCallback)
_ = c.messageBlocksBetweenContinuityCheck(ctx, req.LastMinSeq, maxSeq, conversationID, notStartTime, isReverse, req.Count, sessionType, startTime, &list, &messageListCallback)
maxSeq, minSeq, lostSeqListLength := c.messageBlocksInternalContinuityCheck(ctx, conversationID, notStartTime, isReverse, req.Count, startTime, &list, &messageListCallback)
_ = c.messageBlocksBetweenContinuityCheck(ctx, req.LastMinSeq, maxSeq, conversationID, notStartTime, isReverse, req.Count, startTime, &list, &messageListCallback)
if minSeq == 1 && lostSeqListLength == 0 {
messageListCallback.IsEnd = true
} else {
c.messageBlocksEndContinuityCheck(ctx, minSeq, conversationID, notStartTime, isReverse, req.Count, sessionType, startTime, &list, &messageListCallback)
c.messageBlocksEndContinuityCheck(ctx, minSeq, conversationID, notStartTime, isReverse, req.Count, startTime, &list, &messageListCallback)
}
} else {
maxSeq, _, _ := c.messageBlocksInternalContinuityCheck(ctx, conversationID, notStartTime, isReverse, req.Count, sessionType, startTime, &list, &messageListCallback)
c.messageBlocksBetweenContinuityCheck(ctx, req.LastMinSeq, maxSeq, conversationID, notStartTime, isReverse, req.Count, sessionType, startTime, &list, &messageListCallback)
maxSeq, _, _ := c.messageBlocksInternalContinuityCheck(ctx, conversationID, notStartTime, isReverse, req.Count, startTime, &list, &messageListCallback)
c.messageBlocksBetweenContinuityCheck(ctx, req.LastMinSeq, maxSeq, conversationID, notStartTime, isReverse, req.Count, startTime, &list, &messageListCallback)

}
log.ZDebug(ctx, "pull message", "pull cost time", time.Since(t))
Expand Down
4 changes: 3 additions & 1 deletion internal/conversation_msg/conversation_notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ func (c *Conversation) doNotificationNew(c2v common.Cmd2Value) {
}

for _, syncFunc := range []func(c context.Context) error{
c.user.SyncLoginUserInfo, c.SyncConversations,
c.user.SyncLoginUserInfo,
c.friend.SyncBlackList, c.friend.SyncFriendList, c.friend.SyncFriendApplication, c.friend.SyncSelfFriendApplication,
c.group.SyncJoinedGroup, c.group.SyncAdminGroupApplication, c.group.SyncSelfGroupApplication, c.group.SyncJoinedGroupMember,
} {
Expand All @@ -633,9 +633,11 @@ func (c *Conversation) doNotificationNew(c2v common.Cmd2Value) {
c.ConversationListener.OnSyncServerFailed()
case constant.MsgSyncEnd:
defer c.ConversationListener.OnSyncServerFinish()
go c.SyncConversations(ctx)
}

for conversationID, msgs := range allMsg {
log.ZDebug(ctx, "notification handling", "conversationID", conversationID, "msgs", msgs)
if len(msgs.Msgs) != 0 {
lastMsg := msgs.Msgs[len(msgs.Msgs)-1]
log.ZDebug(ctx, "SetNotificationSeq", "conversationID", conversationID, "seq", lastMsg.Seq)
Expand Down
8 changes: 7 additions & 1 deletion internal/conversation_msg/create_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"open_im_sdk/pkg/utils"
"open_im_sdk/sdk_struct"
"os"
"path/filepath"
"strings"
)

func (c *Conversation) CreateTextMessage(ctx context.Context, text string) (*sdk_struct.MsgStruct, error) {
Expand Down Expand Up @@ -274,6 +276,7 @@ func (c *Conversation) CreateSoundMessageFromFullPath(ctx context.Context, sound
SoundPath: soundPath,
Duration: duration,
DataSize: fi.Size(),
SoundType: strings.Replace(filepath.Ext(fi.Name()), ".", "", 1),
}
return &s, nil
}
Expand Down Expand Up @@ -347,6 +350,9 @@ func (c *Conversation) CreateSoundMessage(ctx context.Context, soundPath string,
Duration: duration,
DataSize: fi.Size(),
}
if typ := strings.Replace(filepath.Ext(fi.Name()), ".", "", 1); typ != "" {
s.SoundElem.SoundType = "audio/" + strings.ToLower(typ)
}
return &s, nil
}
func (c *Conversation) CreateVideoMessageByURL(ctx context.Context, videoElem sdk_struct.VideoBaseInfo) (*sdk_struct.MsgStruct, error) {
Expand Down Expand Up @@ -460,8 +466,8 @@ func (c *Conversation) CreateFaceMessage(ctx context.Context, index int, data st
s.FaceElem.Index = index
s.Content = utils.StructToJsonString(s.FaceElem)
return &s, nil

}

func (c *Conversation) CreateForwardMessage(ctx context.Context, s *sdk_struct.MsgStruct) (*sdk_struct.MsgStruct, error) {
if s.Status != constant.MsgStatusSendSuccess {
log.Error("internal", "only send success message can be Forward")
Expand Down
68 changes: 0 additions & 68 deletions internal/conversation_msg/file_js.go

This file was deleted.

26 changes: 13 additions & 13 deletions internal/conversation_msg/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
)

// 检测其内部连续性,如果不连续,则向前补齐,获取这一组消息的最大最小seq,以及需要补齐的seq列表长度
func (c *Conversation) messageBlocksInternalContinuityCheck(ctx context.Context, conversationID string, notStartTime, isReverse bool, count,
sessionType int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) (max, min int64, length int) {
func (c *Conversation) messageBlocksInternalContinuityCheck(ctx context.Context, conversationID string, notStartTime, isReverse bool, count int,
startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) (max, min int64, length int) {
var lostSeqListLength int
maxSeq, minSeq, haveSeqList := c.getMaxAndMinHaveSeqList(*list)
log.ZDebug(ctx, "getMaxAndMinHaveSeqList is:", "maxSeq", maxSeq, "minSeq", minSeq, "haveSeqList", haveSeqList)
Expand All @@ -49,7 +49,7 @@ func (c *Conversation) messageBlocksInternalContinuityCheck(ctx context.Context,
} else {
pullSeqList = lostSeqList[lostSeqListLength-constant.PullMsgNumForReadDiffusion : lostSeqListLength]
}
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, pullSeqList, notStartTime, isReverse, count, sessionType, startTime, list, messageListCallback)
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, pullSeqList, notStartTime, isReverse, count, startTime, list, messageListCallback)
}

}
Expand All @@ -58,13 +58,13 @@ func (c *Conversation) messageBlocksInternalContinuityCheck(ctx context.Context,

// 检测消息块之间的连续性,如果不连续,则向前补齐,返回块之间是否连续,bool
func (c *Conversation) messageBlocksBetweenContinuityCheck(ctx context.Context, lastMinSeq, maxSeq int64, conversationID string,
notStartTime, isReverse bool, count, sessionType int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) bool {
notStartTime, isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) bool {
if lastMinSeq != 0 {
log.ZDebug(ctx, "get lost LastMinSeq is :", "lastMinSeq", lastMinSeq, "thisMaxSeq", maxSeq)
if maxSeq != 0 {
if maxSeq+1 != lastMinSeq {
startSeq := int64(lastMinSeq) - constant.PullMsgNumForReadDiffusion
if startSeq <= int64(maxSeq) {
if startSeq <= maxSeq {
startSeq = int64(maxSeq) + 1
}
successiveSeqList := func(max, min int64) (seqList []int64) {
Expand All @@ -75,7 +75,7 @@ func (c *Conversation) messageBlocksBetweenContinuityCheck(ctx context.Context,
}(lastMinSeq-1, startSeq)
log.ZDebug(ctx, "get lost successiveSeqList is :", "successiveSeqList", successiveSeqList, "length:", len(successiveSeqList))
if len(successiveSeqList) > 0 {
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, successiveSeqList, notStartTime, isReverse, count, sessionType, startTime, list, messageListCallback)
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, successiveSeqList, notStartTime, isReverse, count, startTime, list, messageListCallback)
}
} else {
return true
Expand All @@ -94,7 +94,7 @@ func (c *Conversation) messageBlocksBetweenContinuityCheck(ctx context.Context,

// 根据最小seq向前补齐消息,由服务器告诉拉取消息结果是否到底,如果网络,则向前补齐,获取这一组消息的最大最小seq,以及需要补齐的seq列表长度
func (c *Conversation) messageBlocksEndContinuityCheck(ctx context.Context, minSeq int64, conversationID string, notStartTime,
isReverse bool, count, sessionType int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
if minSeq != 0 {
seqList := func(seq int64) (seqList []int64) {
startSeq := seq - constant.PullMsgNumForReadDiffusion
Expand All @@ -110,13 +110,13 @@ func (c *Conversation) messageBlocksEndContinuityCheck(ctx context.Context, minS
log.ZDebug(ctx, "pull seqList is ", "seqList", seqList, "len", len(seqList))

if len(seqList) > 0 {
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, seqList, notStartTime, isReverse, count, sessionType, startTime, list, messageListCallback)
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, seqList, notStartTime, isReverse, count, startTime, list, messageListCallback)
}

} else {
//local don't have messages,本地无消息,但是服务器最大消息不为0
seqList := []int64{0, 0}
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, seqList, notStartTime, isReverse, count, sessionType, startTime, list, messageListCallback)
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, seqList, notStartTime, isReverse, count, startTime, list, messageListCallback)

}

Expand Down Expand Up @@ -145,19 +145,19 @@ func (c *Conversation) getMaxAndMinHaveSeqList(messages []*model_struct.LocalCha
// 2、块中连续性检测
// 3、块之间连续性检测
func (c *Conversation) pullMessageAndReGetHistoryMessages(ctx context.Context, conversationID string, seqList []int64, notStartTime,
isReverse bool, count, sessionType int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
existedSeqList, err := c.db.GetAlreadyExistSeqList(ctx, conversationID, seqList)
if err != nil {
// log.Error(operationID, "SuperGroupGetAlreadyExistSeqList err", err.Error(), sourceID, seqList)
log.ZError(ctx, "GetAlreadyExistSeqList err", err, "conversationID", conversationID, "seqList", seqList)
return
}
if len(existedSeqList) == len(seqList) {
// log.Debug(operationID, "do not pull message")
log.ZDebug(ctx, "do not pull message", "seqList", seqList, "existedSeqList", existedSeqList)
return
}
newSeqList := utils.DifferenceSubset(seqList, existedSeqList)
if len(newSeqList) == 0 {
// log.Debug(operationID, "do not pull message")
log.ZDebug(ctx, "do not pull message", "seqList", seqList, "existedSeqList", existedSeqList, "newSeqList", newSeqList)
return
}
var pullMsgResp sdkws.PullMessageBySeqsResp
Expand Down
8 changes: 4 additions & 4 deletions internal/conversation_msg/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ type msgUploadFileCallback struct {
func (c *msgUploadFileCallback) Open(size int64) {
}

func (c *msgUploadFileCallback) PartSize(partSize int64, num int32) {
func (c *msgUploadFileCallback) PartSize(partSize int64, num int) {
}

func (c *msgUploadFileCallback) HashPartProgress(index int32, size int64, partHash string) {
func (c *msgUploadFileCallback) HashPartProgress(index int, size int64, partHash string) {
}

func (c *msgUploadFileCallback) HashPartComplete(partsHash string, fileHash string) {
Expand All @@ -52,7 +52,7 @@ func (c *msgUploadFileCallback) UploadID(uploadID string) {
}
}

func (c *msgUploadFileCallback) UploadPartComplete(index int32, partSize int64, partHash string) {
func (c *msgUploadFileCallback) UploadPartComplete(index int, partSize int64, partHash string) {

}

Expand All @@ -74,7 +74,7 @@ func (c *msgUploadFileCallback) UploadComplete(fileSize int64, streamSize int64,
}
}

func (c *msgUploadFileCallback) Complete(size int64, url string, typ int32) {
func (c *msgUploadFileCallback) Complete(size int64, url string, typ int) {
c.msg.AttachedInfoElem.Progress = nil
data, err := json.Marshal(c.msg.AttachedInfoElem)
if err != nil {
Expand Down
57 changes: 34 additions & 23 deletions internal/conversation_msg/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,11 @@ func (c *Conversation) SendMessage(ctx context.Context, s *sdk_struct.MsgStruct,

res, err := c.file.UploadFile(ctx, &file.UploadFileReq{
//PutID: s.ClientMsgID,
Filepath: sourcePath,
Name: c.fileName("picture", s.ClientMsgID) + filepath.Ext(sourcePath),
Cause: "msg-picture",
ContentType: s.PictureElem.SourcePicture.Type,
Filepath: sourcePath,
Uuid: s.PictureElem.SourcePicture.UUID,
Name: c.fileName("picture", s.ClientMsgID) + filepath.Ext(sourcePath),
Cause: "msg-picture",
}, NewUploadFileCallback(ctx, callback.OnProgress, s, lc.ConversationID, c.db))
if err != nil {
c.updateMsgStatusAndTriggerConversation(ctx, s.ClientMsgID, "", s.CreateTime, constant.MsgStatusSendFailed, s, lc)
Expand Down Expand Up @@ -461,9 +463,11 @@ func (c *Conversation) SendMessage(ctx context.Context, s *sdk_struct.MsgStruct,
// log.Info("", "file", sourcePath, delFile)

res, err := c.file.UploadFile(ctx, &file.UploadFileReq{
Filepath: sourcePath,
Name: c.fileName("voice", s.ClientMsgID) + filepath.Ext(sourcePath),
Cause: "msg-voice",
ContentType: s.SoundElem.SoundType,
Filepath: sourcePath,
Uuid: s.SoundElem.UUID,
Name: c.fileName("voice", s.ClientMsgID) + filepath.Ext(sourcePath),
Cause: "msg-voice",
}, NewUploadFileCallback(ctx, callback.OnProgress, s, lc.ConversationID, c.db))
if err != nil {
c.updateMsgStatusAndTriggerConversation(ctx, s.ClientMsgID, "", s.CreateTime, constant.MsgStatusSendFailed, s, lc)
Expand Down Expand Up @@ -494,27 +498,33 @@ func (c *Conversation) SendMessage(ctx context.Context, s *sdk_struct.MsgStruct,
var wg sync.WaitGroup
wg.Add(2)
var putErrs [2]error
go func() {
defer wg.Done()
snapRes, err := c.file.UploadFile(ctx, &file.UploadFileReq{
Filepath: snapPath,
Name: c.fileName("videoSnapshot", s.ClientMsgID) + filepath.Ext(snapPath),
Cause: "msg-video-snapshot",
}, nil)
if err != nil {
c.updateMsgStatusAndTriggerConversation(ctx, s.ClientMsgID, "", s.CreateTime, constant.MsgStatusSendFailed, s, lc)
putErrs[0] = err
return
}
s.VideoElem.SnapshotURL = snapRes.URL
}()
if s.VideoElem.SnapshotUUID != "" {
go func() {
defer wg.Done()
snapRes, err := c.file.UploadFile(ctx, &file.UploadFileReq{
ContentType: s.VideoElem.SnapshotType,
Filepath: snapPath,
Uuid: s.VideoElem.SnapshotUUID,
Name: c.fileName("videoSnapshot", s.ClientMsgID) + filepath.Ext(snapPath),
Cause: "msg-video-snapshot",
}, nil)
if err != nil {
c.updateMsgStatusAndTriggerConversation(ctx, s.ClientMsgID, "", s.CreateTime, constant.MsgStatusSendFailed, s, lc)
putErrs[0] = err
return
}
s.VideoElem.SnapshotURL = snapRes.URL
}()
}

go func() {
defer wg.Done()
res, err := c.file.UploadFile(ctx, &file.UploadFileReq{
Filepath: videoPath,
Name: c.fileName("video", s.ClientMsgID) + filepath.Ext(videoPath),
Cause: "msg-video",
ContentType: s.VideoElem.VideoType,
Filepath: videoPath,
Uuid: s.VideoElem.VideoUUID,
Name: c.fileName("video", s.ClientMsgID) + filepath.Ext(videoPath),
Cause: "msg-video",
}, NewUploadFileCallback(ctx, callback.OnProgress, s, lc.ConversationID, c.db))
if err != nil {
c.updateMsgStatusAndTriggerConversation(ctx, s.ClientMsgID, "", s.CreateTime, constant.MsgStatusSendFailed, s, lc)
Expand All @@ -537,6 +547,7 @@ func (c *Conversation) SendMessage(ctx context.Context, s *sdk_struct.MsgStruct,
}
res, err := c.file.UploadFile(ctx, &file.UploadFileReq{
Filepath: s.FileElem.FilePath,
Uuid: s.FileElem.UUID,
Name: c.fileName("file", s.ClientMsgID) + filepath.Ext(s.FileElem.FilePath),
Cause: "msg-file",
}, NewUploadFileCallback(ctx, callback.OnProgress, s, lc.ConversationID, c.db))
Expand Down
Loading

0 comments on commit f80f072

Please sign in to comment.