From c65c3c40cd90361beb8a2da25d56b39fcd7422b5 Mon Sep 17 00:00:00 2001 From: Prateek Rungta Date: Thu, 24 Oct 2024 11:18:47 -0400 Subject: [PATCH] [WIP] Eliminate deadlock between db.AssignShardSet and shard.FetchBlocksMetadataV2 --- src/dbnode/storage/namespace.go | 10 ++++++---- src/dbnode/storage/namespace_readers.go | 26 +++++++++---------------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 36be46ebda..70a61165b3 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -567,9 +567,6 @@ func (n *dbNamespace) assignShardSet( if br := n.blockRetriever; br != nil { br.AssignShardSet(shardSet) } - if mgr := n.namespaceReaderMgr; mgr != nil { - mgr.assignShardSet(shardSet) - } n.Unlock() n.closeShards(closing, false) @@ -612,7 +609,12 @@ func (n *dbNamespace) closeShards(shards []databaseShard, blockUntilClosed bool) func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) error { // Allow the reader cache to tick. - n.namespaceReaderMgr.tick() + { + n.RLock() + shardSet := n.shardSet + n.RUnlock() + n.namespaceReaderMgr.tick(shardSet) + } // Fetch the owned shards. shards := n.OwnedShards() diff --git a/src/dbnode/storage/namespace_readers.go b/src/dbnode/storage/namespace_readers.go index cad8b1db32..c1684ab1e3 100644 --- a/src/dbnode/storage/namespace_readers.go +++ b/src/dbnode/storage/namespace_readers.go @@ -74,9 +74,7 @@ type databaseNamespaceReaderManager interface { latestVolume(shard uint32, blockStart xtime.UnixNano) (int, error) - assignShardSet(shardSet sharding.ShardSet) - - tick() + tick(activeShardSet sharding.ShardSet) close() } @@ -109,7 +107,6 @@ type namespaceReaderManager struct { closedReaders []cachedReader openReaders map[cachedOpenReaderKey]cachedReader - shardSet sharding.ShardSet metrics namespaceReaderManagerMetrics } @@ -167,7 +164,6 @@ func newNamespaceReaderManager( bytesPool: opts.BytesPool(), logger: opts.InstrumentOptions().Logger(), openReaders: make(map[cachedOpenReaderKey]cachedReader), - shardSet: sharding.NewEmptyShardSet(sharding.DefaultHashFn(1)), metrics: newNamespaceReaderManagerMetrics(namespaceScope), } @@ -205,14 +201,9 @@ func (m *namespaceReaderManager) filesetExistsAt( m.namespace.ID(), shard, blockStart, latestVolume) } -func (m *namespaceReaderManager) assignShardSet(shardSet sharding.ShardSet) { - m.Lock() - defer m.Unlock() - m.shardSet = shardSet -} +func shardExistsWithLock(shardSet sharding.ShardSet, shard uint32) bool { + _, err := shardSet.LookupStateByID(shard) -func (m *namespaceReaderManager) shardExistsWithLock(shard uint32) bool { - _, err := m.shardSet.LookupStateByID(shard) // NB(bodu): LookupStateByID returns ErrInvalidShardID when shard // does not exist in the shard map which means the shard is not available. return err == nil @@ -426,18 +417,19 @@ func (m *namespaceReaderManager) put(reader fs.DataFileSetReader) error { return nil } -func (m *namespaceReaderManager) tick() { - m.tickWithThreshold(expireCachedReadersAfterNumTicks) +func (m *namespaceReaderManager) tick(activeShardSet sharding.ShardSet) { + m.tickWithThreshold(activeShardSet, expireCachedReadersAfterNumTicks) } func (m *namespaceReaderManager) close() { m.blockLeaseManager.UnregisterLeaser(m) // Perform a tick but make the threshold zero so all readers must be expired - m.tickWithThreshold(0) + emptyShardSet := sharding.NewEmptyShardSet(sharding.DefaultHashFn(1)) + m.tickWithThreshold(emptyShardSet, 0) } -func (m *namespaceReaderManager) tickWithThreshold(threshold int) { +func (m *namespaceReaderManager) tickWithThreshold(activeShardSet sharding.ShardSet, threshold int) { m.Lock() defer m.Unlock() @@ -469,7 +461,7 @@ func (m *namespaceReaderManager) tickWithThreshold(threshold int) { // Also check to see if shard is still available and remove cached readers for // shards that are no longer available. This ensures cached readers are eventually // consistent with shard state. - !m.shardExistsWithLock(key.shard) { + !shardExistsWithLock(activeShardSet, key.shard) { // Close before removing ref if err := elem.reader.Close(); err != nil { m.logger.Error("error closing reader from reader cache", zap.Error(err))