DOSE-1070 [Backport of DOSE-1020 to 6.0.13.0] I/O hang waiting on READ to complete (openzfs#246)

If there are 2 concurrent calls to ObjectAccess::get_object_cached() for
the same object, the 2nd caller will wait for the first to complete.  If
the GET fails (e.g. due to object not existing), we will drop the
watch_once::Sender before calling send(), so the 2nd caller's recv()
will fail, causing it to retry.
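
The dropped-Sender behavior described above can be illustrated with a minimal sketch. It uses std::sync::mpsc as a stand-in for the agent's watch_once channel (an assumption; watch_once's real API is not shown in this commit), and relies only on the fact that recv() returns an error once the sender has been dropped without sending. The function name and the `get_result` parameter are hypothetical.

```rust
use std::sync::mpsc::{channel, RecvError};

/// Simulates a GET whose result is handed to a waiter over a channel.
/// `get_result` is a hypothetical stand-in for the outcome of the S3 GET.
fn waiter_outcome(get_result: Result<Vec<u8>, String>) -> Result<Vec<u8>, RecvError> {
    // Assumption: std::sync::mpsc stands in for watch_once here.
    let (tx, rx) = channel::<Vec<u8>>();
    match get_result {
        // Success: the value is sent to the waiter before the sender goes away.
        Ok(bytes) => {
            tx.send(bytes).ok();
        }
        // Failure: nothing is sent.
        Err(_) => {}
    }
    // The sender is dropped in both cases.
    drop(tx);
    // The waiter sees the buffered value on success, or an error on failure.
    rx.recv()
}
```

In this model, a failed GET surfaces to the waiter as a recv() error, which is the signal the 2nd caller uses to retry.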

However, when the 1st caller's GET failed, it neglected to remove its
entry from ObjectCache::reading.  When the 2nd caller retries, it
will again find that stale entry (indicating that a GET is supposed to
be in progress) and wait for it, but since the Sender was dropped, its
recv() will fail, triggering another retry.  This repeats indefinitely,
so the I/O never completes.

The fix is to always remove the entry from ObjectCache::reading when the
GET completes, whether or not it succeeded.
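
The stale-entry loop and the fix can be sketched in a simplified, synchronous model. Everything here is an assumption made for illustration: `Cache`, `failed_get`, `waiter_retries`, and the `always_remove` flag are hypothetical names, std::sync::mpsc stands in for watch_once, and the retry cap exists only so that the sketch terminates (the real waiter had no such cap).

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical stand-in for ObjectCache; only the `reading` map is modeled.
struct Cache {
    reading: HashMap<String, Receiver<Vec<u8>>>,
}

/// Models the 1st caller: it registers a `reading` entry, then its GET fails.
/// `always_remove` selects the old behavior (false) or the fixed one (true).
fn failed_get(cache: &mut Cache, key: &str, always_remove: bool) {
    let (tx, rx) = channel::<Vec<u8>>();
    cache.reading.insert(key.to_string(), rx);
    // The GET fails, so the sender is dropped without sending anything.
    drop(tx);
    // Old behavior: the error path returned early and left the entry behind.
    // Fixed behavior: the entry is removed whether or not the GET succeeded.
    if always_remove {
        cache.reading.remove(key);
    }
}

/// Models the 2nd caller: it counts how many times it waits on a dead entry
/// before it either gets a value or finds no entry (and could issue its own GET).
fn waiter_retries(cache: &Cache, key: &str, max_retries: usize) -> usize {
    let mut retries = 0;
    while retries < max_retries {
        match cache.reading.get(key) {
            // An entry claims a GET is in progress, so wait for its result.
            Some(rx) => {
                if rx.recv().is_ok() {
                    break;
                }
                // The sender was dropped: recv() failed, so retry.
                retries += 1;
            }
            // No entry: nothing to wait for.
            None => break,
        }
    }
    retries
}
```

With `always_remove` set to false, `waiter_retries` runs until it hits the cap, which corresponds to the hang; with it set to true, the waiter finds no entry and stops immediately.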

Bonus cleanup: restructure this code so that there's only one code path
that needs to remove the entry from ObjectCache::reading.
ahrens authored Feb 25, 2022
1 parent c36ef95 commit e35b890
Showing 1 changed file with 25 additions and 27 deletions.
52 changes: 25 additions & 27 deletions cmd/zfs_object_agent/zettaobject/src/object_access.rs
@@ -630,34 +630,32 @@ impl ObjectAccess {
         let (tx, rx) = watch_once::channel::<Bytes>();
         c.reading.insert(key.clone(), (true, rx));
         Either::Left(async move {
-            let bytes =
-                match self.get_object_from_s3(key.clone(), stat_type, None).await {
-                    Ok(bytes) => bytes,
-                    Err(e) => return Some(Err(e)),
-                };
-
-            // This GET may have been marked non-cacheable by invalidate_cache().
-            // In that case, value has been changed by a concurrent PUT, but
-            // since we initiated our GET before the PUT, the old value is
-            // sufficient for us. But we don't want other GET's to see the
-            // potentially-old value that we got, so we don't add it to the cache
-            // or send it to other waiting GET's.
-            let cacheable = {
-                let mut myc = CACHE.lock().unwrap();
-                let (cacheable, _) = myc.reading.remove(&key).unwrap();
-                if cacheable {
-                    myc.cache.put(key, bytes.clone());
+            let result =
+                self.get_object_from_s3(key.clone(), stat_type, None).await;
+
+            // We need to remove the `reading` entry regardless of the result.
+            let mut myc = CACHE.lock().unwrap();
+            let (cacheable, _) = myc.reading.remove(&key).unwrap();
+            match result {
+                Ok(bytes) => {
+                    // This GET may have been marked non-cacheable by
+                    // invalidate_cache(). In that case, the object's contents
+                    // have been changed by a concurrent PUT, but since we
+                    // initiated our GET before the PUT, the old value is
+                    // sufficient for us. But we don't want other GET's (which
+                    // may have been initiated after the PUT completed) to see
+                    // the potentially-old value that we got, so we don't add it
+                    // to the cache or send it to other waiting GET's.
+                    if cacheable {
+                        myc.cache.put(key, bytes.clone());
+                        // We removed and dropped the rx, so there may be no more
+                        // receivers, so we can't unwrap().
+                        tx.send(bytes.clone()).ok();
+                    }
+                    Some(Ok(bytes))
                 }
-                cacheable
-            };
-
-            if cacheable {
-                // We removed and dropped the rx, so there may be no more
-                // receivers, so we can't unwrap().
-                tx.send(bytes.clone()).ok();
+                Err(e) => Some(Err(e)),
             }
-
-            Some(Ok(bytes))
         })
     }
     // If the in-progress GET is not cacheable, it won't send us the value.
@@ -671,7 +669,7 @@ impl ObjectAccess {
         Ok(bytes) => Some(Ok(bytes)),
         // Sender doesn't have a value for us. The caller will retry.
         Err(_) => {
-            trace!("{}: waited for failed GET, retrying", key);
+            debug!("{}: waited for failed GET, retrying", key);
             None
         }
     }