page_service: getpage batching: refactor & minor fixes #9792

Closed · wants to merge 18 commits
40 changes: 38 additions & 2 deletions libs/metrics/src/more_process_metrics.rs
@@ -2,14 +2,28 @@

// This module has heavy inspiration from the prometheus crate's `process_collector.rs`.

use once_cell::sync::Lazy;
use prometheus::Gauge;

use crate::UIntGauge;

pub struct Collector {
descs: Vec<prometheus::core::Desc>,
vmlck: crate::UIntGauge,
cpu_seconds_highres: Gauge,
}

const NMETRICS: usize = 1;
const NMETRICS: usize = 2;

static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
if long == -1 {
panic!("sysconf(_SC_CLK_TCK) failed");
}
let convertible_to_f64: i32 =
i32::try_from(long).expect("sysconf(_SC_CLK_TCK) is larger than i32");
convertible_to_f64 as f64
});

impl prometheus::core::Collector for Collector {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
@@ -27,6 +41,12 @@ impl prometheus::core::Collector for Collector {
mfs.extend(self.vmlck.collect())
}
}
if let Ok(stat) = myself.stat() {
let cpu_seconds = stat.utime + stat.stime;
self.cpu_seconds_highres
.set(cpu_seconds as f64 / *CLK_TCK_F64);
mfs.extend(self.cpu_seconds_highres.collect());
}
mfs
}
}
@@ -43,7 +63,23 @@ impl Collector {
.cloned(),
);

Self { descs, vmlck }
let cpu_seconds_highres = Gauge::new(
"libmetrics_process_cpu_seconds_highres",
"Total user and system CPU time spent in seconds.\
Sub-second resolution, hence better than `process_cpu_seconds_total`.",
)
.unwrap();
descs.extend(
prometheus::core::Collector::desc(&cpu_seconds_highres)
.into_iter()
.cloned(),
);

Self {
descs,
vmlck,
cpu_seconds_highres,
}
}
}
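For context on the change above: the new `libmetrics_process_cpu_seconds_highres` gauge sums the process's user and system CPU time in clock ticks (read from `/proc/self/stat`) and divides by `CLK_TCK`, giving sub-second resolution. Below is a minimal standalone sketch of that computation, assuming the `procfs` and `libc` crates; the helper name is illustrative and not part of the PR.

```rust
/// Sketch only: compute total CPU seconds with sub-second resolution,
/// the same value the new collector exports.
fn cpu_seconds_highres() -> f64 {
    // Clock ticks per second; a process-wide constant.
    let clk_tck = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
    assert!(clk_tck > 0, "sysconf(_SC_CLK_TCK) failed");

    // /proc/self/stat reports user and system CPU time in clock ticks.
    let stat = procfs::process::Process::myself()
        .and_then(|p| p.stat())
        .expect("read /proc/self/stat");

    // Dividing by CLK_TCK yields fractional seconds, unlike
    // `process_cpu_seconds_total`, which has whole-second resolution.
    (stat.utime + stat.stime) as f64 / clk_tck as f64
}
```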

156 changes: 95 additions & 61 deletions pageserver/src/page_service.rs
@@ -314,13 +314,15 @@ struct PageServerHandler

timeline_handles: TimelineHandles,

/// Messages queued up for the next processing batch
next_batch: Option<BatchedFeMessage>,

/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,
}

struct Carry {
msg: BatchedFeMessage,
started_at: Instant,
}

struct TimelineHandles {
wrapper: TenantManagerWrapper,
/// Note on size: the typical size of this map is 1. The largest size we expect
@@ -582,7 +584,6 @@ impl PageServerHandler {
connection_ctx,
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
next_batch: None,
server_side_batch_timeout,
}
}
@@ -611,55 +612,99 @@ impl PageServerHandler {
)
}

#[instrument(skip_all, level = tracing::Level::TRACE)]
async fn read_batch_from_connection<IO>(
&mut self,
pgb: &mut PostgresBackend<IO>,
tenant_id: &TenantId,
timeline_id: &TimelineId,
maybe_carry: &mut Option<Carry>,
ctx: &RequestContext,
) -> Result<Option<BatchOrEof>, QueryError>
) -> Result<BatchOrEof, QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
let mut batch = self.next_batch.take();
let mut batch_started_at: Option<std::time::Instant> = None;

let next_batch: Option<BatchedFeMessage> = loop {
let sleep_fut = match (self.server_side_batch_timeout, batch_started_at) {
(Some(batch_timeout), Some(started_at)) => futures::future::Either::Left(
tokio::time::sleep_until((started_at + batch_timeout).into()),
),
_ => futures::future::Either::Right(futures::future::pending()),
};
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();

let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?

loop {
// Create a future that will become ready when we need to stop batching.
use futures::future::Either;
let batching_deadline = match (
&*maybe_carry as &Option<Carry>,
&mut batching_deadline_storage,
) {
(None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
(None, Some(_)) => unreachable!(),
(Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
(Some(carry), None) => {
match self.server_side_batch_timeout {
None => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]))
}
Some(batch_timeout) => {
// Take into consideration the time the carry spent waiting.
let batch_timeout =
batch_timeout.saturating_sub(carry.started_at.elapsed());
if batch_timeout.is_zero() {
// the timer doesn't support restarting with zero duration
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]));
} else {
batching_deadline_storage = Some(Box::pin(async move {
tokio::time::sleep(batch_timeout).await;
}));
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
}
}
}
}
};
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => {
msg
}
_ = sleep_fut => {
assert!(batch.is_some());
break None;
_ = batching_deadline => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![maybe_carry.take().expect("per construction of batching_deadline").msg]));
}
msg = pgb.read_message() => { msg }
};

let msg_start = Instant::now();

// Rest of this loop body is trying to batch `msg` into `batch`.
            // If we can add `msg` to the batch, we continue into the next loop iteration.
            // If we can't add `msg` to the batch, we carry `msg` over to the next call.

let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => {
return Ok(Some(BatchOrEof::Eof));
return Ok(BatchOrEof::Eof);
}
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => {
return Ok(Some(BatchOrEof::Eof));
return Ok(BatchOrEof::Eof);
} // client disconnected
};
trace!("query: {copy_data_bytes:?}");

fail::fail_point!("ps::handle-pagerequest-message");

// parse request
@@ -701,11 +746,11 @@ impl PageServerHandler {
span,
error: $error,
};
let batch_and_error = match batch {
Some(b) => smallvec::smallvec![b, error],
let batch_and_error = match maybe_carry.take() {
Some(carry) => smallvec::smallvec![carry.msg, error],
None => smallvec::smallvec![error],
};
Ok(Some(BatchOrEof::Batch(batch_and_error)))
Ok(BatchOrEof::Batch(batch_and_error))
}};
}

@@ -758,26 +803,20 @@ impl PageServerHandler {
}
};

let batch_timeout = match self.server_side_batch_timeout {
Some(value) => value,
None => {
// Batching is not enabled - stop on the first message.
return Ok(Some(BatchOrEof::Batch(smallvec::smallvec![this_msg])));
}
};

// check if we can batch
match (&mut batch, this_msg) {
//
// batch
//
match (maybe_carry.as_mut(), this_msg) {
(None, this_msg) => {
batch = Some(this_msg);
*maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start });
}
(
Some(BatchedFeMessage::GetPage {
Some(Carry { msg: BatchedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
pages: ref mut accum_pages,
effective_request_lsn: accum_lsn,
}),
}, started_at: _}),
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
@@ -787,19 +826,22 @@
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
                        // TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
if *accum_lsn != this_lsn {
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
return false;
}
true
@@ -809,21 +851,17 @@ impl PageServerHandler {
// ok to batch
accum_pages.extend(this_pages);
}
(Some(_), this_msg) => {
(Some(carry), this_msg) => {
// by default, don't continue batching
break Some(this_msg);
let carry = std::mem::replace(carry,
Carry {
msg: this_msg,
started_at: msg_start,
});
return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg]));
}
}

// batching impl piece
let started_at = batch_started_at.get_or_insert_with(Instant::now);
if started_at.elapsed() > batch_timeout {
break None;
}
};

self.next_batch = next_batch;
Ok(batch.map(|b| BatchOrEof::Batch(smallvec::smallvec![b])))
}
}

/// Pagestream sub-protocol handler.
@@ -861,22 +899,17 @@ impl PageServerHandler {
}
}

// If [`PageServerHandler`] is reused for multiple pagestreams,
// then make sure to not process requests from the previous ones.
self.next_batch = None;
let mut carry: Option<Carry> = None;

loop {
let maybe_batched = self
.read_batch_from_connection(pgb, &tenant_id, &timeline_id, &ctx)
.read_batch_from_connection(pgb, &tenant_id, &timeline_id, &mut carry, &ctx)
.await?;
let batched = match maybe_batched {
Some(BatchOrEof::Batch(b)) => b,
Some(BatchOrEof::Eof) => {
BatchOrEof::Batch(b) => b,
BatchOrEof::Eof => {
break;
}
None => {
continue;
}
};

for batch in batched {
@@ -922,6 +955,7 @@ impl PageServerHandler {
(
{
let npages = pages.len();
trace!(npages, "handling getpage request");
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
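To summarize the batching refactor in `page_service.rs`: the handler no longer stashes the next message in `self.next_batch`; instead, `read_batch_from_connection` receives a `&mut Option<Carry>`, where a `Carry` remembers both the leftover message and when it arrived, so the batching deadline is charged for the time the carried message has already waited. The sketch below illustrates that control flow under simplifying assumptions: it is synchronous, and `Msg`, `read_next`, `read_batch`, and `MAX_BATCH_SIZE` are illustrative stand-ins (the real code is async, races the deadline against cancellation and `pgb.read_message()` in a `tokio::select!`, and batches `BatchedFeMessage::GetPage` keyed on shard, timeline, and effective request LSN).

```rust
use std::time::{Duration, Instant};

// Stand-in for Timeline::MAX_GET_VECTORED_KEYS.
const MAX_BATCH_SIZE: usize = 32;

/// Illustrative stand-in for `BatchedFeMessage::GetPage`: page numbers
/// accumulated at one effective request LSN.
struct Msg {
    lsn: u64,
    pages: Vec<u64>,
}

/// A message held over for the next batch, plus when it arrived
/// (mirrors the PR's `Carry` struct).
struct Carry {
    msg: Msg,
    started_at: Instant,
}

/// The real predicate also checks shard and timeline identity.
fn can_merge(accum: &Msg, next: &Msg) -> bool {
    accum.lsn == next.lsn && accum.pages.len() < MAX_BATCH_SIZE
}

/// Simplified, synchronous version of the batching loop.
/// `read_next` stands in for `pgb.read_message()`; `None` means EOF.
fn read_batch(
    carry: &mut Option<Carry>,
    batch_timeout: Option<Duration>,
    mut read_next: impl FnMut() -> Option<Msg>,
) -> Vec<Msg> {
    loop {
        // Flush the carry once the timeout budget, reduced by the time the
        // carried message has already waited, is used up (or when batching
        // is disabled, i.e. no timeout is configured).
        if let Some(c) = carry.as_ref() {
            let remaining = batch_timeout
                .map(|t| t.saturating_sub(c.started_at.elapsed()))
                .unwrap_or(Duration::ZERO);
            if remaining.is_zero() {
                return vec![carry.take().expect("checked above").msg];
            }
            // The real code arms a sleep for `remaining` here and races it
            // against cancellation and the next protocol message.
        }

        let Some(incoming) = read_next() else {
            // EOF: flush whatever is still carried.
            return carry.take().map(|c| vec![c.msg]).unwrap_or_default();
        };
        let msg_start = Instant::now();

        match carry.take() {
            // Nothing carried yet: start a new batch with `incoming`.
            None => *carry = Some(Carry { msg: incoming, started_at: msg_start }),
            // Same LSN and room left: extend the accumulated batch.
            Some(mut c) if can_merge(&c.msg, &incoming) => {
                c.msg.pages.extend(incoming.pages);
                *carry = Some(c);
            }
            // Cannot merge: return the carried batch as-is and carry
            // `incoming` over to the next call, preserving its arrival time.
            Some(finished) => {
                *carry = Some(Carry { msg: incoming, started_at: msg_start });
                return vec![finished.msg];
            }
        }
    }
}
```

The key difference from the pre-refactor code is that the timeout is measured from when the carried message arrived rather than from when the current call started batching, so a message carried over from a previous call does not get a fresh, full batching window.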
3 changes: 2 additions & 1 deletion test_runner/performance/README.md
@@ -15,6 +15,7 @@ Some handy pytest flags for local development:
- `-k` selects a test to run
- `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
- `--preserve-database-files` to skip cleanup
- `--out-dir` to produce a JSON with the recorded test metrics

# What performance tests do we have and how we run them

@@ -36,6 +37,6 @@ All tests run only once. Usually to obtain more consistent performance numbers,

## Results collection

Local test results for main branch, and results of daily performance tests, are stored in a neon project deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.
Local test results for main branch, and results of daily performance tests, are stored in a [neon project](https://console.neon.tech/app/projects/withered-sky-69117821) deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.

There is also an inconsistency in test naming. Test name should be the same across platforms, and results can be differentiated by the platform field. But currently, platform is sometimes included in test name because of the way how parametrization works in pytest. I.e. there is a platform switch in the dashboard with neon-local-ci and neon-staging variants. I.e. some tests under neon-local-ci value for a platform switch are displayed as `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]` and `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]` which is highly confusing.