Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Eliminate deadlock between db.AssignShardSet and shard.FetchBlocksMetadataV2 #4309

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,6 @@ func (n *dbNamespace) assignShardSet(
if br := n.blockRetriever; br != nil {
br.AssignShardSet(shardSet)
}
if mgr := n.namespaceReaderMgr; mgr != nil {
mgr.assignShardSet(shardSet)
}

n.Unlock()
n.closeShards(closing, false)
Expand Down Expand Up @@ -612,7 +609,12 @@ func (n *dbNamespace) closeShards(shards []databaseShard, blockUntilClosed bool)

func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) error {
// Allow the reader cache to tick.
n.namespaceReaderMgr.tick()
{
n.RLock()
shardSet := n.shardSet
n.RUnlock()
n.namespaceReaderMgr.tick(shardSet)
}

// Fetch the owned shards.
shards := n.OwnedShards()
Expand Down
26 changes: 9 additions & 17 deletions src/dbnode/storage/namespace_readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ type databaseNamespaceReaderManager interface {

latestVolume(shard uint32, blockStart xtime.UnixNano) (int, error)

assignShardSet(shardSet sharding.ShardSet)

tick()
tick(activeShardSet sharding.ShardSet)

close()
}
Expand Down Expand Up @@ -109,7 +107,6 @@ type namespaceReaderManager struct {

closedReaders []cachedReader
openReaders map[cachedOpenReaderKey]cachedReader
shardSet sharding.ShardSet

metrics namespaceReaderManagerMetrics
}
Expand Down Expand Up @@ -167,7 +164,6 @@ func newNamespaceReaderManager(
bytesPool: opts.BytesPool(),
logger: opts.InstrumentOptions().Logger(),
openReaders: make(map[cachedOpenReaderKey]cachedReader),
shardSet: sharding.NewEmptyShardSet(sharding.DefaultHashFn(1)),
metrics: newNamespaceReaderManagerMetrics(namespaceScope),
}

Expand Down Expand Up @@ -205,14 +201,9 @@ func (m *namespaceReaderManager) filesetExistsAt(
m.namespace.ID(), shard, blockStart, latestVolume)
}

func (m *namespaceReaderManager) assignShardSet(shardSet sharding.ShardSet) {
m.Lock()
defer m.Unlock()
m.shardSet = shardSet
}
func shardExistsWithLock(shardSet sharding.ShardSet, shard uint32) bool {
_, err := shardSet.LookupStateByID(shard)

func (m *namespaceReaderManager) shardExistsWithLock(shard uint32) bool {
_, err := m.shardSet.LookupStateByID(shard)
// NB(bodu): LookupStateByID returns ErrInvalidShardID when shard
// does not exist in the shard map which means the shard is not available.
return err == nil
Expand Down Expand Up @@ -426,18 +417,19 @@ func (m *namespaceReaderManager) put(reader fs.DataFileSetReader) error {
return nil
}

func (m *namespaceReaderManager) tick() {
m.tickWithThreshold(expireCachedReadersAfterNumTicks)
func (m *namespaceReaderManager) tick(activeShardSet sharding.ShardSet) {
m.tickWithThreshold(activeShardSet, expireCachedReadersAfterNumTicks)
}

func (m *namespaceReaderManager) close() {
m.blockLeaseManager.UnregisterLeaser(m)

// Perform a tick but make the threshold zero so all readers must be expired
m.tickWithThreshold(0)
emptyShardSet := sharding.NewEmptyShardSet(sharding.DefaultHashFn(1))
m.tickWithThreshold(emptyShardSet, 0)
}

func (m *namespaceReaderManager) tickWithThreshold(threshold int) {
func (m *namespaceReaderManager) tickWithThreshold(activeShardSet sharding.ShardSet, threshold int) {
m.Lock()
defer m.Unlock()

Expand Down Expand Up @@ -469,7 +461,7 @@ func (m *namespaceReaderManager) tickWithThreshold(threshold int) {
// Also check to see if shard is still available and remove cached readers for
// shards that are no longer available. This ensures cached readers are eventually
// consistent with shard state.
!m.shardExistsWithLock(key.shard) {
!shardExistsWithLock(activeShardSet, key.shard) {
// Close before removing ref
if err := elem.reader.Close(); err != nil {
m.logger.Error("error closing reader from reader cache", zap.Error(err))
Expand Down