From 91975b3167108cae10aa32b9d0f68cf75f41f8e5 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 14 May 2024 11:45:30 +0530 Subject: [PATCH 1/7] Fix deadlock in runMutation and error handling --- worker/draft.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/worker/draft.go b/worker/draft.go index 23b92099a57..da0d278aee3 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -549,12 +549,16 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr errCh <- process(m.Edges[start:end]) }(start, end) } + var errs error for i := 0; i < numGo; i++ { if err := <-errCh; err != nil { - return err + if errs == nil { + errs = errors.New("Got error while running mutation") + } + errs = errors.Wrapf(err, errs.Error()) } } - return nil + return errs } func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error { From f68bdac3f0fda82802754515f7c2e3ef88356fa6 Mon Sep 17 00:00:00 2001 From: ShivajiKharse <115525374+shivaji-dgraph@users.noreply.github.com> Date: Tue, 14 May 2024 16:15:08 +0530 Subject: [PATCH 2/7] fix pending query (#9086) --- tok/hnsw/helper.go | 5 +++-- worker/draft.go | 4 +++- worker/mutation.go | 4 +++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/tok/hnsw/helper.go b/tok/hnsw/helper.go index f42431a9143..033ef40a99d 100644 --- a/tok/hnsw/helper.go +++ b/tok/hnsw/helper.go @@ -433,9 +433,10 @@ func (ph *persistentHNSW[T]) createEntryAndStartNodes( err := ph.getVecFromUid(entry, c, vec) if err != nil || len(*vec) == 0 { // The entry vector has been deleted. We have to create a new entry vector. 
- entry, err := ph.PickStartNode(ctx, c, vec) + entry, err := ph.calculateNewEntryVec(ctx, c, vec) if err != nil { - return 0, []*index.KeyValue{}, err + // No other node exists, go with the new node that has come + return create_edges(inUuid) } return create_edges(entry) } diff --git a/worker/draft.go b/worker/draft.go index da0d278aee3..a3027024dec 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -843,7 +843,9 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { if txn == nil { return } - txn.Update() + if commit != 0 { + txn.Update() + } // We start with 20 ms, so that we end up waiting 5 mins by the end. // If there is any transient issue, it should get fixed within that timeframe. err := x.ExponentialRetry(int(x.Config.MaxRetries), diff --git a/worker/mutation.go b/worker/mutation.go index abae1515dde..7ebd3cb0deb 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -950,7 +950,9 @@ func (w *grpcWorker) proposeAndWait(ctx context.Context, txnCtx *api.TxnContext, node := groups().Node err := node.proposeAndWait(ctx, &pb.Proposal{Mutations: m}) - fillTxnContext(txnCtx, m.StartTs) + if err == nil { + fillTxnContext(txnCtx, m.StartTs) + } return err } From 00f418341010291fa30d9f7d883ad5b7cfc9a387 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 15 May 2024 10:42:16 +0530 Subject: [PATCH 3/7] added a test for timeput --- query/vector/vector_test.go | 41 +++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/query/vector/vector_test.go b/query/vector/vector_test.go index 7ad9dab6091..37d49f40f83 100644 --- a/query/vector/vector_test.go +++ b/query/vector/vector_test.go @@ -29,6 +29,7 @@ import ( "github.com/dgraph-io/dgo/v230/protos/api" "github.com/dgraph-io/dgraph/dgraphtest" + "github.com/dgraph-io/dgraph/x" "github.com/stretchr/testify/require" ) @@ -432,6 +433,46 @@ func TestVectorsMutateFixedLengthWithDiffrentIndexes(t *testing.T) { dropPredicate("vtest") } +func 
TestVectorDeadlockwithTimeout(t *testing.T) { + dc = dgraphtest.NewComposeCluster() + var cleanup func() + client, cleanup, err := dc.Client() + x.Panic(err) + defer cleanup() + + for i := 0; i < 5; i++ { + ctx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel2() + err = client.LoginIntoNamespace(ctx, dgraphtest.DefaultUser, + dgraphtest.DefaultPassword, x.GalaxyNamespace) + require.NoError(t, err) + + err = client.Alter(context.Background(), &api.Operation{ + DropAttr: "vtest", + }) + dropPredicate("vtest") + setSchema(fmt.Sprintf(vectorSchemaWithIndex, "vtest", "4", "euclidian")) + numVectors := 1000 + vectorSize := 10 + + randomVectors, _ := generateRandomVectors(numVectors, vectorSize, "vtest") + + txn := client.NewTxn() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer func() { _ = txn.Discard(ctx) }() + defer cancel() + + _, err = txn.Mutate(ctx, &api.Mutation{ + SetNquads: []byte(randomVectors), + CommitNow: true, + }) + require.Error(t, err) + + err = txn.Commit(ctx) + require.Contains(t, err.Error(), "Transaction has already been committed or discarded") + } +} + func TestVectorMutateDiffrentLengthWithDiffrentIndexes(t *testing.T) { dropPredicate("vtest") From 7b27a87d6929adcf3a792dafb0bd554a3e1eb5cf Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 15 May 2024 11:00:09 +0530 Subject: [PATCH 4/7] Added comments --- posting/mvcc.go | 9 +++++++-- worker/draft.go | 4 ++++ worker/mutation.go | 9 ++++----- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/posting/mvcc.go b/posting/mvcc.go index d228ad10f0d..c052422ff6e 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -235,7 +235,7 @@ func (txn *Txn) addConflictKey(conflictKey uint64) { } // FillContext updates the given transaction context with data from this transaction. 
-func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) { +func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32, isErrored bool) { txn.Lock() ctx.StartTs = txn.StartTs @@ -249,7 +249,12 @@ func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) { ctx.Keys = x.Unique(ctx.Keys) txn.Unlock() - txn.Update() + // If the transaction has errored out, we don't need to update it, as these values will never be read. + // Sometimes, the transaction might have failed due to timeout. If we let this transaction update, there + // could be a deadlock with the running transaction. + if !isErrored { + txn.Update() + } txn.cache.fillPreds(ctx, gid) } diff --git a/worker/draft.go b/worker/draft.go index a3027024dec..dd4230d6031 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -549,6 +549,9 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr errCh <- process(m.Edges[start:end]) }(start, end) } + // Earlier we were returning as soon as even one thread had an error. We should wait for + // all the transactions to finish. We call txn.Update() when this function exits. This could cause + // a deadlock with runMutation. var errs error for i := 0; i < numGo; i++ { if err := <-errCh; err != nil { @@ -843,6 +846,7 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { if txn == nil { return } + // If the transaction has failed, we don't need to update it. 
if commit != 0 { txn.Update() } diff --git a/worker/mutation.go b/worker/mutation.go index 7ebd3cb0deb..71b3db0cbc0 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -642,9 +642,9 @@ func Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) { return c.Timestamps(ctx, num) } -func fillTxnContext(tctx *api.TxnContext, startTs uint64) { +func fillTxnContext(tctx *api.TxnContext, startTs uint64, isErrored bool) { if txn := posting.Oracle().GetTxn(startTs); txn != nil { - txn.FillContext(tctx, groups().groupId()) + txn.FillContext(tctx, groups().groupId(), isErrored) } // We do not need to fill linread mechanism anymore, because transaction // start ts is sufficient to wait for, to achieve lin reads. @@ -950,9 +950,8 @@ func (w *grpcWorker) proposeAndWait(ctx context.Context, txnCtx *api.TxnContext, node := groups().Node err := node.proposeAndWait(ctx, &pb.Proposal{Mutations: m}) - if err == nil { - fillTxnContext(txnCtx, m.StartTs) - } + // When we are filling txn context, we don't need to update latest delta if the transaction has failed. 
+ fillTxnContext(txnCtx, m.StartTs, err != nil) return err } From cf87a81dc221cacd6b43a1471875aae791f36071 Mon Sep 17 00:00:00 2001 From: shivaji-dgraph Date: Wed, 15 May 2024 12:01:29 +0530 Subject: [PATCH 5/7] add test for reindexing after incr restore --- systest/vector/backup_test.go | 51 +++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/systest/vector/backup_test.go b/systest/vector/backup_test.go index f11eb8b75ff..f1706e44b0b 100644 --- a/systest/vector/backup_test.go +++ b/systest/vector/backup_test.go @@ -235,3 +235,54 @@ func TestVectorBackupRestoreDropIndex(t *testing.T) { } } } + +func TestVectorBackupRestoreReIndexing(t *testing.T) { + conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour) + c, err := dgraphtest.NewLocalCluster(conf) + require.NoError(t, err) + defer func() { c.Cleanup(t.Failed()) }() + require.NoError(t, c.Start()) + + gc, cleanup, err := c.Client() + require.NoError(t, err) + defer cleanup() + require.NoError(t, gc.LoginIntoNamespace(context.Background(), + dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace)) + + hc, err := c.HTTPClient() + require.NoError(t, err) + require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser, + dgraphtest.DefaultPassword, x.GalaxyNamespace)) + + require.NoError(t, gc.SetupSchema(testSchema)) + + numVectors := 1000 + pred := "project_discription_v" + rdfs, vectors := dgraphtest.GenerateRandomVectors(0, numVectors, 10, pred) + + mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true} + _, err = gc.Mutate(mu) + require.NoError(t, err) + + t.Log("taking backup \n") + require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir)) + + rdfs2, vectors2 := dgraphtest.GenerateRandomVectors(numVectors, numVectors+300, 10, pred) + + mu = &api.Mutation{SetNquads: []byte(rdfs2), CommitNow: true} + _, err = gc.Mutate(mu) + require.NoError(t, err) + t.Log("restoring backup \n") + require.NoError(t, 
hc.Restore(c, dgraphtest.DefaultBackupDir, "", 2, 1)) + require.NoError(t, dgraphtest.WaitForRestore(c)) + + for i := 0; i < 5; i++ { + // drop index + require.NoError(t, gc.SetupSchema(testSchemaWithoutIndex)) + // add index + require.NoError(t, gc.SetupSchema(testSchema)) + } + vectors = append(vectors, vectors2...) + rdfs = rdfs + rdfs2 + testVectorQuery(t, gc, vectors, rdfs, pred, numVectors) +} From 75fd608e49291057c4d00f17b85a74bad971d93c Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 15 May 2024 12:28:43 +0530 Subject: [PATCH 6/7] fixed test --- query/vector/vector_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/query/vector/vector_test.go b/query/vector/vector_test.go index 37d49f40f83..d5caa998010 100644 --- a/query/vector/vector_test.go +++ b/query/vector/vector_test.go @@ -441,6 +441,7 @@ func TestVectorDeadlockwithTimeout(t *testing.T) { defer cleanup() for i := 0; i < 5; i++ { + fmt.Println("Testing iteration: ", i) ctx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) defer cancel2() err = client.LoginIntoNamespace(ctx, dgraphtest.DefaultUser, @@ -458,7 +459,7 @@ func TestVectorDeadlockwithTimeout(t *testing.T) { randomVectors, _ := generateRandomVectors(numVectors, vectorSize, "vtest") txn := client.NewTxn() - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer func() { _ = txn.Discard(ctx) }() defer cancel() From ce8d36c39e4eb1f266451e6c6abb564666286a4a Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 15 May 2024 13:30:19 +0530 Subject: [PATCH 7/7] fixed test --- query/vector/vector_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/query/vector/vector_test.go b/query/vector/vector_test.go index d5caa998010..2bd4554314e 100644 --- a/query/vector/vector_test.go +++ b/query/vector/vector_test.go @@ -434,6 +434,7 @@ func 
TestVectorsMutateFixedLengthWithDiffrentIndexes(t *testing.T) { } func TestVectorDeadlockwithTimeout(t *testing.T) { + pred := "vtest1" dc = dgraphtest.NewComposeCluster() var cleanup func() client, cleanup, err := dc.Client() @@ -449,14 +450,14 @@ func TestVectorDeadlockwithTimeout(t *testing.T) { require.NoError(t, err) err = client.Alter(context.Background(), &api.Operation{ - DropAttr: "vtest", + DropAttr: pred, }) - dropPredicate("vtest") - setSchema(fmt.Sprintf(vectorSchemaWithIndex, "vtest", "4", "euclidian")) + dropPredicate(pred) + setSchema(fmt.Sprintf(vectorSchemaWithIndex, pred, "4", "euclidian")) numVectors := 1000 vectorSize := 10 - randomVectors, _ := generateRandomVectors(numVectors, vectorSize, "vtest") + randomVectors, _ := generateRandomVectors(numVectors, vectorSize, pred) txn := client.NewTxn() ctx, cancel := context.WithTimeout(context.Background(), time.Second)