Commit d3d9c5b

fix(core): fix deadlock in runMutation and txn.Update() (#9085)
We call txn.Update() once the transaction has finished. This moves all the uncommitted posting lists into a delta map in the txn. That can deadlock with runMutation; we have mostly seen it when a transaction times out: runMutation keeps going while txn.Update() gets triggered. This PR fixes it by not calling txn.Update() when the transaction has failed. We will look further into cancelling runMutation at that point as well.

Fixes: https://linear.app/hypermode/issue/DGR-477/dgraph-v24-alpha-2-hangs

---------

Co-authored-by: ShivajiKharse <115525374+shivaji-dgraph@users.noreply.github.com>
Co-authored-by: shivaji-dgraph <shivaji@dgraph.io>
1 parent c437860 commit d3d9c5b
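
To make the shape of the fix concrete, here is a minimal, self-contained sketch of the pattern this commit applies: wait for every worker goroutine, aggregate their errors, and skip the txn.Update()-style finalization step when the operation failed. This is illustrative only, not Dgraph code; runWorkers, work, and finalize are made-up names, and it uses errors.Join from the standard library (Go 1.20+) whereas the actual patch uses github.com/pkg/errors.

package main

import (
	"errors"
	"fmt"
)

// runWorkers fans work out to n goroutines, waits for all of them (instead of
// returning on the first error), and only runs finalize when nothing failed.
// finalize stands in for txn.Update(): it must not run while workers may still
// be touching the transaction's shared state.
func runWorkers(n int, work func(i int) error, finalize func()) error {
	errCh := make(chan error, n)
	for i := 0; i < n; i++ {
		go func(i int) { errCh <- work(i) }(i)
	}

	var errs error
	for i := 0; i < n; i++ {
		if err := <-errCh; err != nil {
			errs = errors.Join(errs, err) // collect every failure
		}
	}

	if errs == nil {
		finalize() // analogous to calling txn.Update() only on success
	}
	return errs
}

func main() {
	err := runWorkers(4,
		func(i int) error {
			if i == 2 {
				return fmt.Errorf("worker %d: context deadline exceeded", i)
			}
			return nil
		},
		func() { fmt.Println("finalized") },
	)
	fmt.Println("result:", err)
}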

File tree (6 files changed, +121 -10 lines)

  posting/mvcc.go
  query/vector/vector_test.go
  systest/vector/backup_test.go
  tok/hnsw/helper.go
  worker/draft.go
  worker/mutation.go

posting/mvcc.go (+7 -2)

@@ -235,7 +235,7 @@ func (txn *Txn) addConflictKey(conflictKey uint64) {
 }

 // FillContext updates the given transaction context with data from this transaction.
-func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) {
+func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32, isErrored bool) {
 	txn.Lock()
 	ctx.StartTs = txn.StartTs

@@ -249,7 +249,12 @@ func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) {
 	ctx.Keys = x.Unique(ctx.Keys)

 	txn.Unlock()
-	txn.Update()
+	// If the transaction has errored out, we don't need to update it, as these values will never be read.
+	// Sometimes, the transaction might have failed due to a timeout. If we let this transaction update,
+	// there could be a deadlock with the running transaction.
+	if !isErrored {
+		txn.Update()
+	}
 	txn.cache.fillPreds(ctx, gid)
 }

query/vector/vector_test.go (+43)

@@ -29,6 +29,7 @@ import (

 	"github.com/dgraph-io/dgo/v230/protos/api"
 	"github.com/dgraph-io/dgraph/dgraphtest"
+	"github.com/dgraph-io/dgraph/x"
 	"github.com/stretchr/testify/require"
 )

@@ -432,6 +433,48 @@ func TestVectorsMutateFixedLengthWithDiffrentIndexes(t *testing.T) {
 	dropPredicate("vtest")
 }

+func TestVectorDeadlockwithTimeout(t *testing.T) {
+	pred := "vtest1"
+	dc = dgraphtest.NewComposeCluster()
+	var cleanup func()
+	client, cleanup, err := dc.Client()
+	x.Panic(err)
+	defer cleanup()
+
+	for i := 0; i < 5; i++ {
+		fmt.Println("Testing iteration: ", i)
+		ctx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel2()
+		err = client.LoginIntoNamespace(ctx, dgraphtest.DefaultUser,
+			dgraphtest.DefaultPassword, x.GalaxyNamespace)
+		require.NoError(t, err)
+
+		err = client.Alter(context.Background(), &api.Operation{
+			DropAttr: pred,
+		})
+		dropPredicate(pred)
+		setSchema(fmt.Sprintf(vectorSchemaWithIndex, pred, "4", "euclidian"))
+		numVectors := 1000
+		vectorSize := 10
+
+		randomVectors, _ := generateRandomVectors(numVectors, vectorSize, pred)
+
+		txn := client.NewTxn()
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer func() { _ = txn.Discard(ctx) }()
+		defer cancel()
+
+		_, err = txn.Mutate(ctx, &api.Mutation{
+			SetNquads: []byte(randomVectors),
+			CommitNow: true,
+		})
+		require.Error(t, err)
+
+		err = txn.Commit(ctx)
+		require.Contains(t, err.Error(), "Transaction has already been committed or discarded")
+	}
+}
+
 func TestVectorMutateDiffrentLengthWithDiffrentIndexes(t *testing.T) {
 	dropPredicate("vtest")

systest/vector/backup_test.go (+51)

@@ -235,3 +235,54 @@ func TestVectorBackupRestoreDropIndex(t *testing.T) {
 		}
 	}
 }
+
+func TestVectorBackupRestoreReIndexing(t *testing.T) {
+	conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour)
+	c, err := dgraphtest.NewLocalCluster(conf)
+	require.NoError(t, err)
+	defer func() { c.Cleanup(t.Failed()) }()
+	require.NoError(t, c.Start())
+
+	gc, cleanup, err := c.Client()
+	require.NoError(t, err)
+	defer cleanup()
+	require.NoError(t, gc.LoginIntoNamespace(context.Background(),
+		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace))
+
+	hc, err := c.HTTPClient()
+	require.NoError(t, err)
+	require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser,
+		dgraphtest.DefaultPassword, x.GalaxyNamespace))
+
+	require.NoError(t, gc.SetupSchema(testSchema))
+
+	numVectors := 1000
+	pred := "project_discription_v"
+	rdfs, vectors := dgraphtest.GenerateRandomVectors(0, numVectors, 10, pred)
+
+	mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
+	_, err = gc.Mutate(mu)
+	require.NoError(t, err)
+
+	t.Log("taking backup \n")
+	require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))
+
+	rdfs2, vectors2 := dgraphtest.GenerateRandomVectors(numVectors, numVectors+300, 10, pred)
+
+	mu = &api.Mutation{SetNquads: []byte(rdfs2), CommitNow: true}
+	_, err = gc.Mutate(mu)
+	require.NoError(t, err)
+	t.Log("restoring backup \n")
+	require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", 2, 1))
+	require.NoError(t, dgraphtest.WaitForRestore(c))
+
+	for i := 0; i < 5; i++ {
+		// drop index
+		require.NoError(t, gc.SetupSchema(testSchemaWithoutIndex))
+		// add index
+		require.NoError(t, gc.SetupSchema(testSchema))
+	}
+	vectors = append(vectors, vectors2...)
+	rdfs = rdfs + rdfs2
+	testVectorQuery(t, gc, vectors, rdfs, pred, numVectors)
+}

tok/hnsw/helper.go (+3 -2)

@@ -433,9 +433,10 @@ func (ph *persistentHNSW[T]) createEntryAndStartNodes(
 	err := ph.getVecFromUid(entry, c, vec)
 	if err != nil || len(*vec) == 0 {
 		// The entry vector has been deleted. We have to create a new entry vector.
-		entry, err := ph.PickStartNode(ctx, c, vec)
+		entry, err := ph.calculateNewEntryVec(ctx, c, vec)
 		if err != nil {
-			return 0, []*index.KeyValue{}, err
+			// No other node exists, go with the new node that has come
+			return create_edges(inUuid)
 		}
 		return create_edges(entry)
 	}

worker/draft.go (+13 -3)

@@ -549,12 +549,19 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
 			errCh <- process(m.Edges[start:end])
 		}(start, end)
 	}
+	// Earlier we returned as soon as one goroutine reported an error. We should wait for
+	// all of them to finish, because we call txn.Update() when this function exits, and
+	// returning early could cause a deadlock with runMutation.
+	var errs error
 	for i := 0; i < numGo; i++ {
 		if err := <-errCh; err != nil {
-			return err
+			if errs == nil {
+				errs = errors.New("Got error while running mutation")
+			}
+			errs = errors.Wrapf(err, errs.Error())
 		}
 	}
-	return nil
+	return errs
 }

 func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
@@ -839,7 +846,10 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
 		if txn == nil {
 			return
 		}
-		txn.Update()
+		// If the transaction has failed, we don't need to update it.
+		if commit != 0 {
+			txn.Update()
+		}
 		// We start with 20 ms, so that we end up waiting 5 mins by the end.
 		// If there is any transient issue, it should get fixed within that timeframe.
 		err := x.ExponentialRetry(int(x.Config.MaxRetries),
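
A side note on the error-aggregation style in applyMutations above: wrapping each new error with the accumulated message via github.com/pkg/errors keeps the most recent error as the cause while folding the earlier messages into its text. A small standalone sketch of that behavior (illustrative only; the error strings are made up):

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	// Pretend these came off the errCh in applyMutations.
	results := []error{
		nil,
		errors.New("predicate p1: conflict"),
		errors.New("predicate p2: context deadline exceeded"),
	}

	// Accumulate every failure instead of returning on the first one.
	var errs error
	for _, err := range results {
		if err == nil {
			continue
		}
		if errs == nil {
			errs = errors.New("Got error while running mutation")
		}
		errs = errors.Wrapf(err, errs.Error())
	}

	fmt.Println(errs)
	// Got error while running mutation: predicate p1: conflict: predicate p2: context deadline exceeded
}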

worker/mutation.go (+4 -3)

@@ -642,9 +642,9 @@ func Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
 	return c.Timestamps(ctx, num)
 }

-func fillTxnContext(tctx *api.TxnContext, startTs uint64) {
+func fillTxnContext(tctx *api.TxnContext, startTs uint64, isErrored bool) {
 	if txn := posting.Oracle().GetTxn(startTs); txn != nil {
-		txn.FillContext(tctx, groups().groupId())
+		txn.FillContext(tctx, groups().groupId(), isErrored)
 	}
 	// We do not need to fill linread mechanism anymore, because transaction
 	// start ts is sufficient to wait for, to achieve lin reads.
@@ -950,7 +950,8 @@ func (w *grpcWorker) proposeAndWait(ctx context.Context, txnCtx *api.TxnContext,

 	node := groups().Node
 	err := node.proposeAndWait(ctx, &pb.Proposal{Mutations: m})
-	fillTxnContext(txnCtx, m.StartTs)
+	// When filling the txn context, we don't need to update the latest delta if the transaction has failed.
+	fillTxnContext(txnCtx, m.StartTs, err != nil)
 	return err
 }
