Skip to content

Commit

Permalink
Fix tracing bug and add write size event (#33197)
Browse files Browse the repository at this point in the history
This PR fixes a bug in commit tracing where we would increment the `commit_id` even if we didn't push a persistence write to the queue. This fixed by making `start_commit` return an `Option<BoxFuture<..>>` to write to persistence and increment the `commit_id` if it is some. Also adds the size and number of documents written to the traces.

GitOrigin-RevId: d7eda2d624bfe005883d91ce60378f4f58e53466
  • Loading branch information
emmaling27 authored and Convex, Inc. committed Jan 15, 2025
1 parent b91e35f commit c146152
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 29 deletions.
64 changes: 39 additions & 25 deletions crates/database/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ impl<RT: Runtime> Committer<RT> {

let mut root_span = None;
// Keep a monotonically increasing id to keep track of honeycomb traces
// Each commit_id tracks a single write to persistence, from the time the commit
// message is received until the time the commit has been published. We skip
// read-only transactions or commits that fail to validate.
let mut commit_id = 0;
// Keep track of the commit_id that is currently being traced.
let mut span_commit_id = None;
Expand Down Expand Up @@ -323,27 +326,31 @@ impl<RT: Runtime> Committer<RT> {
write_source,
parent_trace,
}) => {
// Skip read-only transactions.
if transaction.is_readonly() {
let _ = result.send(Ok(*transaction.begin_timestamp));
} else {
let root_span = root_span.get_or_insert_with(|| {
span_commit_id = Some(commit_id);
Span::root("commit", SpanContext::random())
});
let _span =
Span::enter_with_parent("start_commit", root_span);
let root = initialize_root_from_parent("handle_commit_message", parent_trace.clone())
.with_property(|| ("time_in_queue_ms", format!("{}", queue_timer.elapsed().as_secs_f64() * 1000.0)));
let _guard = root.set_local_parent();
drop(queue_timer);
self.start_commit(transaction,
result,
write_source,
parent_trace,
commit_id,
root_span);
commit_id += 1;
let root_span_ref = root_span.get_or_insert_with(|| {
span_commit_id = Some(commit_id);
Span::root("commit", SpanContext::random())
});
let _span =
Span::enter_with_parent("start_commit", root_span_ref);
let root = initialize_root_from_parent("handle_commit_message", parent_trace.clone())
.with_property(|| ("time_in_queue_ms", format!("{}", queue_timer.elapsed().as_secs_f64() * 1000.0)));
let _guard = root.set_local_parent();
drop(queue_timer);
if let Some(persistence_write_future) = self.start_commit(transaction,
result,
write_source,
parent_trace,
commit_id,
root_span_ref) {
self.persistence_writes.push_back(persistence_write_future);
commit_id += 1;
} else if span_commit_id == Some(commit_id) {
// If the span_commit_id is the same as the commit_id, that means we created a root span in this block
// and it didn't get incremented, so it's not a write to persistence and we should not trace it.
// We also need to reset the span_commit_id and root_span.
root_span_ref.cancel();
root_span = None;
span_commit_id = None;
}
},
#[cfg(any(test, feature = "testing"))]
Expand Down Expand Up @@ -819,6 +826,8 @@ impl<RT: Runtime> Committer<RT> {
}

#[minitrace::trace]
/// Returns a future to add to the pending_writes queue, if the commit
/// should be written.
fn start_commit(
&mut self,
transaction: FinalTransaction,
Expand All @@ -827,7 +836,12 @@ impl<RT: Runtime> Committer<RT> {
parent_trace: EncodedSpan,
commit_id: usize,
root_span: &Span,
) {
) -> Option<BoxFuture<'static, anyhow::Result<PersistenceWrite>>> {
// Skip read-only transactions.
if transaction.is_readonly() {
let _ = result.send(Ok(*transaction.begin_timestamp));
return None;
}
let commit_timer = metrics::commit_timer();
metrics::log_write_tx(&transaction);

Expand All @@ -843,7 +857,7 @@ impl<RT: Runtime> Committer<RT> {
Ok(v) => v,
Err(e) => {
let _ = result.send(Err(e));
return;
return None;
},
};

Expand All @@ -855,7 +869,7 @@ impl<RT: Runtime> Committer<RT> {
parent_trace.clone(),
);
let outer_span = Span::enter_with_parents("outer_write_commit", [root_span, &request_span]);
self.persistence_writes.push_back(
Some(
async move {
Self::track_commit(
usage_tracking,
Expand All @@ -876,7 +890,7 @@ impl<RT: Runtime> Committer<RT> {
}
.in_span(outer_span)
.boxed(),
);
)
}

#[minitrace::trace]
Expand Down
28 changes: 24 additions & 4 deletions crates/mysql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod metrics;
#[cfg(test)]
mod tests;
use std::{
borrow::Cow,
cmp,
collections::{
BTreeMap,
Expand Down Expand Up @@ -44,6 +45,7 @@ use common::{
ResolvedDocument,
},
errors::lease_lost_error,
heap_size::HeapSize,
index::{
IndexEntry,
IndexKeyBytes,
Expand Down Expand Up @@ -260,12 +262,30 @@ impl<RT: Runtime> Persistence for MySqlPersistence<RT> {
conflict_strategy: ConflictStrategy,
) -> anyhow::Result<()> {
anyhow::ensure!(documents.len() <= MAX_INSERT_SIZE);
anyhow::ensure!(documents.iter().all(|update| {
let mut write_size = 0;
for update in &documents {
match &update.value {
Some(doc) => update.id == doc.id_with_table_id(),
None => true,
Some(doc) => {
anyhow::ensure!(update.id == doc.id_with_table_id());
write_size += doc.heap_size();
},
None => {},
}
}));
}
metrics::log_write_bytes(write_size);
metrics::log_write_documents(documents.len());
Event::add_to_local_parent("write_to_persistence_size", || {
[
(
Cow::Borrowed("num_documents"),
Cow::Owned(documents.len().to_string()),
),
(
Cow::Borrowed("write_size"),
Cow::Owned(write_size.to_string()),
),
]
});

// True, the below might end up failing and not changing anything.
self.newly_created.store(false, SeqCst);
Expand Down
14 changes: 14 additions & 0 deletions crates/mysql/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use common::pool_stats::ConnectionPoolStats;
use metrics::{
log_counter_with_labels,
log_distribution,
log_distribution_with_labels,
register_convex_counter,
register_convex_gauge,
Expand Down Expand Up @@ -349,6 +350,19 @@ pub fn insert_index_chunk_timer(cluster_name: &str) -> StatusTimer {
timer
}

register_convex_histogram!(MYSQL_WRITE_BYTES, "Number of bytes written in MySQL writes");
pub fn log_write_bytes(size: usize) {
log_distribution(&MYSQL_WRITE_BYTES, size as f64);
}

register_convex_histogram!(
MYSQL_WRITE_DOCUMENTS,
"Number of documents written in MySQL writes",
);
pub fn log_write_documents(size: usize) {
log_distribution(&MYSQL_WRITE_DOCUMENTS, size as f64);
}

register_convex_histogram!(
MYSQL_LEASE_ACQUIRE_SECONDS,
"Time to acquire a lease",
Expand Down

0 comments on commit c146152

Please sign in to comment.