From ffcc5a34891a18f3e316d3cf6e93df9883e0ecbb Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Fri, 6 Dec 2024 16:05:25 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BC=98=E5=8C=96=E6=9C=8D=E5=8A=A1=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=A3=80=E6=9F=A5=202.=E5=BA=9F=E5=BC=83SetGoRoutineN?= =?UTF-8?q?um=E6=8E=A5=E5=8F=A3=203.=E9=87=8A=E6=94=BEModule=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/parsecfg.go | 11 +++++++---- service/module.go | 28 ++++++++++++++++------------ service/service.go | 6 +++++- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/cluster/parsecfg.go b/cluster/parsecfg.go index 2d7e931..b62fc3a 100644 --- a/cluster/parsecfg.go +++ b/cluster/parsecfg.go @@ -417,13 +417,12 @@ func (cls *Cluster) readLocalService(localNodeId string) error { return nil } -func (cls *Cluster) parseLocalCfg() { +func (cls *Cluster) parseLocalCfg() error{ rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = cls.localNodeInfo rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet) cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo - for _, serviceName := range cls.localNodeInfo.ServiceList { splitServiceName := strings.Split(serviceName, ":") if len(splitServiceName) == 2 { @@ -440,8 +439,13 @@ func (cls *Cluster) parseLocalCfg() { cls.mapServiceNode[serviceName] = make(map[string]struct{}) } + if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok { + return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId) + } cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{} } + + return nil } func (cls *Cluster) IsNatsMode() bool { @@ -474,8 +478,7 @@ func (cls *Cluster) InitCfg(localNodeId string) error { } //本地配置服务加到全局map信息中 - cls.parseLocalCfg() - return nil + return cls.parseLocalCfg() } func (cls *Cluster) IsConfigService(serviceName string) bool { diff --git a/service/module.go b/service/module.go index 0152473..33ef4d8 100644 --- a/service/module.go +++ b/service/module.go @@ -11,6 +11,7 @@ import ( "github.com/duanhf2012/origin/v2/log" rpcHandle "github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/util/timer" + "slices" ) const InitModuleId = 1e9 @@ -46,7 +47,7 @@ type Module struct { moduleName string //模块名称 parent IModule //父亲 self IModule //自己 - child map[uint32]IModule //孩子们 + child []IModule //孩子们 mapActiveTimer map[timer.ITimer]struct{} mapActiveIdTimer map[uint64]timer.ITimer dispatcher *timer.Dispatcher //timer @@ -93,10 +94,7 @@ func (m *Module) AddModule(module IModule) (uint32, error) { pAddModule.moduleId = m.NewModuleId() } - if m.child == nil { - m.child = map[uint32]IModule{} - } - _, ok := m.child[module.GetModuleId()] + _,ok := m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] if ok == true { return 0, fmt.Errorf("exists module id %d", module.GetModuleId()) } @@ -109,24 +107,27 @@ func (m *Module) AddModule(module IModule) (uint32, error) { pAddModule.eventHandler = event.NewEventHandler() pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor()) pAddModule.IConcurrent = m.IConcurrent + + m.child = append(m.child,module) + m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module + err := module.OnInit() if err != nil { + delete(m.ancestor.getBaseModule().(*Module).descendants, module.GetModuleId()) + m.child = m.child[:len(m.child)-1] + log.Error("module OnInit error",log.String("ModuleName",module.GetModuleName()),log.ErrorField("err",err)) return 0, err } - m.child[module.GetModuleId()] = module - m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module - log.Debug("Add module " + module.GetModuleName() + " completed") return module.GetModuleId(), nil } func (m *Module) ReleaseModule(moduleId uint32) { pModule := m.GetModule(moduleId).getBaseModule().(*Module) - //释放子孙 - for id := range pModule.child { - m.ReleaseModule(id) + for i:=len(pModule.child)-1; i>=0; i-- { + m.ReleaseModule(pModule.child[i].GetModuleId()) } pModule.self.OnRelease() @@ -140,7 +141,10 @@ func (m *Module) ReleaseModule(moduleId uint32) { t.Cancel() } - delete(m.child, moduleId) + m.child = slices.DeleteFunc(m.child, func(module IModule) bool { + return module.GetModuleId() == moduleId + }) + delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId) //清理被删除的Module diff --git a/service/service.go b/service/service.go index 44cee93..cfdbcc3 100644 --- a/service/service.go +++ b/service/service.go @@ -264,10 +264,13 @@ func (s *Service) Release() { } }() + for i:=len(s.child)-1; i>=0; i-- { + s.ReleaseModule(s.child[i].GetModuleId()) + } + if atomic.AddInt32(&s.isRelease, -1) == -1 { s.self.OnRelease() } - } func (s *Service) OnRelease() { @@ -432,6 +435,7 @@ func (s *Service) SetEventChannelNum(num int) { } } +// Deprecated: replace it with the OpenConcurrent function func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { //已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程 if s.startStatus == true || s.profiler != nil {