Skip to content

Commit

Permalink
Merge pull request #251 from hcraM41/master
Browse files Browse the repository at this point in the history
notify opt
  • Loading branch information
aceld committed Jul 17, 2023
2 parents df441c0 + 30ad5ac commit 7a08f05
Showing 1 changed file with 38 additions and 70 deletions.
108 changes: 38 additions & 70 deletions znotify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,62 @@ package znotify
import (
"errors"
"fmt"
"sync"
"strconv"

"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
"github.com/aceld/zinx/zutils"
)

// ConnIDMap Establish a structure that maps user-defined IDs to connections
// Map will have concurrent access issues, as well as looping through large amounts of data
// Currently, a map structure is used to store the data, but it may not be the best choice
// (建立一个用户自定义ID和连接映射的结构
// map会存在 并发问题,大量数据循环读取问题
// 暂时先用map结构存储,但是应该不是最好的选择,抛砖引玉)
type ConnIDMap map[uint64]ziface.IConnection
// Use the map structure of shard and lock storage to minimize lock granularity and lock holding time
// 建立一个用户自定义ID和连接映射的结构
// map会存在并发问题,大量数据循环读取问题
// 使用分片加锁的map结构存储,尽量减少锁的粒度和锁的持有时间

type notify struct {
cimap ConnIDMap
sync.RWMutex
connIdMap zutils.ShardLockMaps
}

func NewZNotify() ziface.Inotify {
return &notify{
cimap: make(map[uint64]ziface.IConnection, 5000),
connIdMap: zutils.NewShardLockMaps(),
}
}

func (n *notify) genConnStrId(connID uint64) string {
strConnId := strconv.FormatUint(connID, 10)
return strConnId
}

func (n *notify) ConnNums() int {
return len(n.cimap)
return n.connIdMap.Count()
}

func (n *notify) HasIdConn(Id uint64) bool {
n.RLock()
defer n.RUnlock()
_, ok := n.cimap[Id]
return ok
strId := n.genConnStrId(Id)
return n.connIdMap.Has(strId)
}

func (n *notify) SetNotifyID(Id uint64, conn ziface.IConnection) {
n.Lock()
defer n.Unlock()
n.cimap[Id] = conn
strId := n.genConnStrId(Id)
n.connIdMap.Set(strId, conn)
}

func (n *notify) GetNotifyByID(Id uint64) (ziface.IConnection, error) {
n.RLock()
defer n.RUnlock()
Conn, ok := n.cimap[Id]

strId := n.genConnStrId(Id)
Conn, ok := n.connIdMap.Get(strId)
if !ok {
return nil, errors.New(" Not Find UserId")
}
return Conn, nil
return Conn.(ziface.IConnection), nil
}

func (n *notify) DelNotifyByID(Id uint64) {
n.RLock()
defer n.RUnlock()
delete(n.cimap, Id)
strId := n.genConnStrId(Id)
n.connIdMap.Remove(strId)
}

func (n *notify) NotifyToConnByID(Id uint64, MsgId uint32, data []byte) error {
Expand All @@ -75,49 +75,16 @@ func (n *notify) NotifyToConnByID(Id uint64, MsgId uint32, data []byte) error {
}

func (n *notify) NotifyAll(MsgId uint32, data []byte) error {
n.RLock()
defer n.RUnlock()
for Id, v := range n.cimap {
err := v.SendMsg(MsgId, data)

n.connIdMap.IterCb(func(key string, v interface{}) {
conn, _ := v.(ziface.IConnection)
err := conn.SendMsg(MsgId, data)
if err != nil {
zlog.Ins().ErrorF("Notify to %d err:%s \n", Id, err)
zlog.Ins().ErrorF("Notify to %s err:%s \n", key, err)
}
}
return nil
}
})

func (n *notify) notifyAll(MsgId uint32, data []byte) error {
n.RLock()
defer n.RUnlock()
var err error
for Id, v := range n.cimap {
er := v.SendMsg(MsgId, data)
if er != nil {
zlog.Ins().ErrorF("Notify to %d err:%s \n", Id, er)
err = er
}
}
return err
}

// In extreme cases where many people are joining and sending messages at the same time, and the map needs to be released as soon as possible,
// but there are currently many problems and it is not used
// (极端情况 同时加入和发送的人很多需要尽快释放map的情况, 目前问题很多不采用)
func (n *notify) notifyAll2(MsgId uint32, data []byte) error {
conns := make([]ziface.IConnection, 0, len(n.cimap))
n.RLock()
for _, v := range n.cimap {
conns = append(conns, v)
}
n.RUnlock()

var err error
for i := 0; i < len(conns); i++ {
if er := conns[i].SendMsg(MsgId, data); er != nil {
err = er
}
}
return err
return nil
}

func (n *notify) NotifyBuffToConnByID(Id uint64, MsgId uint32, data []byte) error {
Expand All @@ -134,13 +101,14 @@ func (n *notify) NotifyBuffToConnByID(Id uint64, MsgId uint32, data []byte) erro
}

func (n *notify) NotifyBuffAll(MsgId uint32, data []byte) error {
n.RLock()
defer n.RUnlock()
for Id, v := range n.cimap {
err := v.SendBuffMsg(MsgId, data)

n.connIdMap.IterCb(func(key string, v interface{}) {
conn, _ := v.(ziface.IConnection)
err := conn.SendBuffMsg(MsgId, data)
if err != nil {
zlog.Ins().ErrorF("Notify to %d err:%s \n", Id, err)
zlog.Ins().ErrorF("Notify to %s err:%s \n", key, err)
}
}
})

return nil
}

0 comments on commit 7a08f05

Please sign in to comment.