Skip to content

Commit

Permalink
presuretest code
Browse files Browse the repository at this point in the history
  • Loading branch information
wangchuxiao-dev committed Aug 1, 2023
1 parent b78f98d commit c53b4a0
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 289 deletions.
10 changes: 5 additions & 5 deletions testv3new/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package main

import (
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"open_im_sdk/pkg/utils"
"open_im_sdk/testv3new"
"open_im_sdk/testv3new/testcore"

"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
)

func main() {
if err := log.InitFromConfig("sdk.log", "sdk", 3, true, false, "", 2, 24); err != nil {
panic(err)
}
userID := "4844258055"
recvID := "4950983283"
userID := "523"
manager := testv3new.NewRegisterManager()
token, _ := manager.GetToken(userID)
ctx := testv3new.NewCtx(testcore.APIADDR, testcore.WSADDR, userID, token)
ctx := testv3new.NewCtx(testv3new.APIADDR, testv3new.WSADDR, userID, token)
baseCore := testcore.NewBaseCore(ctx, userID)
ctx = mcontext.SetOperationID(ctx, utils.OperationIDGenerator())
if err := baseCore.SendSingleMsg(ctx, recvID, 0); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions testv3new/testcore/config.go → testv3new/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package testcore
package testv3new

import (
"fmt"
Expand All @@ -9,12 +9,12 @@ import (

// system
var (
TESTIP = "203.56.175.233"
TESTIP = "59.36.173.89"
APIADDR = fmt.Sprintf("http://%v:10002", TESTIP)
WSADDR = fmt.Sprintf("ws://%v:10001", TESTIP)
SECRET = "tuoyun"
SECRET = "openIM123"

REGISTERADDR = APIADDR + constant.UserRegister
TOKENADDR = APIADDR + constant.GetUsersToken
PLATFORMID = constant.AndroidPlatformID
PLATFORMID = constant.LinuxPlatformID
)
27 changes: 16 additions & 11 deletions testv3new/pressure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package testv3new

import (
"fmt"
"github.com/OpenIMSDK/tools/log"
"open_im_sdk/testv3new/testcore"
"testing"
"time"

"github.com/OpenIMSDK/tools/log"
)

var (
Expand All @@ -14,15 +14,20 @@ var (

func init() {
pressureTestAttribute.InitWithFlag()
//pressureTestAttribute.recvUserIDs = []string{"6680650275"}
//pressureTestAttribute.sendUserIDs = []string{"8430973211", "4098531159", "3171794400"}
//pressureTestAttribute.messageNumber = 10
//pressureTestAttribute.timeInterval = 100

if err := log.InitFromConfig("sdk.log", "sdk", 4,
true, true, "./chat_log", 2, 24); err != nil {
true, false, "./chat_log", 2, 24); err != nil {
panic(err)
}
}

func TestPressureTester_PressureSendMsgs(t *testing.T) {
ParseFlag()
p := NewPressureTester(testcore.APIADDR, testcore.WSADDR)
p := NewPressureTester(APIADDR, WSADDR)
for i := 0; i < 10; i++ {
p.WithTimer(p.PressureSendMsgs2)(pressureTestAttribute.sendUserIDs, pressureTestAttribute.recvUserIDs, pressureTestAttribute.messageNumber, time.Duration(pressureTestAttribute.timeInterval)*time.Millisecond)
time.Sleep(time.Second)
Expand All @@ -32,7 +37,7 @@ func TestPressureTester_PressureSendMsgs(t *testing.T) {

func TestPressureTester_PressureSendGroupMsgs(t *testing.T) {
ParseFlag()
p := NewPressureTester(testcore.APIADDR, testcore.WSADDR)
p := NewPressureTester(APIADDR, WSADDR)
for i := 0; i < 10; i++ {
p.WithTimer(p.PressureSendGroupMsgs2)(pressureTestAttribute.sendUserIDs, pressureTestAttribute.groupIDs, pressureTestAttribute.messageNumber, time.Duration(pressureTestAttribute.timeInterval)*time.Millisecond)
time.Sleep(time.Second)
Expand All @@ -45,16 +50,16 @@ func TestPressureTester_Conversation(t *testing.T) {
for i := 1; i <= 1000; i++ {
recvUserIDs = append(recvUserIDs, fmt.Sprintf("register_test_%v", i))
}
p := NewPressureTester(testcore.APIADDR, testcore.WSADDR)
p := NewPressureTester(APIADDR, WSADDR)
p.WithTimer(p.PressureSendMsgs)(sendUserID, recvUserIDs, 1, 100*time.Millisecond)
}

func TestPressureTester_PressureSendMsgs2(t *testing.T) {
recvUserID := "5338610321"
recvUserID := "6680650275"
var sendUserIDs []string
for i := 1; i <= 1000; i++ {
sendUserIDs = append(sendUserIDs, fmt.Sprintf("register_test_%v", i))
for i := 1; i <= 100; i++ {
sendUserIDs = append(sendUserIDs, fmt.Sprintf("register_%v", i))
}
p := NewPressureTester(testcore.APIADDR, testcore.WSADDR)
p.WithTimer(p.PressureSendMsgs2)(sendUserIDs, []string{recvUserID}, 1, 100*time.Millisecond)
p := NewPressureTester(APIADDR, WSADDR)
p.WithTimer(p.PressureSendMsgs2)(sendUserIDs, []string{recvUserID}, 1000, 100*time.Millisecond)
}
143 changes: 80 additions & 63 deletions testv3new/pressure_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,24 @@ package testv3new
import (
"context"
"fmt"
"github.com/OpenIMSDK/tools/log"
"open_im_sdk/pkg/ccontext"
"open_im_sdk/pkg/constant"
"open_im_sdk/sdk_struct"
"open_im_sdk/testv3new/testcore"
"reflect"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/OpenIMSDK/tools/log"
)

type PressureTester struct {
sendLightWeightSDKCores map[string]*testcore.BaseCore
recvLightWeightSDKCores map[string]*testcore.BaseCore

registerManager RegisterManager
registerManager TestUserManager
apiAddr string
wsAddr string
}
Expand Down Expand Up @@ -46,28 +48,48 @@ func NewCtx(apiAddr, wsAddr, userID, token string) context.Context {
}})
}

func (p *PressureTester) add(a, b int) int {
return a + b
}

func (p *PressureTester) initCores(m *map[string]*testcore.BaseCore, userIDs []string) {
for _, userID := range userIDs {
token, err := p.registerManager.GetToken(userID)
if err != nil {
log.ZError(context.Background(), "get token error", err, "userID", userID)
continue
}
mV := *m
mV[userID] = testcore.NewBaseCore(NewCtx(p.apiAddr, p.wsAddr, userID, token), userID)
func (p *PressureTester) initCores(m map[string]*testcore.BaseCore, userIDs []string) {
var wg sync.WaitGroup
var index int64
var mutex sync.Mutex
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
idx := int(atomic.AddInt64(&index, 1) - 1)
if idx >= len(userIDs) {
return
}
userID := userIDs[idx]
token, err := p.registerManager.GetToken(ctx, userID)
if err != nil {
log.ZError(context.Background(), "get token error", err, "userID", userID)
continue
}
mutex.Lock()
m[userID] = testcore.NewBaseCore(NewCtx(p.apiAddr, p.wsAddr, userID, token), userID)
mutex.Unlock()
}
}()
}
wg.Wait()
//for _, userID := range userIDs {
// token, err := p.registerManager.GetToken(userID)
// if err != nil {
// log.ZError(context.Background(), "get token error", err, "userID", userID)
// continue
// }
// m[userID] = testcore.NewBaseCore(NewCtx(p.apiAddr, p.wsAddr, userID, token), userID)
//}
}

func (p *PressureTester) InitSendCores(userIDs []string) {
p.initCores(&p.sendLightWeightSDKCores, userIDs)
p.initCores(p.sendLightWeightSDKCores, userIDs)
}

func (p *PressureTester) InitRecvCores(userIDs []string) {
p.initCores(&p.recvLightWeightSDKCores, userIDs)
p.initCores(p.recvLightWeightSDKCores, userIDs)
}

// PressureSendMsgs user single chat send msg pressure test
Expand All @@ -77,11 +99,10 @@ func (p *PressureTester) PressureSendMsgs(sendUserID string, recvUserIDs []strin

sendCore := p.sendLightWeightSDKCores[sendUserID]

// fmt.Println("\nmsgNum ==> ", msgNum)
var wg sync.WaitGroup
wg.Add(len(recvUserIDs))
for idx, recvUserID := range recvUserIDs {
go func(idx int) {
for _, recvUserID := range recvUserIDs {
go func(recvUserID string) {
defer wg.Done() // Mark this goroutine as done when finished

// Create a new context for each goroutine to avoid shared state
Expand Down Expand Up @@ -114,64 +135,61 @@ func (p *PressureTester) PressureSendMsgs(sendUserID string, recvUserIDs []strin
log.ZInfo(ctx, "recv msg", "recv num", count, "recvUserID", recvUserID, "recv status", count == msgNum)
}
}
}(idx)
}(recvUserID)
}
wg.Wait()
}

// PressureSendMsgs2 user single chat send msg pressure test
func (p *PressureTester) PressureSendMsgs2(sendUserIDs []string, recvUserIDs []string, msgNum int, duration time.Duration) {
func (p *PressureTester) PressureSendMsgs2(ctx context.Context, sendUserIDs []string, recvUserIDs []string, msgNum int, duration time.Duration) {
p.WithTimer(p.InitSendCores)(sendUserIDs)
p.WithTimer(p.InitRecvCores)(recvUserIDs)

//msgChan := make(chan struct{})

//for _, recvUserID := range recvUserIDs {
// recvCore := p.recvLightWeightSDKCores[recvUserID]
// if recvCore == nil {
// continue
// }
// go func(baseCode *testcore.BaseCore) {
// if recvCore != nil {
// time.Sleep(duration)
// recvMap := recvCore.GetRecvMap()
// if recvMap != nil {
// count := 0
// for range sendUserMsgChan {
// count++
// }
// recvMap[sendUserID+"_"+recvUserID] = count
// fmt.Println(fmt.Sprintf("recvUserID: %v ==> recv msg num: %d %v", recvUserID, count, count == msgNum))
// log.ZInfo(ctx, "recv msg", "recv num", count, "recvUserID", recvUserID, "recv status", count == msgNum)
// }
// }
// }(recvCore)
//}
var wg sync.WaitGroup
msgChan := make(chan struct{})

for _, sendUserID := range sendUserIDs {
ctx, _ := InitContext(sendUserID)
// ctx, _ := InitContext(sendUserID)
sendCore := p.sendLightWeightSDKCores[sendUserID]

// fmt.Println("\nmsgNum ==> ", msgNum)
for _, recvUserID := range recvUserIDs {
wg.Add(1)
go func(sendUserID, recvUserID string) {
defer wg.Done()
sendUserMsgChan := make(chan int)

// Send messages concurrently
var sendWG sync.WaitGroup
sendWG.Add(msgNum)
if sendCore == nil {
log.ZInfo(ctx, "sendCore is nil", "sendUserID", sendUserID)
continue
}
wg.Add(1)
go func() {
defer wg.Done()
for _, recvUserID := range recvUserIDs {
for i := 1; i <= msgNum; i++ {
go func(i int) {
defer sendWG.Done()
p.WithTimer(sendCore.SendSingleMsg)(ctx, recvUserID, i)
}(i)
p.WithTimer(sendCore.SendSingleMsg)(ctx, recvUserID, i)
time.Sleep(duration)
}
sendWG.Wait()

// Goroutine for receiving messages
go func() {
recvCore := p.recvLightWeightSDKCores[recvUserID]
if recvCore != nil {
time.Sleep(100 * time.Millisecond)
recvMap := recvCore.GetRecvMap()
if recvMap != nil {
count := 0
for range sendUserMsgChan {
count++
}
recvMap[sendUserID+"_"+recvUserID] = count
fmt.Println(fmt.Sprintf("recvUserID: %v ==> recv msg num: %d %v", recvUserID, count, count == msgNum))
log.ZInfo(ctx, "recv msg", "recv num", count, "recvUserID", recvUserID, "recv status", count == msgNum)
}
}
}()
}(sendUserID, recvUserID)
}
}
}()
}

wg.Wait()
close(msgChan)
//close(msgChan)
}

// PressureSendGroupMsgs group chat send msg pressure test
Expand All @@ -183,7 +201,6 @@ func (p *PressureTester) PressureSendGroupMsgs(sendUserIDs []string, groupID str
log.ZError(context.Background(), "get group members info failed", err, "userIDs", sendUserIDs)
return
}

startTime := time.Now().UnixNano()
p.InitSendCores(sendUserIDs)
endTime := time.Now().UnixNano()
Expand Down
Loading

0 comments on commit c53b4a0

Please sign in to comment.