Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

notify opt #251

Merged
merged 2 commits into from
Jul 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 38 additions & 70 deletions znotify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,62 @@ package znotify
import (
"errors"
"fmt"
"sync"
"strconv"

"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
"github.com/aceld/zinx/zutils"
)

// ConnIDMap Establish a structure that maps user-defined IDs to connections
// Map will have concurrent access issues, as well as looping through large amounts of data
// Currently, a map structure is used to store the data, but it may not be the best choice
// (建立一个用户自定义ID和连接映射的结构
// map会存在 并发问题,大量数据循环读取问题
// 暂时先用map结构存储,但是应该不是最好的选择,抛砖引玉)
type ConnIDMap map[uint64]ziface.IConnection
// Use the map structure of shard and lock storage to minimize lock granularity and lock holding time
// 建立一个用户自定义ID和连接映射的结构
// map会存在并发问题,大量数据循环读取问题
// 使用分片加锁的map结构存储,尽量减少锁的粒度和锁的持有时间

type notify struct {
cimap ConnIDMap
sync.RWMutex
connIdMap zutils.ShardLockMaps
}

func NewZNotify() ziface.Inotify {
return &notify{
cimap: make(map[uint64]ziface.IConnection, 5000),
connIdMap: zutils.NewShardLockMaps(),
}
}

func (n *notify) genConnStrId(connID uint64) string {
strConnId := strconv.FormatUint(connID, 10)
return strConnId
}

func (n *notify) ConnNums() int {
return len(n.cimap)
return n.connIdMap.Count()
}

func (n *notify) HasIdConn(Id uint64) bool {
n.RLock()
defer n.RUnlock()
_, ok := n.cimap[Id]
return ok
strId := n.genConnStrId(Id)
return n.connIdMap.Has(strId)
}

func (n *notify) SetNotifyID(Id uint64, conn ziface.IConnection) {
n.Lock()
defer n.Unlock()
n.cimap[Id] = conn
strId := n.genConnStrId(Id)
n.connIdMap.Set(strId, conn)
}

func (n *notify) GetNotifyByID(Id uint64) (ziface.IConnection, error) {
n.RLock()
defer n.RUnlock()
Conn, ok := n.cimap[Id]

strId := n.genConnStrId(Id)
Conn, ok := n.connIdMap.Get(strId)
if !ok {
return nil, errors.New(" Not Find UserId")
}
return Conn, nil
return Conn.(ziface.IConnection), nil
}

func (n *notify) DelNotifyByID(Id uint64) {
n.RLock()
defer n.RUnlock()
delete(n.cimap, Id)
strId := n.genConnStrId(Id)
n.connIdMap.Remove(strId)
}

func (n *notify) NotifyToConnByID(Id uint64, MsgId uint32, data []byte) error {
Expand All @@ -75,49 +75,16 @@ func (n *notify) NotifyToConnByID(Id uint64, MsgId uint32, data []byte) error {
}

func (n *notify) NotifyAll(MsgId uint32, data []byte) error {
n.RLock()
defer n.RUnlock()
for Id, v := range n.cimap {
err := v.SendMsg(MsgId, data)

n.connIdMap.IterCb(func(key string, v interface{}) {
conn, _ := v.(ziface.IConnection)
err := conn.SendMsg(MsgId, data)
if err != nil {
zlog.Ins().ErrorF("Notify to %d err:%s \n", Id, err)
zlog.Ins().ErrorF("Notify to %s err:%s \n", key, err)
}
}
return nil
}
})

func (n *notify) notifyAll(MsgId uint32, data []byte) error {
n.RLock()
defer n.RUnlock()
var err error
for Id, v := range n.cimap {
er := v.SendMsg(MsgId, data)
if er != nil {
zlog.Ins().ErrorF("Notify to %d err:%s \n", Id, er)
err = er
}
}
return err
}

// In extreme cases where many people are joining and sending messages at the same time, and the map needs to be released as soon as possible,
// but there are currently many problems and it is not used
// (极端情况 同时加入和发送的人很多需要尽快释放map的情况, 目前问题很多不采用)
func (n *notify) notifyAll2(MsgId uint32, data []byte) error {
conns := make([]ziface.IConnection, 0, len(n.cimap))
n.RLock()
for _, v := range n.cimap {
conns = append(conns, v)
}
n.RUnlock()

var err error
for i := 0; i < len(conns); i++ {
if er := conns[i].SendMsg(MsgId, data); er != nil {
err = er
}
}
return err
return nil
}

func (n *notify) NotifyBuffToConnByID(Id uint64, MsgId uint32, data []byte) error {
Expand All @@ -134,13 +101,14 @@ func (n *notify) NotifyBuffToConnByID(Id uint64, MsgId uint32, data []byte) erro
}

func (n *notify) NotifyBuffAll(MsgId uint32, data []byte) error {
n.RLock()
defer n.RUnlock()
for Id, v := range n.cimap {
err := v.SendBuffMsg(MsgId, data)

n.connIdMap.IterCb(func(key string, v interface{}) {
conn, _ := v.(ziface.IConnection)
err := conn.SendBuffMsg(MsgId, data)
if err != nil {
zlog.Ins().ErrorF("Notify to %d err:%s \n", Id, err)
zlog.Ins().ErrorF("Notify to %s err:%s \n", key, err)
}
}
})

return nil
}