Skip to content

Commit

Permalink
Merge pull request #658 from splitgraph/extract-sync-planner
Browse files Browse the repository at this point in the history
Extract the sync planning logic into a separate module
  • Loading branch information
gruuya authored Sep 9, 2024
2 parents 1a49077 + 45d23b6 commit 4728860
Show file tree
Hide file tree
Showing 7 changed files with 786 additions and 702 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ on: [push, pull_request]

jobs:
CI:
# Enable pull request trigger only from forks, since otherwise we get 2 job runs in PRs
if:
github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name !=
github.repository
name: Lint, build, test
runs-on: ubuntu-latest
env:
Expand Down
52 changes: 37 additions & 15 deletions src/sync/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const SQUASHED_BYTES: &str = "seafowl_changeset_writer_squashed_bytes_total";
const SQUASHED_ROWS: &str = "seafowl_changeset_writer_squashed_rows_total";
const PRUNING_TIME: &str = "seafowl_changeset_writer_pruning_time_milliseconds";
const PRUNING_FILES: &str = "seafowl_changeset_writer_pruning_files_total";
const PLANNING_TIME: &str = "seafowl_changeset_writer_planning_time_milliseconds";
const FLUSH_TIME: &str = "seafowl_changeset_writer_flush_time_seconds";
const FLUSH_BYTES: &str = "seafowl_changeset_writer_flush_bytes_total";
const FLUSH_ROWS: &str = "seafowl_changeset_writer_flush_rows_total";
Expand All @@ -25,7 +26,7 @@ const SEQUENCE_DURABLE: &str = "seafowl_changeset_writer_sequence_durable_bytes"
const SEQUENCE_MEMORY: &str = "seafowl_changeset_writer_sequence_memory_bytes";

#[derive(Clone)]
pub struct SyncMetrics {
pub struct SyncWriterMetrics {
pub request_bytes: Counter,
pub request_rows: Counter,
pub in_memory_bytes: Gauge,
Expand All @@ -34,22 +35,21 @@ pub struct SyncMetrics {
pub squash_time: Histogram,
pub squashed_bytes: Counter,
pub squashed_rows: Counter,
pub pruning_time: Histogram,
pub pruning_files: Histogram,
pub planning_time: Histogram,
pub flush_time: Histogram,
pub flush_bytes: Counter,
pub flush_rows: Counter,
pub flush_last: Gauge,
pub flush_lag: Histogram,
}

impl Default for SyncMetrics {
impl Default for SyncWriterMetrics {
fn default() -> Self {
Self::new()
}
}

impl SyncMetrics {
impl SyncWriterMetrics {
fn new() -> Self {
describe_counter!(
REQUEST_BYTES,
Expand Down Expand Up @@ -83,14 +83,7 @@ impl SyncMetrics {
SQUASHED_ROWS,
"The reduction in row count due to batch squashing"
);
describe_histogram!(
PRUNING_TIME,
"The time taken to prune partition files to re-write"
);
describe_histogram!(
PRUNING_FILES,
"The file count that partition pruning identified"
);
describe_histogram!(PLANNING_TIME, "The time taken to construct the flush plan");
describe_histogram!(FLUSH_TIME, "The time taken to flush a collections of syncs");
describe_counter!(FLUSH_BYTES, "The total byte size flushed");
describe_counter!(FLUSH_ROWS, "The total row count flushed");
Expand All @@ -111,8 +104,7 @@ impl SyncMetrics {
squash_time: histogram!(SQUASH_TIME),
squashed_bytes: counter!(SQUASHED_BYTES),
squashed_rows: counter!(SQUASHED_ROWS),
pruning_time: histogram!(PRUNING_TIME),
pruning_files: histogram!(PRUNING_FILES),
planning_time: histogram!(PLANNING_TIME),
flush_time: histogram!(FLUSH_TIME),
flush_bytes: counter!(FLUSH_BYTES),
flush_rows: counter!(FLUSH_ROWS),
Expand All @@ -131,3 +123,33 @@ impl SyncMetrics {
sequence_memory.set(sequence as f64);
}
}

#[derive(Clone)]
pub struct SyncPlanMetrics {
pub pruning_time: Histogram,
pub pruning_files: Histogram,
}

impl Default for SyncPlanMetrics {
fn default() -> Self {
Self::new()
}
}

impl SyncPlanMetrics {
fn new() -> Self {
describe_histogram!(
PRUNING_TIME,
"The time taken to prune partition files to re-write"
);
describe_histogram!(
PRUNING_FILES,
"The file count that partition pruning identified"
);

Self {
pruning_time: histogram!(PRUNING_TIME),
pruning_files: histogram!(PRUNING_FILES),
}
}
}
1 change: 1 addition & 0 deletions src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::sync::RwLock;
use tracing::warn;

mod metrics;
mod planner;
pub mod schema;
mod utils;
pub(crate) mod writer;
Expand Down
Loading

0 comments on commit 4728860

Please sign in to comment.