From a568e78bb854d722b5fb30710a45b8d6baa0779c Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Thu, 3 Aug 2023 09:05:47 +0000 Subject: [PATCH 01/10] fix datarace with memqdb.go? --- qdb/memqdb.go | 49 ++++++++++++++++-------- qdb/memqdb_test.go | 94 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 16 deletions(-) create mode 100644 qdb/memqdb_test.go diff --git a/qdb/memqdb.go b/qdb/memqdb.go index 0642f8d73..1bd7f02ef 100644 --- a/qdb/memqdb.go +++ b/qdb/memqdb.go @@ -12,6 +12,7 @@ import ( ) type MemQDB struct { + // TODO create more mutex per map if needed mu sync.RWMutex Locks map[string]*sync.RWMutex `json:"locks"` @@ -39,6 +40,14 @@ func NewMemQDB(backupPath string) (*MemQDB, error) { Dataspaces: map[string]*Dataspace{}, Routers: map[string]*Router{}, Transactions: map[string]*DataTransferTransaction{}, + Freq: map[string]bool{}, + Krs: map[string]*KeyRange{}, + Locks: map[string]*sync.RWMutex{}, + Shards: map[string]*Shard{}, + Shrules: map[string]*ShardingRule{}, + Dataspaces: map[string]*Dataspace{}, + Routers: map[string]*Router{}, + Transactions: map[string]*DataTransferTransaction{}, backupPath: backupPath, }, nil @@ -154,6 +163,8 @@ func (q *MemQDB) DropShardingRuleAll(ctx context.Context) ([]*ShardingRule, erro func (q *MemQDB) GetShardingRule(ctx context.Context, id string) (*ShardingRule, error) { spqrlog.Zero.Debug().Str("rule", id).Msg("memqdb: get sharding rule") + q.mu.RLock() + defer q.mu.RUnlock() rule, ok := q.Shrules[id] if ok { return rule, nil @@ -200,8 +211,8 @@ func (q *MemQDB) AddKeyRange(ctx context.Context, keyRange *KeyRange) error { func (q *MemQDB) GetKeyRange(ctx context.Context, id string) (*KeyRange, error) { spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: get key range") - q.mu.Lock() - defer q.mu.Unlock() + q.mu.RLock() + defer q.mu.RUnlock() krs, ok := q.Krs[id] if !ok { @@ -305,8 +316,8 @@ func (q *MemQDB) UnlockKeyRange(_ context.Context, id string) error { func (q *MemQDB) CheckLockedKeyRange(ctx context.Context, id string) (*KeyRange, error) { spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: check locked key range") - q.mu.Lock() - defer q.mu.Unlock() + q.mu.RLock() + defer q.mu.RUnlock() krs, ok := q.Krs[id] if !ok { @@ -323,8 +334,16 @@ func (q *MemQDB) CheckLockedKeyRange(ctx context.Context, id string) (*KeyRange, func (q *MemQDB) ShareKeyRange(id string) error { spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: sharing key with key") - q.Locks[id].RLock() - defer q.Locks[id].RUnlock() + q.mu.RLock() + defer q.mu.RUnlock() + + lock, ok := q.Locks[id] + if !ok { + return fmt.Errorf("no such key") + } + + lock.RLock() + defer lock.RUnlock() return nil } @@ -336,13 +355,12 @@ func (q *MemQDB) ShareKeyRange(id string) error { func (q *MemQDB) RecordTransferTx(ctx context.Context, key string, info *DataTransferTransaction) error { q.mu.Lock() defer q.mu.Unlock() - return ExecuteCommands(q.DumpState, NewUpdateCommand(q.Transactions, key, info)) } func (q *MemQDB) GetTransferTx(ctx context.Context, key string) (*DataTransferTransaction, error) { - q.mu.Lock() - defer q.mu.Unlock() + q.mu.RLock() + defer q.mu.RUnlock() ans, ok := q.Transactions[key] if !ok { @@ -354,7 +372,6 @@ func (q *MemQDB) GetTransferTx(ctx context.Context, key string) (*DataTransferTr func (q *MemQDB) RemoveTransferTx(ctx context.Context, key string) error { q.mu.Lock() defer q.mu.Unlock() - return ExecuteCommands(q.DumpState, NewDeleteCommand(q.Transactions, key)) } @@ -380,8 +397,8 @@ func (q *MemQDB) DeleteRouter(ctx context.Context, id string) error { func (q *MemQDB) ListRouters(ctx context.Context) ([]*Router, error) { spqrlog.Zero.Debug().Msg("memqdb: list routers") - q.mu.Lock() - defer q.mu.Unlock() + q.mu.RLock() + defer q.mu.RUnlock() var ret []*Router for _, v := range q.Routers { @@ -414,8 +431,8 @@ func (q *MemQDB) AddShard(ctx context.Context, shard *Shard) error { func (q *MemQDB) ListShards(ctx context.Context) ([]*Shard, error) { spqrlog.Zero.Debug().Msg("memqdb: list shards") - q.mu.Lock() - defer q.mu.Unlock() + q.mu.RLock() + defer q.mu.RUnlock() var ret []*Shard for _, v := range q.Shards { @@ -432,8 +449,8 @@ func (q *MemQDB) ListShards(ctx context.Context) ([]*Shard, error) { func (q *MemQDB) GetShard(ctx context.Context, id string) (*Shard, error) { spqrlog.Zero.Debug().Str("shard", id).Msg("memqdb: get shard") - q.mu.Lock() - defer q.mu.Unlock() + q.mu.RLock() + defer q.mu.RUnlock() if _, ok := q.Shards[id]; ok { return &Shard{ID: id}, nil diff --git a/qdb/memqdb_test.go b/qdb/memqdb_test.go new file mode 100644 index 000000000..bb10fb830 --- /dev/null +++ b/qdb/memqdb_test.go @@ -0,0 +1,94 @@ +package qdb_test + +import ( + "context" + "sync" + "testing" + + "github.com/pg-sharding/spqr/qdb" + "github.com/stretchr/testify/assert" +) + +const MemQDBPath = "memqdb.json" + +var mockDataspace *qdb.Dataspace = &qdb.Dataspace{"123"} +var mockShard *qdb.Shard = &qdb.Shard{ + ID: "shard_id", + Hosts: []string{"host1", "host2"}, +} +var mockKeyRange *qdb.KeyRange = &qdb.KeyRange{ + LowerBound: []byte{1, 2}, + UpperBound: []byte{3, 4}, + ShardID: mockShard.ID, + KeyRangeID: "key_range_id", +} +var mockRouter *qdb.Router = &qdb.Router{ + Address: "address", + ID: "router_id", + State: qdb.CLOSED, +} +var mockShardingRule *qdb.ShardingRule = &qdb.ShardingRule{ + ID: "sharding_rule_id", + TableName: "fake_table", + Entries: []qdb.ShardingRuleEntry{ + { + Column: "i", + }, + }, +} +var mockDataTransferTransaction *qdb.DataTransferTransaction = &qdb.DataTransferTransaction{ + ToShardId: mockShard.ID, + FromShardId: mockShard.ID, + FromTxName: "fake_tx_1", + ToTxName: "fake_tx_2", + FromStatus: "fake_st_1", + ToStatus: "fake_st_2", +} + +// must run with -race +func TestMemqdbRacing(t *testing.T) { + assert := assert.New(t) + + memqdb, err := qdb.RestoreQDB(MemQDBPath) + assert.NoError(err) + + var wg sync.WaitGroup + + ctx := context.TODO() + + methods := []func(){ + func() { memqdb.AddDataspace(ctx, mockDataspace) }, + func() { memqdb.AddKeyRange(ctx, mockKeyRange) }, + func() { memqdb.AddRouter(ctx, mockRouter) }, + func() { memqdb.AddShard(ctx, mockShard) }, + func() { memqdb.AddShardingRule(ctx, mockShardingRule) }, + func() { + memqdb.RecordTransferTx(ctx, mockDataTransferTransaction.FromShardId, mockDataTransferTransaction) + }, + func() { memqdb.ListDataspaces(ctx) }, + func() { memqdb.ListKeyRanges(ctx) }, + func() { memqdb.ListRouters(ctx) }, + func() { memqdb.ListShardingRules(ctx) }, + func() { memqdb.ListShards(ctx) }, + func() { memqdb.GetKeyRange(ctx, mockKeyRange.KeyRangeID) }, + func() { memqdb.GetShard(ctx, mockShard.ID) }, + func() { memqdb.GetShardingRule(ctx, mockShardingRule.ID) }, + func() { memqdb.GetTransferTx(ctx, mockDataTransferTransaction.FromShardId) }, + func() { memqdb.ShareKeyRange(mockKeyRange.KeyRangeID) }, + func() { memqdb.DropKeyRange(ctx, mockKeyRange.KeyRangeID) }, + func() { memqdb.DropKeyRangeAll(ctx) }, + func() { memqdb.DropShardingRule(ctx, mockShardingRule.ID) }, + func() { memqdb.DropShardingRuleAll(ctx) }, + func() { memqdb.RemoveTransferTx(ctx, mockDataTransferTransaction.FromShardId) }, + } + + for _, m := range methods { + wg.Add(1) + go func(m func()) { + m() + wg.Done() + }(m) + } + + wg.Wait() +} From b61892219232fe3139d0e1b4809750ccda40a163 Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Thu, 3 Aug 2023 09:52:39 +0000 Subject: [PATCH 02/10] fix linter + ci --- .github/workflows/tests.yaml | 2 +- Makefile | 3 ++ qdb/memqdb_test.go | 92 ++++++++++++++++++++++++------------ 3 files changed, 65 insertions(+), 32 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 8a8a746c8..49713bddf 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -17,7 +17,7 @@ jobs: uses: actions/checkout@v2 - name: unit tests - run: make unittest + run: make unittest_with_race e2e: name: e2e diff --git a/Makefile b/Makefile index cd558e45c..f8221ff4d 100644 --- a/Makefile +++ b/Makefile @@ -67,6 +67,9 @@ pooler_run: ####################### TESTS ####################### +unittest_with_race: + go test -race ./cmd/... ./pkg/... ./router/... ./qdb/... + unittest: go test ./cmd/... ./pkg/... ./router/... ./qdb/... diff --git a/qdb/memqdb_test.go b/qdb/memqdb_test.go index bb10fb830..ec47ca926 100644 --- a/qdb/memqdb_test.go +++ b/qdb/memqdb_test.go @@ -53,42 +53,72 @@ func TestMemqdbRacing(t *testing.T) { assert.NoError(err) var wg sync.WaitGroup - ctx := context.TODO() - methods := []func(){ - func() { memqdb.AddDataspace(ctx, mockDataspace) }, - func() { memqdb.AddKeyRange(ctx, mockKeyRange) }, - func() { memqdb.AddRouter(ctx, mockRouter) }, - func() { memqdb.AddShard(ctx, mockShard) }, - func() { memqdb.AddShardingRule(ctx, mockShardingRule) }, - func() { - memqdb.RecordTransferTx(ctx, mockDataTransferTransaction.FromShardId, mockDataTransferTransaction) + methods := []func() error{ + func() error { return memqdb.AddDataspace(ctx, mockDataspace) }, + func() error { return memqdb.AddKeyRange(ctx, mockKeyRange) }, + func() error { return memqdb.AddRouter(ctx, mockRouter) }, + func() error { return memqdb.AddShard(ctx, mockShard) }, + func() error { return memqdb.AddShardingRule(ctx, mockShardingRule) }, + func() error { + return memqdb.RecordTransferTx(ctx, mockDataTransferTransaction.FromShardId, mockDataTransferTransaction) + }, + func() error { + _, err_local := memqdb.ListDataspaces(ctx) + return err_local + }, + func() error { + _, err_local := memqdb.ListKeyRanges(ctx) + return err_local + }, + func() error { + _, err_local := memqdb.ListRouters(ctx) + return err_local + }, + func() error { + _, err_local := memqdb.ListShardingRules(ctx) + return err_local + }, + func() error { + _, err_local := memqdb.ListShards(ctx) + return err_local }, - func() { memqdb.ListDataspaces(ctx) }, - func() { memqdb.ListKeyRanges(ctx) }, - func() { memqdb.ListRouters(ctx) }, - func() { memqdb.ListShardingRules(ctx) }, - func() { memqdb.ListShards(ctx) }, - func() { memqdb.GetKeyRange(ctx, mockKeyRange.KeyRangeID) }, - func() { memqdb.GetShard(ctx, mockShard.ID) }, - func() { memqdb.GetShardingRule(ctx, mockShardingRule.ID) }, - func() { memqdb.GetTransferTx(ctx, mockDataTransferTransaction.FromShardId) }, - func() { memqdb.ShareKeyRange(mockKeyRange.KeyRangeID) }, - func() { memqdb.DropKeyRange(ctx, mockKeyRange.KeyRangeID) }, - func() { memqdb.DropKeyRangeAll(ctx) }, - func() { memqdb.DropShardingRule(ctx, mockShardingRule.ID) }, - func() { memqdb.DropShardingRuleAll(ctx) }, - func() { memqdb.RemoveTransferTx(ctx, mockDataTransferTransaction.FromShardId) }, + func() error { + _, err_local := memqdb.GetKeyRange(ctx, mockKeyRange.KeyRangeID) + return err_local + }, + func() error { + _, err_local := memqdb.GetShard(ctx, mockShard.ID) + return err_local + }, + func() error { + _, err_local := memqdb.GetShardingRule(ctx, mockShardingRule.ID) + return err_local + }, + func() error { + _, err_local := memqdb.GetTransferTx(ctx, mockDataTransferTransaction.FromShardId) + return err_local + }, + func() error { return memqdb.ShareKeyRange(mockKeyRange.KeyRangeID) }, + func() error { return memqdb.DropKeyRange(ctx, mockKeyRange.KeyRangeID) }, + func() error { return memqdb.DropKeyRangeAll(ctx) }, + func() error { return memqdb.DropShardingRule(ctx, mockShardingRule.ID) }, + func() error { + _, err_local := memqdb.DropShardingRuleAll(ctx) + return err_local + }, + func() error { return memqdb.RemoveTransferTx(ctx, mockDataTransferTransaction.FromShardId) }, } - for _, m := range methods { - wg.Add(1) - go func(m func()) { - m() - wg.Done() - }(m) + for i := 0; i < 10; i++ { + for _, m := range methods { + wg.Add(1) + go func(m func() error) { + _ = m() + wg.Done() + }(m) + } } - wg.Wait() } From b6e41d77a7edad0c2c1261076ecb1af2c1c2cd5c Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Thu, 3 Aug 2023 11:01:41 +0000 Subject: [PATCH 03/10] add missing methods --- qdb/memqdb.go | 24 +++++++++++-- qdb/memqdb_test.go | 88 +++++++++++++++++----------------------------- 2 files changed, 54 insertions(+), 58 deletions(-) diff --git a/qdb/memqdb.go b/qdb/memqdb.go index 1bd7f02ef..6b8f929a6 100644 --- a/qdb/memqdb.go +++ b/qdb/memqdb.go @@ -286,6 +286,7 @@ func (q *MemQDB) LockKeyRange(_ context.Context, id string) (*KeyRange, error) { spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: lock key range") q.mu.Lock() defer q.mu.Unlock() + defer spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: exit: lock key range") krs, ok := q.Krs[id] if !ok { @@ -293,7 +294,15 @@ func (q *MemQDB) LockKeyRange(_ context.Context, id string) (*KeyRange, error) { } err := ExecuteCommands(q.DumpState, NewUpdateCommand(q.Freq, id, true), - NewCustomCommand(q.Locks[id].Lock, q.Locks[id].Unlock)) + NewCustomCommand(func() { + if lock, ok := q.Locks[id]; ok { + lock.Lock() + } + }, func() { + if lock, ok := q.Locks[id]; ok { + lock.Unlock() + } + })) if err != nil { return nil, err } @@ -302,16 +311,25 @@ func (q *MemQDB) LockKeyRange(_ context.Context, id string) (*KeyRange, error) { } func (q *MemQDB) UnlockKeyRange(_ context.Context, id string) error { - spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: lock key range") + spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: unlock key range") q.mu.Lock() defer q.mu.Unlock() + defer spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: exit: unlock key range") if !q.Freq[id] { return fmt.Errorf("key range %v not locked", id) } return ExecuteCommands(q.DumpState, NewUpdateCommand(q.Freq, id, false), - NewCustomCommand(q.Locks[id].Unlock, q.Locks[id].Lock)) + NewCustomCommand(func() { + if lock, ok := q.Locks[id]; ok { + lock.Unlock() + } + }, func() { + if lock, ok := q.Locks[id]; ok { + lock.Lock() + } + })) } func (q *MemQDB) CheckLockedKeyRange(ctx context.Context, id string) (*KeyRange, error) { diff --git a/qdb/memqdb_test.go b/qdb/memqdb_test.go index ec47ca926..5b59001e0 100644 --- a/qdb/memqdb_test.go +++ b/qdb/memqdb_test.go @@ -55,70 +55,48 @@ func TestMemqdbRacing(t *testing.T) { var wg sync.WaitGroup ctx := context.TODO() - methods := []func() error{ - func() error { return memqdb.AddDataspace(ctx, mockDataspace) }, - func() error { return memqdb.AddKeyRange(ctx, mockKeyRange) }, - func() error { return memqdb.AddRouter(ctx, mockRouter) }, - func() error { return memqdb.AddShard(ctx, mockShard) }, - func() error { return memqdb.AddShardingRule(ctx, mockShardingRule) }, - func() error { - return memqdb.RecordTransferTx(ctx, mockDataTransferTransaction.FromShardId, mockDataTransferTransaction) + methods := []func(){ + func() { _ = memqdb.AddDataspace(ctx, mockDataspace) }, + func() { _ = memqdb.AddKeyRange(ctx, mockKeyRange) }, + func() { _ = memqdb.AddRouter(ctx, mockRouter) }, + func() { _ = memqdb.AddShard(ctx, mockShard) }, + func() { _ = memqdb.AddShardingRule(ctx, mockShardingRule) }, + func() { + _ = memqdb.RecordTransferTx(ctx, mockDataTransferTransaction.FromShardId, mockDataTransferTransaction) }, - func() error { - _, err_local := memqdb.ListDataspaces(ctx) - return err_local + func() { _, _ = memqdb.ListDataspaces(ctx) }, + func() { _, _ = memqdb.ListKeyRanges(ctx) }, + func() { _, _ = memqdb.ListRouters(ctx) }, + func() { _, _ = memqdb.ListShardingRules(ctx) }, + func() { _, _ = memqdb.ListShards(ctx) }, + func() { _, _ = memqdb.GetKeyRange(ctx, mockKeyRange.KeyRangeID) }, + func() { _, _ = memqdb.GetShard(ctx, mockShard.ID) }, + func() { _, _ = memqdb.GetShardingRule(ctx, mockShardingRule.ID) }, + func() { _, _ = memqdb.GetTransferTx(ctx, mockDataTransferTransaction.FromShardId) }, + func() { _ = memqdb.ShareKeyRange(mockKeyRange.KeyRangeID) }, + func() { _ = memqdb.DropKeyRange(ctx, mockKeyRange.KeyRangeID) }, + func() { _ = memqdb.DropKeyRangeAll(ctx) }, + func() { _ = memqdb.DropShardingRule(ctx, mockShardingRule.ID) }, + func() { _, _ = memqdb.DropShardingRuleAll(ctx) }, + func() { _ = memqdb.RemoveTransferTx(ctx, mockDataTransferTransaction.FromShardId) }, + func() { _ = memqdb.LockRouter(ctx, mockRouter.ID) }, + func() { + _, _ = memqdb.LockKeyRange(ctx, mockKeyRange.KeyRangeID) + _ = memqdb.UnlockKeyRange(ctx, mockKeyRange.KeyRangeID) }, - func() error { - _, err_local := memqdb.ListKeyRanges(ctx) - return err_local - }, - func() error { - _, err_local := memqdb.ListRouters(ctx) - return err_local - }, - func() error { - _, err_local := memqdb.ListShardingRules(ctx) - return err_local - }, - func() error { - _, err_local := memqdb.ListShards(ctx) - return err_local - }, - func() error { - _, err_local := memqdb.GetKeyRange(ctx, mockKeyRange.KeyRangeID) - return err_local - }, - func() error { - _, err_local := memqdb.GetShard(ctx, mockShard.ID) - return err_local - }, - func() error { - _, err_local := memqdb.GetShardingRule(ctx, mockShardingRule.ID) - return err_local - }, - func() error { - _, err_local := memqdb.GetTransferTx(ctx, mockDataTransferTransaction.FromShardId) - return err_local - }, - func() error { return memqdb.ShareKeyRange(mockKeyRange.KeyRangeID) }, - func() error { return memqdb.DropKeyRange(ctx, mockKeyRange.KeyRangeID) }, - func() error { return memqdb.DropKeyRangeAll(ctx) }, - func() error { return memqdb.DropShardingRule(ctx, mockShardingRule.ID) }, - func() error { - _, err_local := memqdb.DropShardingRuleAll(ctx) - return err_local - }, - func() error { return memqdb.RemoveTransferTx(ctx, mockDataTransferTransaction.FromShardId) }, + func() { _ = memqdb.UpdateKeyRange(ctx, mockKeyRange) }, + func() { _ = memqdb.DeleteRouter(ctx, mockRouter.ID) }, } - for i := 0; i < 10; i++ { for _, m := range methods { wg.Add(1) - go func(m func() error) { - _ = m() + go func(m func()) { + m() wg.Done() }(m) } + wg.Wait() + } wg.Wait() } From 95a83d403bae60d3f378c3c88e1f76e3a904152e Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Fri, 4 Aug 2023 12:32:02 +0000 Subject: [PATCH 04/10] resolve conflicts --- qdb/memqdb.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/qdb/memqdb.go b/qdb/memqdb.go index 6b8f929a6..0098a77e7 100644 --- a/qdb/memqdb.go +++ b/qdb/memqdb.go @@ -40,14 +40,6 @@ func NewMemQDB(backupPath string) (*MemQDB, error) { Dataspaces: map[string]*Dataspace{}, Routers: map[string]*Router{}, Transactions: map[string]*DataTransferTransaction{}, - Freq: map[string]bool{}, - Krs: map[string]*KeyRange{}, - Locks: map[string]*sync.RWMutex{}, - Shards: map[string]*Shard{}, - Shrules: map[string]*ShardingRule{}, - Dataspaces: map[string]*Dataspace{}, - Routers: map[string]*Router{}, - Transactions: map[string]*DataTransferTransaction{}, backupPath: backupPath, }, nil From 32471f6430b390494237415e8f26a8a910e9c579 Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Mon, 7 Aug 2023 08:43:52 +0000 Subject: [PATCH 05/10] fix deadlock: sharekey with unlockkey --- qdb/command.go | 3 +++ qdb/memqdb.go | 8 ++++---- test/feature/spqr_test.go | 1 + 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/qdb/command.go b/qdb/command.go index 3a073306c..da8039433 100644 --- a/qdb/command.go +++ b/qdb/command.go @@ -1,5 +1,7 @@ package qdb +import "github.com/pg-sharding/spqr/pkg/spqrlog" + type Command interface { Do() Undo() @@ -102,6 +104,7 @@ func ExecuteCommands(saver func() error, commands ...Command) error { } err := saver() if err != nil { + spqrlog.Zero.Info().Msg("memqdb: undo commands") for _, c := range commands { c.Undo() } diff --git a/qdb/memqdb.go b/qdb/memqdb.go index 0098a77e7..ca565f44f 100644 --- a/qdb/memqdb.go +++ b/qdb/memqdb.go @@ -276,8 +276,8 @@ func (q *MemQDB) ListKeyRanges(_ context.Context) ([]*KeyRange, error) { func (q *MemQDB) LockKeyRange(_ context.Context, id string) (*KeyRange, error) { spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: lock key range") - q.mu.Lock() - defer q.mu.Unlock() + q.mu.RLock() + defer q.mu.RUnlock() defer spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: exit: lock key range") krs, ok := q.Krs[id] @@ -304,8 +304,8 @@ func (q *MemQDB) LockKeyRange(_ context.Context, id string) (*KeyRange, error) { func (q *MemQDB) UnlockKeyRange(_ context.Context, id string) error { spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: unlock key range") - q.mu.Lock() - defer q.mu.Unlock() + q.mu.RLock() + defer q.mu.RUnlock() defer spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: exit: unlock key range") if !q.Freq[id] { diff --git a/test/feature/spqr_test.go b/test/feature/spqr_test.go index 0468d05e3..3c753775b 100644 --- a/test/feature/spqr_test.go +++ b/test/feature/spqr_test.go @@ -288,6 +288,7 @@ func (tctx *testContext) queryPostgresql(host string, query string, args interfa tctx.commandRetcode = 0 if err != nil { tctx.commandRetcode = 1 + tctx.commandOutput = err.Error() tctx.sqlUserQueryError.Store(host, err.Error()) } tctx.sqlQueryResult = result From bdfa6b30454fe130b0b1df423b12cdafdfcf3f1e Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Mon, 7 Aug 2023 10:31:40 +0000 Subject: [PATCH 06/10] fix deadlock: dropkeyrange with unlockkeyrange --- pkg/coord/local/clocal.go | 9 +-- qdb/command.go | 53 +++++++++------ qdb/memqdb.go | 133 ++++++++++++++++++++++++++++++-------- qdb/memqdb_test.go | 2 +- 4 files changed, 142 insertions(+), 55 deletions(-) diff --git a/pkg/coord/local/clocal.go b/pkg/coord/local/clocal.go index 28c080355..e6b9db1ad 100644 --- a/pkg/coord/local/clocal.go +++ b/pkg/coord/local/clocal.go @@ -202,16 +202,9 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er }(qr.qdb, ctx, req.KeyRangeIDLeft) // TODO: krRight seems to be empty. - if krright, err = qr.qdb.LockKeyRange(ctx, req.KeyRangeIDRight); err != nil { + if krright, err = qr.qdb.GetKeyRange(ctx, req.KeyRangeIDRight); err != nil { return err } - defer func(qdb qdb.QDB, ctx context.Context, keyRangeID string) { - err := qdb.UnlockKeyRange(ctx, keyRangeID) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("") - return - } - }(qr.qdb, ctx, req.KeyRangeIDRight) if err = qr.qdb.DropKeyRange(ctx, krright.KeyRangeID); err != nil { return err diff --git a/qdb/command.go b/qdb/command.go index da8039433..b2aec49f7 100644 --- a/qdb/command.go +++ b/qdb/command.go @@ -3,8 +3,8 @@ package qdb import "github.com/pg-sharding/spqr/pkg/spqrlog" type Command interface { - Do() - Undo() + Do() error + Undo() error } func NewDeleteCommand[T any](m map[string]T, key string) *DeleteCommand[T] { @@ -18,17 +18,19 @@ type DeleteCommand[T any] struct { present bool } -func (c *DeleteCommand[T]) Do() { +func (c *DeleteCommand[T]) Do() error { c.value, c.present = c.m[c.key] delete(c.m, c.key) + return nil } -func (c *DeleteCommand[T]) Undo() { +func (c *DeleteCommand[T]) Undo() error { if !c.present { delete(c.m, c.key) } else { c.m[c.key] = c.value } + return nil } func NewUpdateCommand[T any](m map[string]T, key string, value T) *UpdateCommand[T] { @@ -43,17 +45,19 @@ type UpdateCommand[T any] struct { present bool } -func (c *UpdateCommand[T]) Do() { +func (c *UpdateCommand[T]) Do() error { c.prevValue, c.present = c.m[c.key] c.m[c.key] = c.value + return nil } -func (c *UpdateCommand[T]) Undo() { +func (c *UpdateCommand[T]) Undo() error { if !c.present { delete(c.m, c.key) } else { c.m[c.key] = c.prevValue } + return nil } func NewDropCommand[T any](m map[string]T) *DropCommand[T] { @@ -65,7 +69,7 @@ type DropCommand[T any] struct { copy map[string]T } -func (c *DropCommand[T]) Do() { +func (c *DropCommand[T]) Do() error { c.copy = make(map[string]T) for k, v := range c.m { c.copy[k] = v @@ -73,40 +77,49 @@ func (c *DropCommand[T]) Do() { for k := range c.m { delete(c.m, k) } + return nil } -func (c *DropCommand[T]) Undo() { +func (c *DropCommand[T]) Undo() error { for k, v := range c.copy { c.m[k] = v } + return nil } -func NewCustomCommand(do func(), undo func()) *CustomCommand { +func NewCustomCommand(do func() error, undo func() error) *CustomCommand { return &CustomCommand{do: do, undo: undo} } type CustomCommand struct { - do func() - undo func() + do func() error + undo func() error } -func (c *CustomCommand) Do() { - c.do() +func (c *CustomCommand) Do() error { + return c.do() } -func (c *CustomCommand) Undo() { - c.undo() +func (c *CustomCommand) Undo() error { + return c.undo() } func ExecuteCommands(saver func() error, commands ...Command) error { - for _, c := range commands { - c.Do() + firstError := len(commands) + var err error + for i, c := range commands { + err = c.Do() + if err != nil { + firstError = i + } + } + if err == nil { + err = saver() } - err := saver() if err != nil { spqrlog.Zero.Info().Msg("memqdb: undo commands") - for _, c := range commands { - c.Undo() + for _, c := range commands[:firstError] { + err = c.Undo() } return err } diff --git a/qdb/memqdb.go b/qdb/memqdb.go index ca565f44f..03256ed52 100644 --- a/qdb/memqdb.go +++ b/qdb/memqdb.go @@ -13,8 +13,10 @@ import ( type MemQDB struct { // TODO create more mutex per map if needed - mu sync.RWMutex + mu sync.RWMutex + muDeletedKrs sync.RWMutex + deletedKrs map[string]bool Locks map[string]*sync.RWMutex `json:"locks"` Freq map[string]bool `json:"freq"` Krs map[string]*KeyRange `json:"krs"` @@ -40,6 +42,7 @@ func NewMemQDB(backupPath string) (*MemQDB, error) { Dataspaces: map[string]*Dataspace{}, Routers: map[string]*Router{}, Transactions: map[string]*DataTransferTransaction{}, + deletedKrs: map[string]bool{}, backupPath: backupPath, }, nil @@ -224,8 +227,35 @@ func (q *MemQDB) UpdateKeyRange(ctx context.Context, keyRange *KeyRange) error { func (q *MemQDB) DropKeyRange(ctx context.Context, id string) error { spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: drop key range") + + q.muDeletedKrs.Lock() + spqrlog.Zero.Debug().Str("key-range", id).Msg("mark to drop new locks on that key range") + if q.deletedKrs[id] { + q.muDeletedKrs.Unlock() + return fmt.Errorf("key range '%s' already deleted", id) + } + q.deletedKrs[id] = true + q.muDeletedKrs.Unlock() + + defer func() { + q.muDeletedKrs.Lock() + defer q.muDeletedKrs.Unlock() + spqrlog.Zero.Debug().Str("key-range", id).Msg("delete previous mark") + delete(q.deletedKrs, id) + }() + + q.mu.RLock() + + spqrlog.Zero.Debug().Str("key-range", id).Msg("Wait someone to unlock key-range") + if lock, ok := q.Locks[id]; ok { + lock.Lock() + defer lock.Unlock() + } + q.mu.RUnlock() + q.mu.Lock() defer q.mu.Unlock() + spqrlog.Zero.Debug().Str("key-range", id).Msg("delete key-range") return ExecuteCommands(q.DumpState, NewDeleteCommand(q.Krs, id), NewDeleteCommand(q.Freq, id), NewDeleteCommand(q.Locks, id)) @@ -233,27 +263,50 @@ func (q *MemQDB) DropKeyRange(ctx context.Context, id string) error { func (q *MemQDB) DropKeyRangeAll(ctx context.Context) error { spqrlog.Zero.Debug().Msg("memqdb: drop all key ranges") - q.mu.Lock() - defer q.mu.Unlock() + q.mu.RLock() + q.muDeletedKrs.Lock() + spqrlog.Zero.Debug().Msg("mark to drop new locks on all key ranges") + ids := make([]string, 0) + for id := range q.Locks { + if q.deletedKrs[id] { + q.muDeletedKrs.Unlock() + q.mu.RUnlock() + return fmt.Errorf("key range '%s' already deleted", id) + } + ids = append(ids, id) + q.deletedKrs[id] = true + } + q.muDeletedKrs.Unlock() + defer func() { + q.muDeletedKrs.Lock() + defer q.muDeletedKrs.Unlock() + spqrlog.Zero.Debug().Msg("delete previous marks") + + for _, id := range ids { + delete(q.deletedKrs, id) + } + }() + spqrlog.Zero.Debug().Msg("Wait someone to unlock key-ranges") var locks []*sync.RWMutex + for _, l := range q.Locks { + l.Lock() + locks = append(locks, l) + } + defer func() { + for _, l := range locks { + l.Unlock() + } + }() + spqrlog.Zero.Debug().Msg("memqdb: acquired all locks") - return ExecuteCommands(q.DumpState, - NewCustomCommand(func() { - for _, l := range q.Locks { - l.Lock() - locks = append(locks, l) - } - spqrlog.Zero.Debug().Msg("memqdb: acquired all locks") - }, func() {}), - NewDropCommand(q.Krs), NewDropCommand(q.Locks), - NewCustomCommand(func() { - for _, l := range locks { - l.Unlock() - } - }, - func() {}), - ) + q.mu.RUnlock() + + q.mu.Lock() + defer q.mu.Unlock() + spqrlog.Zero.Debug().Msg("delete key-ranges") + + return ExecuteCommands(q.DumpState, NewDropCommand(q.Krs), NewDropCommand(q.Locks)) } func (q *MemQDB) ListKeyRanges(_ context.Context) ([]*KeyRange, error) { @@ -274,6 +327,27 @@ func (q *MemQDB) ListKeyRanges(_ context.Context) ([]*KeyRange, error) { return ret, nil } +func (q *MemQDB) TryLockKeyRange(lock *sync.RWMutex, id string, read bool) error { + q.muDeletedKrs.RLock() + + if _, ok := q.deletedKrs[id]; ok { + q.muDeletedKrs.RUnlock() + return fmt.Errorf("key range '%s' deleted", id) + } + q.muDeletedKrs.RUnlock() + + if read { + lock.RLock() + } else { + lock.Lock() + } + + if _, ok := q.Krs[id]; !ok { + return fmt.Errorf("key range '%s' deleted after lock acuired", id) + } + return nil +} + func (q *MemQDB) LockKeyRange(_ context.Context, id string) (*KeyRange, error) { spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: lock key range") q.mu.RLock() @@ -286,14 +360,16 @@ func (q *MemQDB) LockKeyRange(_ context.Context, id string) (*KeyRange, error) { } err := ExecuteCommands(q.DumpState, NewUpdateCommand(q.Freq, id, true), - NewCustomCommand(func() { + NewCustomCommand(func() error { if lock, ok := q.Locks[id]; ok { - lock.Lock() + return q.TryLockKeyRange(lock, id, false) } - }, func() { + return nil + }, func() error { if lock, ok := q.Locks[id]; ok { lock.Unlock() } + return nil })) if err != nil { return nil, err @@ -313,14 +389,16 @@ func (q *MemQDB) UnlockKeyRange(_ context.Context, id string) error { } return ExecuteCommands(q.DumpState, NewUpdateCommand(q.Freq, id, false), - NewCustomCommand(func() { + NewCustomCommand(func() error { if lock, ok := q.Locks[id]; ok { lock.Unlock() } - }, func() { + return nil + }, func() error { if lock, ok := q.Locks[id]; ok { - lock.Lock() + return q.TryLockKeyRange(lock, id, false) } + return nil })) } @@ -352,7 +430,10 @@ func (q *MemQDB) ShareKeyRange(id string) error { return fmt.Errorf("no such key") } - lock.RLock() + err := q.TryLockKeyRange(lock, id, true) + if err != nil { + return err + } defer lock.RUnlock() return nil diff --git a/qdb/memqdb_test.go b/qdb/memqdb_test.go index 5b59001e0..5e4451e65 100644 --- a/qdb/memqdb_test.go +++ b/qdb/memqdb_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" ) -const MemQDBPath = "memqdb.json" +const MemQDBPath = "" var mockDataspace *qdb.Dataspace = &qdb.Dataspace{"123"} var mockShard *qdb.Shard = &qdb.Shard{ From 9b34b02fabf707c030764954dc8a68ee5e06ba4c Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Tue, 8 Aug 2023 13:41:50 +0000 Subject: [PATCH 07/10] fix Makefile --- .github/workflows/tests.yaml | 2 +- Makefile | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 49713bddf..8a8a746c8 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -17,7 +17,7 @@ jobs: uses: actions/checkout@v2 - name: unit tests - run: make unittest_with_race + run: make unittest e2e: name: e2e diff --git a/Makefile b/Makefile index f8221ff4d..d65793e77 100644 --- a/Makefile +++ b/Makefile @@ -67,11 +67,8 @@ pooler_run: ####################### TESTS ####################### -unittest_with_race: - go test -race ./cmd/... ./pkg/... ./router/... ./qdb/... - unittest: - go test ./cmd/... ./pkg/... ./router/... ./qdb/... + go test -race ./cmd/... ./pkg/... ./router/... ./qdb/... regress_local: proxy_2sh_run ./script/regress_local.sh From 78f225663bce6ad7fdce4eedcc12f32054aec196 Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Thu, 10 Aug 2023 06:48:36 +0000 Subject: [PATCH 08/10] refactor method --- qdb/command.go | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/qdb/command.go b/qdb/command.go index b2aec49f7..aa712f2c0 100644 --- a/qdb/command.go +++ b/qdb/command.go @@ -1,6 +1,10 @@ package qdb -import "github.com/pg-sharding/spqr/pkg/spqrlog" +import ( + "fmt" + + "github.com/pg-sharding/spqr/pkg/spqrlog" +) type Command interface { Do() error @@ -104,22 +108,36 @@ func (c *CustomCommand) Undo() error { return c.undo() } -func ExecuteCommands(saver func() error, commands ...Command) error { - firstError := len(commands) - var err error +func doCommands(commands ...Command) (int, error) { for i, c := range commands { - err = c.Do() + err := c.Do() if err != nil { - firstError = i + return i, err } } + return len(commands), nil +} + +func undoCommands(commands ...Command) error { + spqrlog.Zero.Info().Msg("memqdb: undo commands") + for _, c := range commands { + err := c.Undo() + if err != nil { + return err + } + } + return nil +} + +func ExecuteCommands(saver func() error, commands ...Command) error { + completed, err := doCommands(commands...) if err == nil { err = saver() } if err != nil { - spqrlog.Zero.Info().Msg("memqdb: undo commands") - for _, c := range commands[:firstError] { - err = c.Undo() + undoErr := undoCommands(commands[:completed]...) + if undoErr != nil { + return fmt.Errorf("Failed to undo command %s while: %s", undoErr.Error(), err.Error()) } return err } From 7090c9c7ff027024c752ab20304e3944c1ca1f6a Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Thu, 10 Aug 2023 06:48:58 +0000 Subject: [PATCH 09/10] fix comments --- qdb/memqdb.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/qdb/memqdb.go b/qdb/memqdb.go index d1af5dcdb..e38c982c8 100644 --- a/qdb/memqdb.go +++ b/qdb/memqdb.go @@ -228,8 +228,8 @@ func (q *MemQDB) UpdateKeyRange(ctx context.Context, keyRange *KeyRange) error { func (q *MemQDB) DropKeyRange(ctx context.Context, id string) error { spqrlog.Zero.Debug().Str("key-range", id).Msg("memqdb: drop key range") + // Do not allow new locks on key range we want to delete q.muDeletedKrs.Lock() - spqrlog.Zero.Debug().Str("key-range", id).Msg("mark to drop new locks on that key range") if q.deletedKrs[id] { q.muDeletedKrs.Unlock() return fmt.Errorf("key range '%s' already deleted", id) @@ -240,13 +240,12 @@ func (q *MemQDB) DropKeyRange(ctx context.Context, id string) error { defer func() { q.muDeletedKrs.Lock() defer q.muDeletedKrs.Unlock() - spqrlog.Zero.Debug().Str("key-range", id).Msg("delete previous mark") delete(q.deletedKrs, id) }() q.mu.RLock() - spqrlog.Zero.Debug().Str("key-range", id).Msg("Wait someone to unlock key-range") + // Wait until key range will be unlocked if lock, ok := q.Locks[id]; ok { lock.Lock() defer lock.Unlock() @@ -255,7 +254,6 @@ func (q *MemQDB) DropKeyRange(ctx context.Context, id string) error { q.mu.Lock() defer q.mu.Unlock() - spqrlog.Zero.Debug().Str("key-range", id).Msg("delete key-range") return ExecuteCommands(q.DumpState, NewDeleteCommand(q.Krs, id), NewDeleteCommand(q.Freq, id), NewDeleteCommand(q.Locks, id)) @@ -264,8 +262,9 @@ func (q *MemQDB) DropKeyRange(ctx context.Context, id string) error { func (q *MemQDB) DropKeyRangeAll(ctx context.Context) error { spqrlog.Zero.Debug().Msg("memqdb: drop all key ranges") q.mu.RLock() + + // Do not allow new locks on key range we want to delete q.muDeletedKrs.Lock() - spqrlog.Zero.Debug().Msg("mark to drop new locks on all key ranges") ids := make([]string, 0) for id := range q.Locks { if q.deletedKrs[id] { @@ -287,7 +286,7 @@ func (q *MemQDB) DropKeyRangeAll(ctx context.Context) error { } }() - spqrlog.Zero.Debug().Msg("Wait someone to unlock key-ranges") + // Wait until key range will be unlocked var locks []*sync.RWMutex for _, l := range q.Locks { l.Lock() @@ -304,7 +303,6 @@ func (q *MemQDB) DropKeyRangeAll(ctx context.Context) error { q.mu.Lock() defer q.mu.Unlock() - spqrlog.Zero.Debug().Msg("delete key-ranges") return ExecuteCommands(q.DumpState, NewDropCommand(q.Krs), NewDropCommand(q.Locks)) } From 58e6b4f943a15112a1cdde78067ff3549a66b225 Mon Sep 17 00:00:00 2001 From: Nikiforov Konstantin Date: Thu, 10 Aug 2023 13:59:26 +0000 Subject: [PATCH 10/10] fix error message --- qdb/command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qdb/command.go b/qdb/command.go index aa712f2c0..df855174e 100644 --- a/qdb/command.go +++ b/qdb/command.go @@ -137,7 +137,7 @@ func ExecuteCommands(saver func() error, commands ...Command) error { if err != nil { undoErr := undoCommands(commands[:completed]...) if undoErr != nil { - return fmt.Errorf("Failed to undo command %s while: %s", undoErr.Error(), err.Error()) + return fmt.Errorf("failed to undo command %s while: %s", undoErr.Error(), err.Error()) } return err }