Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the runtime of TestChildWatch and TestSetWatchers #88

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ $(ZK):
tar -zxf $(ZK).tar.gz
rm $(ZK).tar.gz

.PHONY: zookeeper
zookeeper: $(ZK)
# we link to a standard directory path so then the tests dont need to find based on version
# in the test code. this allows backward compatable testing.
ln -s $(ZK) zookeeper
rm -f $@
ln -s $(ZK) $@

.PHONY: setup
setup: zookeeper
Expand Down
46 changes: 37 additions & 9 deletions server_help_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -53,7 +54,7 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl
}

tmpPath, err := ioutil.TempDir("", "gozk")
requireNoError(t, err, "failed to create tmp dir for test server setup")
requireNoErrorf(t, err, "failed to create tmp dir for test server setup")

success := false
startPort := int(rand.Int31n(6000) + 10000)
Expand All @@ -67,7 +68,7 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl

for serverN := 0; serverN < size; serverN++ {
srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN+1))
requireNoError(t, os.Mkdir(srvPath, 0700), "failed to make server path")
requireNoErrorf(t, os.Mkdir(srvPath, 0700), "failed to make server path")

port := startPort + serverN*3
cfg := ServerConfig{
Expand All @@ -88,20 +89,20 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl

cfgPath := filepath.Join(srvPath, _testConfigName)
fi, err := os.Create(cfgPath)
requireNoError(t, err)
requireNoErrorf(t, err)

requireNoError(t, cfg.Marshall(fi))
requireNoErrorf(t, cfg.Marshall(fi))
fi.Close()

fi, err = os.Create(filepath.Join(srvPath, _testMyIDFileName))
requireNoError(t, err)
requireNoErrorf(t, err)

_, err = fmt.Fprintf(fi, "%d\n", serverN+1)
fi.Close()
requireNoError(t, err)
requireNoErrorf(t, err)

srv, err := NewIntegrationTestServer(t, cfgPath, stdout, stderr)
requireNoError(t, err)
requireNoErrorf(t, err)

if err := srv.Start(); err != nil {
return nil, err
Expand Down Expand Up @@ -251,9 +252,36 @@ func (tc *TestCluster) StopAllServers() error {
return nil
}

func requireNoError(t *testing.T, err error, msgAndArgs ...interface{}) {
func requireNoErrorf(t *testing.T, err error, msgAndArgs ...interface{}) {
if err != nil {
t.Helper()
t.Logf("received unexpected error: %v", err)
t.Fatal(msgAndArgs...)
t.Fatalf(msgAndArgs[0].(string), msgAndArgs[1:]...)
}
}

func RequireMinimumZkVersion(t *testing.T, minimum string) {
if val, ok := os.LookupEnv("ZK_VERSION"); ok {
split := func(v string) (parts []int) {
for _, s := range strings.Split(minimum, ".") {
i, err := strconv.Atoi(s)
if err != nil {
t.Fatalf("invalid version segment: %q", s)
}
parts = append(parts, i)
}
return parts
}

minimumV, actualV := split(minimum), split(val)
for i, p := range minimumV {
if actualV[i] < p {
if !strings.HasPrefix(val, minimum) {
t.Skipf("running with zookeeper that does not support this api (requires at least %s)", minimum)
}
}
}
} else {
t.Skip("did not detect zk_version from env. skipping test")
}
}
127 changes: 64 additions & 63 deletions zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package zk

import (
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -187,27 +186,22 @@ func TestCreateContainer(t *testing.T) {
}

func TestIncrementalReconfig(t *testing.T) {
if val, ok := os.LookupEnv("zk_version"); ok {
if !strings.HasPrefix(val, "3.5") {
t.Skip("running with zookeeper that does not support this api")
}
} else {
t.Skip("did not detect zk_version from env. skipping reconfig test")
}
RequireMinimumZkVersion(t, "3.5")

ts, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
requireNoError(t, err, "failed to setup test cluster")
requireNoErrorf(t, err, "failed to setup test cluster")
defer ts.Stop()

// start and add a new server.
tmpPath, err := ioutil.TempDir("", "gozk")
requireNoError(t, err, "failed to create tmp dir for test server setup")
requireNoErrorf(t, err, "failed to create tmp dir for test server setup")
defer os.RemoveAll(tmpPath)

startPort := int(rand.Int31n(6000) + 10000)

srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv4"))
if err := os.Mkdir(srvPath, 0700); err != nil {
requireNoError(t, err, "failed to make server path")
requireNoErrorf(t, err, "failed to make server path")
}
testSrvConfig := ServerConfigServer{
ID: 4,
Expand All @@ -224,35 +218,35 @@ func TestIncrementalReconfig(t *testing.T) {
// TODO: clean all this server creating up to a better helper method
cfgPath := filepath.Join(srvPath, _testConfigName)
fi, err := os.Create(cfgPath)
requireNoError(t, err)
requireNoErrorf(t, err)

requireNoError(t, cfg.Marshall(fi))
requireNoErrorf(t, cfg.Marshall(fi))
fi.Close()

fi, err = os.Create(filepath.Join(srvPath, _testMyIDFileName))
requireNoError(t, err)
requireNoErrorf(t, err)

_, err = fmt.Fprintln(fi, "4")
fi.Close()
requireNoError(t, err)
requireNoErrorf(t, err)

testServer, err := NewIntegrationTestServer(t, cfgPath, nil, nil)
requireNoError(t, err)
requireNoError(t, testServer.Start())
requireNoErrorf(t, err)
requireNoErrorf(t, testServer.Start())
defer testServer.Stop()

zk, events, err := ts.ConnectAll()
requireNoError(t, err, "failed to connect to cluster")
requireNoErrorf(t, err, "failed to connect to cluster")
defer zk.Close()

err = zk.AddAuth("digest", []byte("super:test"))
requireNoError(t, err, "failed to auth to cluster")
requireNoErrorf(t, err, "failed to auth to cluster")

waitCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err = waitForSession(waitCtx, events)
requireNoError(t, err, "failed to wail for session")
requireNoErrorf(t, err, "failed to wail for session")

_, _, err = zk.Get("/zookeeper/config")
if err != nil {
Expand All @@ -268,7 +262,7 @@ func TestIncrementalReconfig(t *testing.T) {
if err != nil && err == ErrConnectionClosed {
t.Log("conneciton closed is fine since the cluster re-elects and we dont reconnect")
} else {
requireNoError(t, err, "failed to remove node from cluster")
requireNoErrorf(t, err, "failed to remove node from cluster")
}

// add node a new 4th node
Expand All @@ -277,36 +271,30 @@ func TestIncrementalReconfig(t *testing.T) {
if err != nil && err == ErrConnectionClosed {
t.Log("conneciton closed is fine since the cluster re-elects and we dont reconnect")
} else {
requireNoError(t, err, "failed to add new server to cluster")
requireNoErrorf(t, err, "failed to add new server to cluster")
}
}

func TestReconfig(t *testing.T) {
if val, ok := os.LookupEnv("zk_version"); ok {
if !strings.HasPrefix(val, "3.5") {
t.Skip("running with zookeeper that does not support this api")
}
} else {
t.Skip("did not detect zk_version from env. skipping reconfig test")
}
RequireMinimumZkVersion(t, "3.5")

// This test enures we can do an non-incremental reconfig
ts, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
requireNoError(t, err, "failed to setup test cluster")
requireNoErrorf(t, err, "failed to setup test cluster")
defer ts.Stop()

zk, events, err := ts.ConnectAll()
requireNoError(t, err, "failed to connect to cluster")
requireNoErrorf(t, err, "failed to connect to cluster")
defer zk.Close()

err = zk.AddAuth("digest", []byte("super:test"))
requireNoError(t, err, "failed to auth to cluster")
requireNoErrorf(t, err, "failed to auth to cluster")

waitCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err = waitForSession(waitCtx, events)
requireNoError(t, err, "failed to wail for session")
requireNoErrorf(t, err, "failed to wail for session")

_, _, err = zk.Get("/zookeeper/config")
if err != nil {
Expand All @@ -320,7 +308,7 @@ func TestReconfig(t *testing.T) {
}

_, err = zk.Reconfig(s, -1)
requireNoError(t, err, "failed to reconfig cluster")
requireNoErrorf(t, err, "failed to reconfig cluster")

// reconfig to all the hosts again
s = []string{}
Expand All @@ -329,7 +317,7 @@ func TestReconfig(t *testing.T) {
}

_, err = zk.Reconfig(s, -1)
requireNoError(t, err, "failed to reconfig cluster")
requireNoErrorf(t, err, "failed to reconfig cluster")
}

func TestOpsAfterCloseDontDeadlock(t *testing.T) {
Expand Down Expand Up @@ -604,6 +592,7 @@ func TestAuth(t *testing.T) {
}
}

// Tests that we correctly handle a response larger than the default buffer size
func TestChildren(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand All @@ -622,38 +611,44 @@ func TestChildren(t *testing.T) {
}
}

deleteNode("/gozk-test-big")
testNode := "/gozk-test-big"
deleteNode(testNode)

if path, err := zk.Create("/gozk-test-big", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
if _, err := zk.Create(testNode, nil, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != "/gozk-test-big" {
t.Fatalf("Create returned different path '%s' != '/gozk-test-big'", path)
}

rb := make([]byte, 1000)
hb := make([]byte, 2000)
prefix := []byte("/gozk-test-big/")
for i := 0; i < 10000; i++ {
_, err := rand.Read(rb)
if err != nil {
t.Fatal("Cannot create random znode name")
}
hex.Encode(hb, rb)
const (
nodesToCreate = 100
// By creating many nodes with long names, the response from the Children call should be significantly longer
// than the buffer size, forcing recvLoop to allocate a bigger buffer
nameLength = 2 * bufferSize / nodesToCreate
)

format := fmt.Sprintf("%%0%dd", nameLength)
if name := fmt.Sprintf(format, 0); len(name) != nameLength {
// Sanity check that the generated format string creates strings of the right length
t.Fatalf("Length of generated name was not %d, got %d", nameLength, len(name))
}

expect := string(append(prefix, hb...))
if path, err := zk.Create(expect, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
var createdNodes []string
for i := 0; i < nodesToCreate; i++ {
name := fmt.Sprintf(format, i)
createdNodes = append(createdNodes, name)
path := testNode + "/" + name
if _, err := zk.Create(path, nil, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != expect {
t.Fatalf("Create returned different path '%s' != '%s'", path, expect)
}
defer deleteNode(string(expect))
defer deleteNode(path)
}

children, _, err := zk.Children("/gozk-test-big")
children, _, err := zk.Children(testNode)
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if len(children) != 10000 {
t.Fatal("Children returned wrong number of nodes")
}
sort.Strings(children)
if !reflect.DeepEqual(children, createdNodes) {
t.Fatal("Children did not return expected nodes")
}
}

Expand Down Expand Up @@ -765,10 +760,16 @@ func TestSetWatchers(t *testing.T) {
}
}()

// we create lots of paths to watch, to make sure a "set watches" request
// on re-create will be too big and be required to span multiple packets
for i := 0; i < 1000; i++ {
testPath, err := zk.Create(fmt.Sprintf("/gozk-test-%d", i), []byte{}, 0, WorldACL(PermAll))
// we create lots of long paths to watch, to make sure a "set watches" request on will be too big and be broken
// into multiple packets. The size is chosen such that each packet can hold exactly 2 watches, meaning we should
// see half as many packets as there are watches.
const (
watches = 50
watchedNodeNameFormat = "/gozk-test-%0450d"
)

for i := 0; i < watches; i++ {
testPath, err := zk.Create(fmt.Sprintf(watchedNodeNameFormat, i), []byte{}, 0, WorldACL(PermAll))
if err != nil {
t.Fatalf("Create returned: %+v", err)
}
Expand Down Expand Up @@ -852,9 +853,9 @@ func TestSetWatchers(t *testing.T) {
buf := make([]byte, bufferSize)
totalWatches := 0
actualReqs := setWatchReqs.Load().([]*setWatchesRequest)
if len(actualReqs) < 12 {
// sanity check: we should have generated *at least* 12 requests to reset watches
t.Fatalf("too few setWatchesRequest messages: %d", len(actualReqs))
if len(actualReqs) != watches/2 {
// sanity check: we should have generated exactly 25 requests to reset watches
t.Fatalf("Did not send exactly %d setWatches requests, got %d instead", watches/2, len(actualReqs))
}
for _, r := range actualReqs {
totalWatches += len(r.ChildWatches) + len(r.DataWatches) + len(r.ExistWatches)
Expand Down