diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index a165d8a77c..7904dc74e8 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -42,21 +42,23 @@ const ( ) var ( - errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil") + errNilZkClientConn = perrors.New("zookeeper client{conn} is nil") errNilChildren = perrors.Errorf("has none children") errNilNode = perrors.Errorf("node does not exist") ) // ZookeeperClient ... type ZookeeperClient struct { - name string - ZkAddrs []string - sync.RWMutex // for conn - Conn *zk.Conn - Timeout time.Duration - exit chan struct{} - Wait sync.WaitGroup - eventRegistry map[string][]*chan struct{} + name string + ZkAddrs []string + sync.RWMutex // for conn + Conn *zk.Conn + Timeout time.Duration + exit chan struct{} + Wait sync.WaitGroup + + eventRegistry map[string][]*chan struct{} + eventRegistryLock sync.RWMutex } // StateToString ... @@ -114,12 +116,11 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { var ( err error ) - opions := &Options{} + options := &Options{} for _, opt := range opts { - opt(opions) + opt(options) } connected := false - err = nil lock := container.ZkClientLock() url := container.GetUrl() @@ -128,7 +129,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { defer lock.Unlock() if container.ZkClient() == nil { - //in dubbo ,every registry only connect one node ,so this is []string{r.Address} + // in dubbo, every registry only connect one node, so this is []string{r.Address} timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) if err != nil { logger.Errorf("timeout config %v is invalid ,err is %v", @@ -136,10 +137,10 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location) } zkAddresses := strings.Split(url.Location, ",") - newClient, err := newZookeeperClient(opions.zkName, zkAddresses, timeout) + newClient, err := newZookeeperClient(options.zkName, zkAddresses, timeout) if err != nil { logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}", - opions.zkName, url.Location, timeout.String(), err) + options.zkName, url.Location, timeout.String(), err) return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location) } container.SetZkClient(newClient) @@ -157,8 +158,8 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { } if connected { - logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", opions.zkName, url.Location) - container.WaitGroup().Add(1) //zk client start successful, then registry wg +1 + logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location) + container.WaitGroup().Add(1) // zk client start successful, then registry wg +1 } return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL) @@ -214,14 +215,14 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) eventRegistry: make(map[string][]*chan struct{}), } - opions := &Options{} + options := &Options{} for _, opt := range opts { - opt(opions) + opt(options) } // connect to zookeeper - if opions.ts != nil { - ts = opions.ts + if options.ts != nil { + ts = options.ts } else { ts, err = zk.StartTestCluster(1, nil, nil) if err != nil { @@ -229,16 +230,10 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) } } - //callbackChan := make(chan zk.Event) - //f := func(event zk.Event) { - // callbackChan <- event - //} - z.Conn, event, err = ts.ConnectWithOptions(timeout) if err != nil { return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect") } - //z.wait.Add(1) return ts, z, event, nil } @@ -255,11 +250,10 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) { logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name) }() -LOOP: for { select { case <-z.exit: - break LOOP + return case event = <-session: logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}", z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err) @@ -274,11 +268,10 @@ LOOP: if conn != nil { conn.Close() } - - break LOOP + return case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged): logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path) - z.RLock() + z.eventRegistryLock.RLock() for p, a := range z.eventRegistry { if strings.HasPrefix(p, event.Path) { logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener", @@ -288,16 +281,18 @@ LOOP: } } } - z.RUnlock() + z.eventRegistryLock.RUnlock() case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession): if state == (int)(zk.StateHasSession) { continue } + z.eventRegistryLock.RLock() if a, ok := z.eventRegistry[event.Path]; ok && 0 < len(a) { for _, e := range a { *e <- struct{}{} } } + z.eventRegistryLock.RUnlock() } state = (int)(event.State) } @@ -310,13 +305,12 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { return } - z.Lock() + z.eventRegistryLock.Lock() + defer z.eventRegistryLock.Unlock() a := z.eventRegistry[zkPath] a = append(a, event) - z.eventRegistry[zkPath] = a logger.Debugf("zkClient{%s} register event{path:%s, ptr:%p}", z.name, zkPath, event) - z.Unlock() } // UnregisterEvent ... @@ -324,16 +318,16 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" { return } - z.Lock() - defer z.Unlock() + + z.eventRegistryLock.Lock() + defer z.eventRegistryLock.Unlock() infoList, ok := z.eventRegistry[zkPath] if !ok { return } for i, e := range infoList { if e == event { - arr := infoList - infoList = append(arr[:i], arr[i+1:]...) + infoList = append(infoList[:i], infoList[i+1:]...) logger.Infof("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event) } } @@ -393,11 +387,11 @@ func (z *ZookeeperClient) Close() { z.Conn = nil z.Unlock() if conn != nil { - logger.Warnf("zkClient Conn{name:%s, zk addr:%s} exit now.", z.name, conn.SessionID()) + logger.Infof("zkClient Conn{name:%s, zk addr:%d} exit now.", z.name, conn.SessionID()) conn.Close() } - logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs) + logger.Infof("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs) } // Create will create the node recursively, which means that if the parent node is absent, @@ -428,9 +422,9 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error { if err != nil { if err == zk.ErrNodeExists { - logger.Debugf("zk.create(\"%s\") exists\n", tmpPath) + logger.Debugf("zk.create(\"%s\") exists", tmpPath) } else { - logger.Errorf("zk.create(\"%s\") error(%v)\n", tmpPath, perrors.WithStack(err)) + logger.Errorf("zk.create(\"%s\") error(%v)", tmpPath, perrors.WithStack(err)) return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath) } } @@ -441,11 +435,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error { // Delete ... func (z *ZookeeperClient) Delete(basePath string) error { - var ( - err error - ) - - err = errNilZkClientConn + err := errNilZkClientConn conn := z.getConn() if conn != nil { err = conn.Delete(basePath, -1) @@ -458,25 +448,22 @@ func (z *ZookeeperClient) Delete(basePath string) error { func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) { var ( err error - data []byte zkPath string tmpPath string ) err = errNilZkClientConn - data = []byte("") zkPath = path.Join(basePath) + "/" + node conn := z.getConn() if conn != nil { - tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + tmpPath, err = conn.Create(zkPath, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) } - //if err != nil && err != zk.ErrNodeExists { if err != nil { - logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err)) + logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)", zkPath, perrors.WithStack(err)) return zkPath, perrors.WithStack(err) } - logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath) + logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath) return tmpPath, nil } @@ -501,11 +488,11 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath) if err != nil && err != zk.ErrNodeExists { - logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)\n", + logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)", z.name, basePath, string(data), err) return "", perrors.WithStack(err) } - logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath) + logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath) return tmpPath, nil } diff --git a/remoting/zookeeper/client_test.go b/remoting/zookeeper/client_test.go index cb41eb326b..34741700ca 100644 --- a/remoting/zookeeper/client_test.go +++ b/remoting/zookeeper/client_test.go @@ -18,7 +18,6 @@ package zookeeper import ( - "fmt" "testing" "time" ) @@ -28,6 +27,10 @@ import ( "github.com/stretchr/testify/assert" ) +import ( + "github.com/apache/dubbo-go/common/logger" +) + func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.State, source string) { for _, state := range expectedStates { for { @@ -35,7 +38,7 @@ func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk. if !ok { t.Fatalf("unexpected channel close for %s", source) } - fmt.Println(event) + logger.Debug(event) if event.Type != zk.EventSession { continue } @@ -87,9 +90,10 @@ func Test_newMockZookeeperClient(t *testing.T) { } func TestCreate(t *testing.T) { - ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second) + ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) defer ts.Stop() - err := z.Create("test1/test2/test3/test4") + err = z.Create("test1/test2/test3/test4") assert.NoError(t, err) states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} @@ -97,22 +101,24 @@ func TestCreate(t *testing.T) { } func TestCreateDelete(t *testing.T) { - ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second) + ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) defer ts.Stop() states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} verifyEventStateOrder(t, event, states, "event channel") - err := z.Create("/test1/test2/test3/test4") + err = z.Create("/test1/test2/test3/test4") + assert.NoError(t, err) + err = z.Delete("/test1/test2/test3/test4") assert.NoError(t, err) - err2 := z.Delete("/test1/test2/test3/test4") - assert.NoError(t, err2) - //verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel") + // verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel") } func TestRegisterTemp(t *testing.T) { - ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second) + ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) defer ts.Stop() - err := z.Create("/test1/test2/test3") + err = z.Create("/test1/test2/test3") assert.NoError(t, err) tmpath, err := z.RegisterTemp("/test1/test2/test3", "test4") @@ -123,9 +129,10 @@ func TestRegisterTemp(t *testing.T) { } func TestRegisterTempSeq(t *testing.T) { - ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second) + ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) defer ts.Stop() - err := z.Create("/test1/test2/test3") + err = z.Create("/test1/test2/test3") assert.NoError(t, err) tmpath, err := z.RegisterTempSeq("/test1/test2/test3", []byte("test")) assert.NoError(t, err)