Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(live): Fix derigster while retrying #9121

Merged
merged 11 commits into from
Aug 9, 2024
Merged
18 changes: 17 additions & 1 deletion dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ func (l *loader) infinitelyRetry(req *request) {
if i >= 10*time.Second {
i = 10 * time.Second
}
l.deregister(req)
time.Sleep(i)
l.addConflictKeys(req)
}
}

Expand Down Expand Up @@ -306,6 +308,11 @@ func (l *loader) conflictKeysForNQuad(nq *api.NQuad) ([]uint64, error) {
return keys, nil
}

val := sid
if pred.Upsert {
val = 0
}

errs := make([]string, 0)
for _, tokName := range pred.Tokenizer {
token, ok := tok.GetTokenizer(tokName)
Expand All @@ -329,7 +336,7 @@ func (l *loader) conflictKeysForNQuad(nq *api.NQuad) ([]uint64, error) {
}

for _, t := range toks {
keys = append(keys, farm.Fingerprint64(x.IndexKey(attr, t))^sid)
keys = append(keys, farm.Fingerprint64(x.IndexKey(attr, t))^val)
}

}
Expand All @@ -354,6 +361,15 @@ func (l *loader) conflictKeysForReq(req *request) []uint64 {
return keys
}

//lint:ignore U1000 Ignore unused function temporarily for debugging
func (l *loader) print(req *request) {
m := make(map[string]struct{})
for _, i := range req.Set {
m[i.Predicate] = struct{}{}
}
fmt.Println(m)
}

func (l *loader) addConflictKeys(req *request) bool {
l.uidsLock.Lock()
defer l.uidsLock.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions graphql/e2e/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ import (
)

var (
groupOneHTTP = testutil.ContainerAddr0("alpha1", 8080)
groupTwoHTTP = testutil.ContainerAddr0("alpha2", 8080)
groupThreeHTTP = testutil.ContainerAddr0("alpha3", 8080)
groupOneHTTP = testutil.ContainerAddr("alpha1", 8080)
groupTwoHTTP = testutil.ContainerAddr("alpha2", 8080)
groupThreeHTTP = testutil.ContainerAddr("alpha3", 8080)
groupOnegRPC = testutil.SockAddr

groupOneGraphQLServer = "http://" + groupOneHTTP + "/graphql"
Expand Down
10 changes: 5 additions & 5 deletions systest/loader/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestLoaderXidmap(t *testing.T) {
// internal-port
true))

dg, err := testutil.DgraphClientWithCerts(testutil.SockAddr, conf)
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddrLocalhost, conf)
require.NoError(t, err)
ctx := context.Background()
testutil.DropAll(t, dg)
Expand Down Expand Up @@ -71,8 +71,8 @@ func TestLoaderXidmap(t *testing.T) {
err = testutil.ExecWithOpts([]string{testutil.DgraphBinaryPath(), "live",
"--tls", tlsFlag,
"--files", data,
"--alpha", testutil.SockAddr,
"--zero", testutil.SockAddrZero,
"--alpha", testutil.SockAddrLocalhost,
"--zero", testutil.SockAddrZeroLocalhost,
"-x", "x"}, testutil.CmdOpts{Dir: tmpDir})
require.NoError(t, err)

Expand All @@ -82,8 +82,8 @@ func TestLoaderXidmap(t *testing.T) {
err = testutil.ExecWithOpts([]string{testutil.DgraphBinaryPath(), "live",
"--tls", tlsFlag,
"--files", data,
"--alpha", testutil.SockAddr,
"--zero", testutil.SockAddrZero,
"--alpha", testutil.SockAddrLocalhost,
"--zero", testutil.SockAddrZeroLocalhost,
"-x", "x"}, testutil.CmdOpts{Dir: tmpDir})
require.NoError(t, err)

Expand Down
11 changes: 9 additions & 2 deletions testutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,18 @@ var (
TestDataDirectory string
Instance string
MinioInstance string
// SockAddr is the address to the gRPC endpoint of the alpha used during tests with localhost
SockAddrLocalhost string
// SockAddr is the address to the gRPC endpoint of the alpha used during tests.
SockAddr string
// SockAddrHttp is the address to the HTTP of alpha used during tests.
SockAddrHttp string
SockAddrHttp string
SockAddrHttpLocalhost string
// SockAddrZero is the address to the gRPC endpoint of the zero used during tests.
SockAddrZero string
// SockAddrZeroHttp is the address to the HTTP endpoint of the zero used during tests.
SockAddrZeroHttp string
SockAddrZeroHttp string
SockAddrZeroLocalhost string

// SockAddrAlpha4 is the address to the gRPC endpoint of the alpha4 used during restore tests.
SockAddrAlpha4 string
Expand Down Expand Up @@ -104,10 +108,13 @@ func init() {
TestDataDirectory = os.Getenv("TEST_DATA_DIRECTORY")
MinioInstance = ContainerAddr("minio", 9001)
Instance = fmt.Sprintf("%s_%s_1", DockerPrefix, "alpha1")
SockAddrLocalhost = ContainerAddrLocalhost("alpha1", 9080)
SockAddr = ContainerAddr("alpha1", 9080)
SockAddrHttp = ContainerAddr("alpha1", 8080)
SockAddrHttpLocalhost = ContainerAddrLocalhost("alpha1", 8080)

SockAddrZero = ContainerAddr("zero1", 5080)
SockAddrZeroLocalhost = ContainerAddrLocalhost("zero1", 5080)
SockAddrZeroHttp = ContainerAddr("zero1", 6080)

SockAddrAlpha4 = ContainerAddr("alpha4", 9080)
Expand Down
56 changes: 35 additions & 21 deletions testutil/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,40 @@ func (in ContainerInstance) BestEffortWaitForHealthy(privatePort uint16) error {
return nil
}

for i := 0; i < 60; i++ {
resp, err := http.Get("http://localhost:" + port + "/health")
var body []byte
if resp != nil && resp.Body != nil {
body, _ = io.ReadAll(resp.Body)
_ = resp.Body.Close()
}
if err == nil && resp.StatusCode == http.StatusOK {
if aerr := checkACL(body); aerr == nil {
return nil
} else {
fmt.Printf("waiting for login to work: %v\n", aerr)
time.Sleep(time.Second)
continue
tryWith := func(host string) error {
for i := 0; i < 60; i++ {
resp, err := http.Get("http://" + host + ":" + port + "/health")
var body []byte
if resp != nil && resp.Body != nil {
body, _ = io.ReadAll(resp.Body)
_ = resp.Body.Close()
}
if err == nil && resp.StatusCode == http.StatusOK {
if aerr := checkACL(body); aerr == nil {
return nil
} else {
fmt.Printf("waiting for login to work: %v\n", aerr)
time.Sleep(time.Second)
continue
}
}
fmt.Printf("Health for %s failed: %v. Response: %q. Retrying...\n", in, err, body)
time.Sleep(time.Second)
}
fmt.Printf("Health for %s failed: %v. Response: %q. Retrying...\n", in, err, body)
time.Sleep(time.Second)
return fmt.Errorf("did not pass health check on %s", "http://"+host+":"+port+"/health\n")
}
return fmt.Errorf("did not pass health check on %s", "http://localhost:"+port+"/health\n")

err := tryWith("0.0.0.0")
if err == nil {
return nil
}

err = tryWith("localhost")
if err == nil {
return nil
}

return err
}

func (in ContainerInstance) publicPort(privatePort uint16) string {
Expand All @@ -120,7 +134,7 @@ func (in ContainerInstance) login() error {
}

_, err := HttpLogin(&LoginParams{
Endpoint: "http://localhost:" + addr + "/admin",
Endpoint: "http://0.0.0.0:" + addr + "/admin",
UserID: "groot",
Passwd: "password",
})
Expand Down Expand Up @@ -218,12 +232,12 @@ func ContainerAddrWithHost(name string, privatePort uint16, host string) string
return host + ":" + strconv.Itoa(int(privatePort))
}

func ContainerAddr0(name string, privatePort uint16) string {
return ContainerAddrWithHost(name, privatePort, "0.0.0.0")
func ContainerAddrLocalhost(name string, privatePort uint16) string {
return ContainerAddrWithHost(name, privatePort, "localhost")
}

func ContainerAddr(name string, privatePort uint16) string {
return ContainerAddrWithHost(name, privatePort, "localhost")
return ContainerAddrWithHost(name, privatePort, "0.0.0.0")
}

// DockerStart starts the specified services.
Expand Down
4 changes: 2 additions & 2 deletions tlstest/certrequest/certrequest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestAccessWithCaCert(t *testing.T) {
func TestCurlAccessWithCaCert(t *testing.T) {
// curl over plaintext should fail
curlPlainTextArgs := []string{
"https://" + testutil.SockAddrHttp + "/alter",
"https://" + testutil.SockAddrHttpLocalhost + "/alter",
"-d", "name: string @index(exact) .",
}
testutil.VerifyCurlCmd(t, curlPlainTextArgs, &testutil.CurlFailureConfig{
Expand All @@ -59,7 +59,7 @@ func TestCurlAccessWithCaCert(t *testing.T) {
})

curlArgs := []string{
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttp + "/alter",
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttpLocalhost + "/alter",
"-d", "name: string @index(exact) .",
}
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{
Expand Down
10 changes: 5 additions & 5 deletions tlstest/certrequireandverify/certrequireandverify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestAccessWithoutClientCert(t *testing.T) {
// server-name
"node"))

dg, err := testutil.DgraphClientWithCerts(testutil.SockAddr, conf)
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddrLocalhost, conf)
require.NoError(t, err, "Unable to get dgraph client: %v", err)
require.Error(t, dg.Alter(context.Background(), &api.Operation{DropAll: true}))
}
Expand All @@ -53,7 +53,7 @@ func TestAccessWithClientCert(t *testing.T) {

func TestCurlAccessWithoutClientCert(t *testing.T) {
curlArgs := []string{
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttp + "/alter",
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttpLocalhost + "/alter",
"-d", "name: string @index(exact) .",
}
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{
Expand All @@ -67,7 +67,7 @@ func TestCurlAccessWithClientCert(t *testing.T) {
"--cacert", "../tls/ca.crt",
"--cert", "../tls/client.acl.crt",
"--key", "../tls/client.acl.key",
"https://" + testutil.SockAddrHttp + "/alter",
"https://" + testutil.SockAddrHttpLocalhost + "/alter",
"-d", "name: string @index(exact) .",
}
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestGQLAdminHealthWithClientCert(t *testing.T) {
}

healthCheckQuery := []byte(`{"query":"query {\n health {\n status\n }\n}"}`)
gqlAdminEndpoint := "https://" + testutil.SockAddrHttp + "/admin"
gqlAdminEndpoint := "https://" + testutil.SockAddrHttpLocalhost + "/admin"
req, err := http.NewRequest("POST", gqlAdminEndpoint, bytes.NewBuffer(healthCheckQuery))
require.NoError(t, err, "Failed to create request : %v", err)
req.Header.Set("Content-Type", "application/json")
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestGQLAdminHealthWithoutClientCert(t *testing.T) {
}

healthCheckQuery := []byte(`{"query":"query {\n health {\n message\n status\n }\n}"}`)
gqlAdminEndpoint := "https://" + testutil.SockAddrHttp + "/admin"
gqlAdminEndpoint := "https://" + testutil.SockAddrHttpLocalhost + "/admin"
req, err := http.NewRequest("POST", gqlAdminEndpoint, bytes.NewBuffer(healthCheckQuery))
require.NoError(t, err, "Failed to create request : %v", err)
req.Header.Set("Content-Type", "application/json")
Expand Down
8 changes: 4 additions & 4 deletions tlstest/certverifyifgiven/certverifyifgiven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestAccessWithoutClientCert(t *testing.T) {
// server-name
"node"))

dg, err := testutil.DgraphClientWithCerts(testutil.SockAddr, conf)
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddrLocalhost, conf)
require.NoError(t, err, "Unable to get dgraph client: %v", err)
require.NoError(t, dg.Alter(context.Background(), &api.Operation{DropAll: true}))
}
Expand All @@ -39,14 +39,14 @@ func TestAccessWithClientCert(t *testing.T) {
// client-key
"../tls/client.acl.key"))

dg, err := testutil.DgraphClientWithCerts(testutil.SockAddr, conf)
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddrLocalhost, conf)
require.NoError(t, err, "Unable to get dgraph client: %v", err)
require.NoError(t, dg.Alter(context.Background(), &api.Operation{DropAll: true}))
}

func TestCurlAccessWithoutClientCert(t *testing.T) {
curlArgs := []string{
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttp + "/alter",
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttpLocalhost + "/alter",
"-d", "name: string @index(exact) .",
}
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{
Expand All @@ -59,7 +59,7 @@ func TestCurlAccessWithClientCert(t *testing.T) {
"--cacert", "../tls/ca.crt",
"--cert", "../tls/client.acl.crt",
"--key", "../tls/client.acl.key",
"https://" + testutil.SockAddrHttp + "/alter",
"https://" + testutil.SockAddrHttpLocalhost + "/alter",
"-d", "name: string @index(exact) .",
}
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{
Expand Down