
refactor: get_stats merge and simplify #1150

Open · wants to merge 20 commits into base: main

Conversation

@de-sh de-sh (Contributor) commented Jan 30, 2025

Fixes #XXXX.

Description

  • Use serde and Query to abstract away param extraction (see the sketch after this list)
  • Merge get_stats from logstream.rs (used in both ingestor and standalone) and querier_logstream.rs (used in querier)
  • Organize imports in affected files
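
A minimal sketch of the extractor pattern referred to above, assuming a simplified StatsParams with a plain string date field; the real struct and handler live in src/stats.rs and src/handlers/http/logstream.rs and differ in detail:

use actix_web::web::{Path, Query};
use serde::Deserialize;

// Assumed shape of the query parameters; the actual struct uses a custom
// date deserializer.
#[derive(Debug, Deserialize)]
pub struct StatsParams {
    pub date: Option<String>, // populated from `?date=2025-01-30`
}

// actix-web deserializes the query string into StatsParams before the handler
// body runs, so the handler no longer parses HttpRequest by hand.
pub async fn get_stats(
    stream_name: Path<String>,
    params: Query<StatsParams>,
) -> String {
    let params = params.into_inner();
    format!(
        "stats for {} (date filter: {:?})",
        stream_name.into_inner(),
        params.date
    )
}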

This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Stream statistics now support optional date filtering, offering more tailored queries and clearer output presentation.
    • Introduced a new method for aggregating multiple stream statistics into a single instance.
  • Refactor

    • Consolidated the aggregation logic for stream metrics to ensure more accurate and reliable cumulative data.
    • Streamlined the routing and error handling for statistics retrieval, resulting in more efficient and robust responses.
    • Enhanced clarity and maintainability in the code structure related to stream statistics.
    • Updated method signatures for improved parameter handling and efficiency.
    • Removed outdated functions to simplify the codebase.

@de-sh de-sh changed the title refactor: simplify query param extraction refactor: simplify query param extraction and merge get_stats Jan 31, 2025
@de-sh de-sh changed the title refactor: simplify query param extraction and merge get_stats refactor: get_stats merge and simplify Feb 4, 2025

coderabbitai bot commented Feb 13, 2025

Walkthrough

The changes refactor several components associated with stream statistics handling. A new method has been added to the QueriedStats struct for merging statistics, replacing an older standalone function. In the logstream handler, an outdated date-based function has been removed, and the main statistics endpoint now accepts structured query parameters. The query server route has been updated to reference the new implementation. Additionally, the stats module has been enhanced with a new StatsParams struct to support date-specific queries and improved import organization.

Changes

File(s) Change Summary
src/handlers/http/cluster/utils.rs Added new pub fn merge(stats: Vec<Self>) -> Self method to QueriedStats struct that aggregates stats; removed the old merge_quried_stats function.
src/handlers/http/logstream.rs Removed get_stats_date; updated get_stats to accept Query<StatsParams> instead of HttpRequest; enhanced error handling with conditional checks and refined statistics formatting.
src/handlers/http/modal/query/querier_logstream.rs Removed the get_stats function and cleaned up related imports by removing unused ingestor fetching functions.
src/handlers/http/modal/query_server.rs Updated the /stats route to use logstream::get_stats in place of querier_logstream::get_stats, reflecting a change in the source module for statistics retrieval (see the sketch after this table).
src/stats.rs Reorganized and added imports (including serde); introduced a new StatsParams struct with an optional date field and a corresponding get_stats method to support date-specific queries; updated struct annotations for serialization using Serialize/Deserialize.
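
A hedged sketch of what the query_server.rs route swap amounts to; the scope path and stand-in handler below are illustrative only, not the project's actual service tree:

use actix_web::{web, HttpResponse, Responder, Scope};

// Stand-in for the consolidated logstream::get_stats handler.
async fn get_stats(stream_name: web::Path<String>) -> impl Responder {
    HttpResponse::Ok().body(format!("stats for {}", stream_name.into_inner()))
}

// The /stats resource now resolves to logstream::get_stats instead of the
// removed querier_logstream::get_stats.
fn logstream_scope() -> Scope {
    web::scope("/logstream/{logstream}")
        .service(web::resource("/stats").route(web::get().to(get_stats)))
}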

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant S as HTTP Server
    participant LS as LogStream Handler
    participant Stats as Stats Module

    C->>S: GET /stats?date=YYYY-MM-DD
    S->>LS: Handle get_stats(Query<StatsParams>, stream_name)
    LS->>LS: Validate stream (skipped under test cfg)
    alt Internal Stream
        LS->>Stats: fetch ingestion statistics via ingestors
    else External Stream
        LS->>Stats: retrieve local store stats
    end
    Stats-->>LS: Aggregated statistics
    LS->>S: Create HTTP Response with stats
    S->>C: Return HTTP Response

Poem

I'm a rabbit in the codewood, hopping with glee,
Merging stats and shedding old functions—oh, how fancy can it be!
Routes and queries now dance in a streamlined flow,
Each byte and label in perfect row.
With whiskers twitching at every bug set free,
I nibble on changes beneath the digital tree.
🐇 Hooray for clean code and a neat, bouncy spree!


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (4)
src/handlers/http/logstream.rs (3)

224-227: Refined function signature for typed query parameters.

Accepting Query<StatsParams> instead of an HttpRequest is a clean improvement that reduces manual parsing and fosters clarity. As a future enhancement, consider validating the stream_name and ensuring the StatsParams fields are in valid ranges, if necessary.


278-283: Consistent string formatting for ingestion stats.

Appending "Bytes" is straightforward. Watch out for very large values (potential numeric overflow), although typical usage should be fine.


296-308: Repeated block for stats instantiation.

The logic looks correct and consistent with the earlier block. Consider refactoring these two blocks into a shared function to avoid duplication (DRY principle).

src/stats.rs (1)

211-214: Add documentation for the new struct.

The StatsParams struct lacks documentation explaining its purpose and the expected format of the date field.

Add documentation comments:

+/// Parameters for retrieving stream statistics.
+/// Used to filter statistics by date.
 #[derive(Debug, Deserialize)]
 pub struct StatsParams {
+    /// Optional date in YYYY-MM-DD format to filter statistics.
     pub date: Option<String>,
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 20e66a4 and 82f04d6.

📒 Files selected for processing (5)
  • src/handlers/http/cluster/utils.rs (1 hunks)
  • src/handlers/http/logstream.rs (8 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (2 hunks)
  • src/handlers/http/modal/query_server.rs (1 hunks)
  • src/stats.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (15)
src/handlers/http/logstream.rs (11)

19-20: Imports look good.

The addition of StatusCode and Query is consistent with the refactor to handle HTTP responses and structured query parameters, respectively.


39-39: Usage of StatsParams.

Good inclusion of StatsParams to facilitate typed, date-based or other parameterized stats retrieval.


45-46: Imports for cluster utilities.

Importing fetch_stats_from_ingestors and related types ensures a clearer modular approach to gather stats, especially when running in Query mode.


230-230: Test-mode condition check.

Using cfg!(not(test)) here prevents potential test failures due to missing in-memory data. This approach is sensible; just be sure any integration or end-to-end tests still handle stream setup as needed.


247-248: Short-circuit stats retrieval.

Returning early if params.get_stats(&stream_name) yields a valid result is a neat step to accommodate date-based or custom stats. This approach looks good.


254-262: Conditional ingestor stats retrieval.

Fetching ingestor stats only if we're in Query mode and the stream is internal. This seems intentional but might skip potential external streams. Double-check whether external streams are meant to be included or not.


286-288: Size formatting for storage stats.

Matches the ingestion stats approach and is consistent. No issues detected.


315-317: Merging multiple ingestor stats.

QueriedStats::merge is a clear, reusable method to unify multiple stats sources. This is a clean approach.


322-322: Returning aggregated stats.

Building the final HTTP response with StatusCode::OK and the merged stats is straightforward and clear.


633-638: Expanded test imports.

The usage of anyhow::bail and StatsParams suggests robust test coverage. Consider adding more tests for different StatsParams variants.

Also applies to: 640-640


650-661: New test for nonexistent stream.

Good test coverage ensuring get_stats returns StreamNotFound for an unknown stream name. Adding more tests (e.g., handling date-based scenarios) would further strengthen confidence.
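
A self-contained sketch of one such date-based test; it only exercises query-string deserialization into an assumed StatsParams shape (serde_urlencoded, which actix-web's Query extractor uses internally, is assumed to be available as a dev-dependency):

#[cfg(test)]
mod stats_params_tests {
    use serde::Deserialize;

    // Assumed minimal shape; the real StatsParams parses the date with a
    // custom NaiveDate deserializer.
    #[derive(Debug, Deserialize)]
    struct StatsParams {
        date: Option<String>,
    }

    #[test]
    fn parses_optional_date_param() {
        let with_date: StatsParams = serde_urlencoded::from_str("date=2025-01-30").unwrap();
        assert_eq!(with_date.date.as_deref(), Some("2025-01-30"));

        let without_date: StatsParams = serde_urlencoded::from_str("").unwrap();
        assert!(without_date.date.is_none());
    }
}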

src/handlers/http/modal/query/querier_logstream.rs (3)

20-20: Import consolidation.

The updated imports from actix_web properly reduce clutter. No issues found.


31-32: Shifted cluster and error imports.

Removing references to old stats-collection methods and using StreamError from logstream::error aligns with the consolidated approach to stats retrieval.


36-36: Added stats module import.

Including stats here suggests potential usage in stream deletion or synchronization logic. Looks consistent with the updated design.

src/handlers/http/modal/query_server.rs (1)

286-286: LGTM!

The route handler change aligns with the PR objectives to merge and simplify the get_stats functionality.

Comment on lines +54 to +129
let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);

let cumulative_ingestion =
    stats
        .iter()
        .map(|x| &x.ingestion)
        .fold(IngestionStats::default(), |acc, x| IngestionStats {
            count: acc.count + x.count,
            size: format!(
                "{} Bytes",
                acc.size.split(' ').collect_vec()[0]
                    .parse::<u64>()
                    .unwrap_or_default()
                    + x.size.split(' ').collect_vec()[0]
                        .parse::<u64>()
                        .unwrap_or_default()
            ),
            format: x.format.clone(),
            lifetime_count: acc.lifetime_count + x.lifetime_count,
            lifetime_size: format!(
                "{} Bytes",
                acc.lifetime_size.split(' ').collect_vec()[0]
                    .parse::<u64>()
                    .unwrap_or_default()
                    + x.lifetime_size.split(' ').collect_vec()[0]
                        .parse::<u64>()
                        .unwrap_or_default()
            ),
            deleted_count: acc.deleted_count + x.deleted_count,
            deleted_size: format!(
                "{} Bytes",
                acc.deleted_size.split(' ').collect_vec()[0]
                    .parse::<u64>()
                    .unwrap_or_default()
                    + x.deleted_size.split(' ').collect_vec()[0]
                        .parse::<u64>()
                        .unwrap_or_default()
            ),
        });

let cumulative_storage =
    stats
        .iter()
        .map(|x| &x.storage)
        .fold(StorageStats::default(), |acc, x| StorageStats {
            size: format!(
                "{} Bytes",
                acc.size.split(' ').collect_vec()[0]
                    .parse::<u64>()
                    .unwrap_or_default()
                    + x.size.split(' ').collect_vec()[0]
                        .parse::<u64>()
                        .unwrap_or_default()
            ),
            format: x.format.clone(),
            lifetime_size: format!(
                "{} Bytes",
                acc.lifetime_size.split(' ').collect_vec()[0]
                    .parse::<u64>()
                    .unwrap_or_default()
                    + x.lifetime_size.split(' ').collect_vec()[0]
                        .parse::<u64>()
                        .unwrap_or_default()
            ),
            deleted_size: format!(
                "{} Bytes",
                acc.deleted_size.split(' ').collect_vec()[0]
                    .parse::<u64>()
                    .unwrap_or_default()
                    + x.deleted_size.split(' ').collect_vec()[0]
                        .parse::<u64>()
                        .unwrap_or_default()
            ),
        });

🛠️ Refactor suggestion

Fragile string-based parsing of byte sizes.

Splitting on " Bytes" and re-parsing is susceptible to format changes. A more robust approach is to store sizes as numeric fields within the QueriedStats, IngestionStats, and StorageStats, then only format them when returning or displaying. This prevents runtime errors if the format changes and simplifies logic.

 pub fn merge(stats: Vec<Self>) -> Self {
     // Instead of brandishing " Bytes" mid-calculation, define numeric fields in these structs:
     //   - e.g., `pub size_in_bytes: u64`
     // Summation is then just numeric addition.
     // Then, in display or response, format with "Bytes".
 }

Also applies to: 95-128, 130-136
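
A minimal sketch of that numeric-field approach, using hypothetical field names (size_in_bytes and friends) rather than the structs' real layout:

#[derive(Debug, Default, Clone)]
pub struct IngestionStatsNumeric {
    pub count: u64,
    pub size_in_bytes: u64,
    pub lifetime_count: u64,
    pub lifetime_size_in_bytes: u64,
    pub deleted_count: u64,
    pub deleted_size_in_bytes: u64,
}

impl IngestionStatsNumeric {
    // Merging collapses to plain numeric addition; no string splitting or parsing.
    pub fn merge(stats: &[Self]) -> Self {
        stats.iter().fold(Self::default(), |acc, x| Self {
            count: acc.count + x.count,
            size_in_bytes: acc.size_in_bytes + x.size_in_bytes,
            lifetime_count: acc.lifetime_count + x.lifetime_count,
            lifetime_size_in_bytes: acc.lifetime_size_in_bytes + x.lifetime_size_in_bytes,
            deleted_count: acc.deleted_count + x.deleted_count,
            deleted_size_in_bytes: acc.deleted_size_in_bytes + x.deleted_size_in_bytes,
        })
    }

    // Format only at the response boundary.
    pub fn size_display(&self) -> String {
        format!("{} Bytes", self.size_in_bytes)
    }
}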

Comment on lines +49 to +52

pub fn merge(stats: Vec<Self>) -> Self {
    // get the stream name
    let stream_name = stats[1].stream.clone();

⚠️ Potential issue

Potential index out-of-bounds.

Indexing stats[1] can panic if the vector is empty or has only one element. Consider checking the length first or referencing stats.first() or stats[0] if intended.

 pub fn merge(stats: Vec<Self>) -> Self {
-    let stream_name = stats[1].stream.clone();
+    if stats.is_empty() {
+        return QueriedStats::default();
+    }
+    let stream_name = stats[0].stream.clone(); // or handle if there's only 1
    ...
 }

src/stats.rs (outdated)
Comment on lines 217 to 239
pub fn get_stats(self, stream_name: &str) -> Option<Stats> {
    let date = self.date?;
    let event_labels = event_labels_date(stream_name, "json", &date);
    let storage_size_labels = storage_size_labels_date(stream_name, &date);
    let events_ingested = EVENTS_INGESTED_DATE
        .get_metric_with_label_values(&event_labels)
        .unwrap()
        .get() as u64;
    let ingestion_size = EVENTS_INGESTED_SIZE_DATE
        .get_metric_with_label_values(&event_labels)
        .unwrap()
        .get() as u64;
    let storage_size = EVENTS_STORAGE_SIZE_DATE
        .get_metric_with_label_values(&storage_size_labels)
        .unwrap()
        .get() as u64;

    Some(Stats {
        events: events_ingested,
        ingestion: ingestion_size,
        storage: storage_size,
    })
}

🛠️ Refactor suggestion

Add date format validation.

The get_stats method accepts any string as a date without validation. Consider adding date format validation to ensure the input is in the expected format.

Add date validation:

 impl StatsParams {
     pub fn get_stats(self, stream_name: &str) -> Option<Stats> {
         let date = self.date?;
+        // Validate date format (YYYY-MM-DD)
+        if date.chars().count() != 10
+            || !date.chars().all(|c| c.is_ascii_digit() || c == '-')
+            || date.split('-').count() != 3 {
+            return None;
+        }
         let event_labels = event_labels_date(stream_name, "json", &date);

Committable suggestion skipped: line range outside the PR's diff.


⚠️ Potential issue

Add error handling for metric retrieval.

The current implementation uses unwrap() which can cause panics if metric retrieval fails. Consider using proper error handling to gracefully handle failures.

Apply this diff to add error handling:

 impl StatsParams {
     pub fn get_stats(self, stream_name: &str) -> Option<Stats> {
         let date = self.date?;
         let event_labels = event_labels_date(stream_name, "json", &date);
         let storage_size_labels = storage_size_labels_date(stream_name, &date);
         let events_ingested = EVENTS_INGESTED_DATE
             .get_metric_with_label_values(&event_labels)
-            .unwrap()
+            .ok()?
             .get() as u64;
         let ingestion_size = EVENTS_INGESTED_SIZE_DATE
             .get_metric_with_label_values(&event_labels)
-            .unwrap()
+            .ok()?
             .get() as u64;
         let storage_size = EVENTS_STORAGE_SIZE_DATE
             .get_metric_with_label_values(&storage_size_labels)
-            .unwrap()
+            .ok()?
             .get() as u64;

         Some(Stats {
             events: events_ingested,
             ingestion: ingestion_size,
             storage: storage_size,
         })
     }

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

254-262: Improve error handling for ingestor stats.

The error handling for get_stream could be more explicit. Consider separating the stream type check to provide clearer error messages.

-    let ingestor_stats = if PARSEABLE.options.mode == Mode::Query
-        && PARSEABLE
-            .get_stream(&stream_name)
-            .is_ok_and(|s| s.get_stream_type() == StreamType::Internal)
-    {
-        Some(fetch_stats_from_ingestors(&stream_name).await?)
-    } else {
-        None
-    };
+    let ingestor_stats = if PARSEABLE.options.mode == Mode::Query {
+        match PARSEABLE.get_stream(&stream_name) {
+            Ok(stream) if stream.get_stream_type() == StreamType::Internal => {
+                Some(fetch_stats_from_ingestors(&stream_name).await?)
+            }
+            Ok(_) => None,
+            Err(e) => return Err(e.into()),
+        }
+    } else {
+        None
+    };
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 82f04d6 and 3902bd2.

📒 Files selected for processing (1)
  • src/handlers/http/logstream.rs (8 hunks)
🔇 Additional comments (3)
src/handlers/http/logstream.rs (3)

19-48: LGTM!

The import changes are well-organized and properly support the new functionality.


224-227: LGTM! Function signature improvements.

The change from HttpRequest to Query<StatsParams> provides better type safety and structured parameter handling.


635-636: LGTM! Test updates.

The test updates correctly reflect the new function signature and import changes.

Also applies to: 647-650

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

275-315: Reduce code duplication in stats creation.

The stats creation code is identical in both match arms of stream_meta.first_event_at. This duplication can be eliminated.

Consider moving the common code outside the match:

let ingestion_stats = IngestionStats::new(
    stats.current_stats.events,
    format!("{} Bytes", stats.current_stats.ingestion),
    stats.lifetime_stats.events,
    format!("{} Bytes", stats.lifetime_stats.ingestion),
    stats.deleted_stats.events,
    format!("{} Bytes", stats.deleted_stats.ingestion),
    "json",
);
let storage_stats = StorageStats::new(
    format!("{} Bytes", stats.current_stats.storage),
    format!("{} Bytes", stats.lifetime_stats.storage),
    format!("{} Bytes", stats.deleted_stats.storage),
    "parquet",
);

let stats = QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3902bd2 and 92fba76.

📒 Files selected for processing (1)
  • src/handlers/http/logstream.rs (11 hunks)
🧰 Additional context used
🪛 GitHub Actions: Lint, Test and Coverage Report
src/handlers/http/logstream.rs

[error] 328-328: This if statement can be collapsed. For further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_if.

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (2)
src/handlers/http/logstream.rs (2)

19-48: LGTM! Well-organized imports.

The imports are logically grouped and support the refactored stats functionality.


226-229: LGTM! Improved parameter handling.

The function signature change to use Query<StatsParams> provides better structure and type safety for query parameters.

Comment on lines +168 to 177
    if PARSEABLE.options.mode == Mode::Query
        && matches!(
            PARSEABLE
                .create_stream_and_schema_from_storage(&stream_name)
                .await,
            Ok(false) | Err(_)
        )
    {
        return Err(StreamNotFound(stream_name.clone()).into());
    }

🛠️ Refactor suggestion

Extract repeated stream existence check into a helper function.

The same error handling pattern for checking stream existence is duplicated across multiple functions. This can be refactored into a reusable helper function to improve maintainability and reduce code duplication.

Consider creating a helper function like this:

async fn ensure_stream_exists(stream_name: &str) -> Result<(), StreamError> {
    if PARSEABLE.options.mode == Mode::Query
        && matches!(
            PARSEABLE
                .create_stream_and_schema_from_storage(stream_name)
                .await,
            Ok(false) | Err(_)
        )
    {
        return Err(StreamNotFound(stream_name.to_string()).into());
    }
    Ok(())
}

Then use it in all the functions:

async fn get_retention(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
    let stream_name = stream_name.into_inner();
    ensure_stream_exists(&stream_name).await?;
    // ... rest of the function
}

Also applies to: 195-204, 329-338, 443-452, 471-480

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

278-283: Extract repeated size formatting into a helper function.

The " Bytes" suffix formatting is duplicated across multiple places. Consider extracting this into a helper function to improve maintainability and reduce code duplication.

+ fn format_size(size: u64) -> String {
+     format!("{} Bytes", size)
+ }

  let ingestion_stats = IngestionStats::new(
      stats.current_stats.events,
-     format!("{} Bytes", stats.current_stats.ingestion),
+     format_size(stats.current_stats.ingestion),
      stats.lifetime_stats.events,
-     format!("{} Bytes", stats.lifetime_stats.ingestion),
+     format_size(stats.lifetime_stats.ingestion),
      stats.deleted_stats.events,
-     format!("{} Bytes", stats.deleted_stats.ingestion),
+     format_size(stats.deleted_stats.ingestion),
      "json",
  );
  let storage_stats = StorageStats::new(
-     format!("{} Bytes", stats.current_stats.storage),
-     format!("{} Bytes", stats.lifetime_stats.storage),
-     format!("{} Bytes", stats.deleted_stats.storage),
+     format_size(stats.current_stats.storage),
+     format_size(stats.lifetime_stats.storage),
+     format_size(stats.deleted_stats.storage),
      "parquet",
  );

Also applies to: 286-289, 298-303, 306-309

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 92fba76 and 9f54e9e.

📒 Files selected for processing (1)
  • src/handlers/http/logstream.rs (11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (3)
src/handlers/http/logstream.rs (3)

19-47: LGTM! Import changes look good.

The imports have been well-organized and the new imports (Query, StatsParams) align with the PR's objective of using structured query parameters.


168-177: Extract repeated stream existence check into a helper function.

The same error handling pattern for checking stream existence is duplicated across multiple functions. This can be refactored into a reusable helper function to improve maintainability and reduce code duplication.

Also applies to: 195-204, 235-244, 327-337, 386-395, 440-449, 468-477


226-249: LGTM! Improved parameter handling and error checking.

The changes effectively:

  • Use structured query parameters via StatsParams
  • Add support for date-specific queries
  • Improve error handling with better stream existence checks

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/stats.rs (1)

244-255: Add date range validation.

While the date parsing is handled correctly, consider adding validation for reasonable date ranges to prevent processing of far future or past dates.

 pub fn deserialize_date<'de, D>(deserializer: D) -> Result<Option<NaiveDate>, D::Error>
 where
     D: Deserializer<'de>,
 {
     let Some(s) = Option::<String>::deserialize(deserializer)? else {
         return Ok(None);
     };

-    NaiveDate::parse_from_str(&s, "%Y-%m-%d")
-        .map(Some)
-        .map_err(serde::de::Error::custom)
+    let date = NaiveDate::parse_from_str(&s, "%Y-%m-%d")
+        .map_err(serde::de::Error::custom)?;
+
+    // Validate date range (e.g., not in the future and at most 5 days in the past)
+    let today = chrono::Local::now().date_naive();
+    if date > today || date < today - chrono::Days::new(5) {
+        return Err(serde::de::Error::custom("Date out of valid range"));
+    }
+
+    Ok(Some(date))
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9f54e9e and 5e7d8d1.

📒 Files selected for processing (1)
  • src/stats.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (2)
src/stats.rs (2)

21-33: LGTM! Well-organized imports and clean struct updates.

The imports are properly organized, and the structs are correctly updated with serde traits while maintaining backward compatibility.

Also applies to: 36-48


212-242: Add error handling for metric retrieval.

The current implementation uses unwrap() which can cause panics if metric retrieval fails. Consider using proper error handling to gracefully handle failures.
