Skip to content

Commit

Permalink
Revert "refactor(VirtualFile::crashsafe_overwrite): avoid Handle::blo…
Browse files Browse the repository at this point in the history
…ck_on in callers" (#6765)

Reverts #6731

On high tenant count Pageservers in staging, memory and CPU usage shoots
to 100% with this change. (NB: staging currently has tokio-epoll-uring
enabled)

Will analyze tomorrow.


https://neondb.slack.com/archives/C03H1K0PGKH/p1707933875639379?thread_ts=1707929541.125329&cid=C03H1K0PGKH
  • Loading branch information
problame authored Feb 14, 2024
1 parent fff2468 commit 024372a
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 89 deletions.
44 changes: 1 addition & 43 deletions libs/utils/src/crashsafe.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
borrow::Cow,
fs::{self, File},
io::{self, Write},
io,
};

use camino::{Utf8Path, Utf8PathBuf};
Expand Down Expand Up @@ -161,48 +161,6 @@ pub async fn durable_rename(
Ok(())
}

/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`].
///
/// The file is first written to the specified `tmp_path`, and in a second
/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
/// and atomic rename guarantee that, if we crash at any point, there will never
/// be a partially written file at `final_path` (but maybe at `tmp_path`).
///
/// Callers are responsible for serializing calls of this function for a given `final_path`.
/// If they don't, there may be an error due to conflicting `tmp_path`, or there will
/// be no error and the content of `final_path` will be the "winner" caller's `content`.
/// I.e., the atomticity guarantees still hold.
pub fn overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true)
.open(tmp_path)?;
file.write_all(content)?;
file.sync_all()?;
drop(file); // don't keep the fd open for longer than we have to

std::fs::rename(tmp_path, final_path)?;

let final_parent_dirfd = std::fs::OpenOptions::new()
.read(true)
.open(final_path_parent)?;

final_parent_dirfd.sync_all()?;
Ok(())
}

#[cfg(test)]
mod tests {

Expand Down
5 changes: 2 additions & 3 deletions pageserver/src/deletion_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl DeletionHeader {
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
let header_path = conf.deletion_header_path();
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
.await
.maybe_fatal_err("save deletion header")?;

Expand Down Expand Up @@ -325,8 +325,7 @@ impl DeletionList {
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);

let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");

VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)
Expand Down
33 changes: 23 additions & 10 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use remote_storage::GenericRemoteStorage;
use std::fmt;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::runtime::Handle;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -2877,10 +2878,17 @@ impl Tenant {

let tenant_shard_id = *tenant_shard_id;
let config_path = config_path.to_owned();
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {config_path}")
})
})
})
.await??;

Ok(())
}
Expand All @@ -2907,12 +2915,17 @@ impl Tenant {

let tenant_shard_id = *tenant_shard_id;
let target_config_path = target_config_path.to_owned();
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})?;
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_shard_id} config to {target_config_path}")
})
})
})
.await??;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ pub async fn save_metadata(
let path = conf.metadata_path(tenant_shard_id, timeline_id);
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes)
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
.await
.context("write metadata")?;
Ok(())
Expand Down
11 changes: 8 additions & 3 deletions pageserver/src/tenant/secondary/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,14 @@ impl<'a> TenantDownloader<'a> {
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
let heatmap_path_bg = heatmap_path.clone();
VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
.await
.maybe_fatal_err(&context_msg)?;
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
})
})
.await
.expect("Blocking task is never aborted")
.maybe_fatal_err(&context_msg)?;

tracing::debug!("Wrote local heatmap to {}", heatmap_path);

Expand Down
72 changes: 43 additions & 29 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};

use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;

pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
Expand Down Expand Up @@ -403,34 +404,47 @@ impl VirtualFile {
Ok(vfile)
}

/// Async version of [`::utils::crashsafe::overwrite`].
/// Writes a file to the specified `final_path` in a crash safe fasion
///
/// # NB:
///
/// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
/// it did at an earlier time.
/// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
/// The file is first written to the specified tmp_path, and in a second
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite<B: BoundedBuf>(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: B,
) -> std::io::Result<()> {
// TODO: use tokio_epoll_uring if configured as `io_engine`.
// See https://github.com/neondatabase/neon/issues/6663

tokio::task::spawn_blocking(move || {
let slice_storage;
let content_len = content.bytes_init();
let content = if content.bytes_init() > 0 {
slice_storage = Some(content.slice(0..content_len));
slice_storage.as_deref().expect("just set it to Some()")
} else {
&[]
};
utils::crashsafe::overwrite(&final_path, &tmp_path, content)
})
.await
.expect("blocking task is never aborted")
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true),
)
.await?;
let (_content, res) = file.write_all(content).await;
res?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
final_parent_dirfd.sync_all().await?;
Ok(())
}

/// Call File::sync_all() on the underlying File.
Expand Down Expand Up @@ -1323,7 +1337,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");

VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
Expand All @@ -1332,7 +1346,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);

VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
Expand All @@ -1354,7 +1368,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());

VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();

Expand Down

1 comment on commit 024372a

@github-actions
Copy link

@github-actions github-actions bot commented on 024372a Feb 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2511 tests run: 2384 passed, 2 failed, 125 skipped (full report)


Failures on Postgres 14

  • test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30]: release
  • test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30] or test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]"
Flaky tests (6)

Postgres 16

  • test_crafted_wal_end[last_wal_record_crossing_segment]: debug
  • test_delete_tenant_exercise_crash_safety_failpoints[real_s3-timeline-delete-before-index-delete-False-Check.RETRY_WITHOUT_RESTART]: release

Postgres 15

  • test_create_snapshot: debug
  • test_fixture_restart: debug

Postgres 14

  • test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30]: release
  • test_statvfs_pressure_usage: debug

Code coverage (full report)

  • functions: 56.0% (12859 of 22974 functions)
  • lines: 82.6% (69522 of 84141 lines)

The comment gets automatically updated with the latest test results
024372a at 2024-02-14T21:01:00.235Z :recycle:

Please sign in to comment.