feat(flow): flow refill state (Part 1) #5295

Open
wants to merge 6 commits into base: main
Conversation

Contributor

@discord9 discord9 commented Jan 6, 2025

I hereby agree to the terms of the GreptimeDB CLA.

Refer to a related PR or issue link (optional)

What's changed and what's your intention?

  • Utility functions for refilling a flow within a given time window of the flow's time index.
  • The core utility function is find_plan_time_window_expr_lower_bound in expr/utils.rs, which finds a time window's lower bound given a timestamp and a scalar expression. For example, with the time window expression date_bin(INTERVAL '5 minutes', ts) as time_window and current = "2021-07-01 00:01:01.000", it returns Some("2021-07-01 00:00:00.000"), the lower bound of the time window.
  • Other changes: refactoring and helper functions used in finding the time window.
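
To make the date_bin example above concrete, here is a minimal sketch of the arithmetic for a fixed-width window. It is not the code in this PR: window_lower_bound is a hypothetical helper, timestamps are assumed to be milliseconds since the Unix epoch, and windows are assumed to be aligned to the epoch.

```rust
/// Hypothetical helper (not part of this PR): floors `ts_ms` to the start of
/// its fixed-width window, assuming windows are aligned to the Unix epoch.
/// Returns `None` for a non-positive width or on arithmetic overflow.
fn window_lower_bound(ts_ms: i64, width_ms: i64) -> Option<i64> {
    if width_ms <= 0 {
        return None;
    }
    // `div_euclid` rounds toward negative infinity for a positive divisor,
    // so timestamps before the epoch are floored correctly as well.
    ts_ms.div_euclid(width_ms).checked_mul(width_ms)
}
```

With a width of 300_000 ms (5 minutes), the timestamp 1_625_097_661_000 (2021-07-01 00:01:01.000 UTC) maps to 1_625_097_600_000 (2021-07-01 00:00:00.000 UTC), matching the example in the description.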

PR Checklist

Please convert it to a draft if some of the following conditions are not met.

  • I have written the necessary rustdoc comments.
  • I have added the necessary unit tests and integration tests.
  • This PR requires documentation updates.
  • API changes are backward compatible.
  • Schema or data changes are backward compatible.

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced flow management with improved tracking of flow plans
    • Added support for evaluating scalar expressions
    • Introduced utility functions for time window calculations
  • Improvements

    • Expanded error handling capabilities
    • Added methods for retrieving and managing flow and plan information
    • Improved table source and context management
  • Testing

    • Added new test utility functions for query engine and context creation
  • Internal Changes

    • Introduced new modules and methods for better code organization and functionality

@discord9 discord9 requested review from zhongzc, waynexia and a team as code owners January 6, 2025 05:07
Contributor

coderabbitai bot commented Jan 6, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This pull request introduces several enhancements to the flow management system, focusing on improving flow plan tracking, error handling, and utility functions across multiple components. The changes span various modules in the src/flow directory, adding new methods for managing flow plans, evaluating scalar expressions, handling time window expressions, and creating test utilities. The modifications aim to provide more robust and flexible data flow management capabilities, with a particular emphasis on metadata tracking and error conversion.

Changes

File | Change Summary
src/flow/src/adapter.rs | Added line to store flow plan in node context during flow creation
src/flow/src/adapter/node_context.rs | Added flow_plans field, methods to manage flow plans, and send_record_batch method
src/flow/src/adapter/table_source.rs | Added methods to retrieve time index column and check table existence
src/flow/src/error.rs | Implemented From trait conversion for EvalError
src/flow/src/expr.rs | Added utils module and From implementation for converting RecordBatch to Batch
src/flow/src/expr/linear.rs | Added get_nth_expr method to MapFilterProject struct
src/flow/src/expr/scalar.rs | Added eval method to ScalarExpr enum
src/flow/src/expr/utils.rs | Added utility functions for time window expression handling
src/flow/src/lib.rs | Added test_utils module
src/flow/src/plan.rs | Added methods for plan manipulation and collection tracking
src/flow/src/plan/reduce.rs | Added get_nth_expr method to KeyValPlan struct
src/flow/src/server.rs | Added FrontendInvoker struct and get_all_flow_ids function
src/flow/src/test_utils.rs | Added test utility functions for context and query engine creation
tests/runner/src/env.rs | Modified restart_server method to unconditionally stop flownode process

Poem

🐰 Hop, hop, through the flow's embrace,
New methods dance with coding grace,
Plans tracked, errors converted with care,
Expressions eval'd beyond compare,
A rabbit's code, both swift and bright,
Bringing data's magic to new height! 🌟



@github-actions github-actions bot added the docs-not-required label (This change does not impact docs.) Jan 6, 2025

codecov bot commented Jan 6, 2025

Codecov Report

Attention: Patch coverage is 63.21839% with 192 lines in your changes missing coverage. Please review.

Project coverage is 83.79%. Comparing base (69d9a28) to head (51a5efb).
Report is 6 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5295      +/-   ##
==========================================
- Coverage   84.08%   83.79%   -0.30%     
==========================================
  Files        1199     1201       +2     
  Lines      224585   225608    +1023     
==========================================
+ Hits       188852   189053     +201     
- Misses      35733    36555     +822     

@discord9 discord9 changed the title from "feat(flow): (Part 1)flow refill utils" to "feat(flow): flow refill state (Part 1)" Jan 6, 2025
Contributor Author

discord9 commented Jan 6, 2025

@coderabbitai review

Contributor

coderabbitai bot commented Jan 6, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (11)
src/flow/src/adapter/table_source.rs (2)

85-108: Consider revising “couldn't found” messages for clarity.

At lines 96 and 97, the error messages read “couldn't found table info,” which is grammatically incorrect. Consider updating them to “couldn't find table info” or “unable to find table info” for consistency and clarity.

-                reason: format!("Table id = {:?}, couldn't found table info", table_id),
+                reason: format!("Table id = {:?}, couldn't find table info", table_id),

197-205: Encourage consistent naming: "check_table_exist" → "check_table_exists".

Renaming this method to check_table_exists might better describe its purpose and align with commonly used naming patterns. If any references in the codebase use check_table_exist directly, those should also be renamed for consistency.

-    pub async fn check_table_exist(&self, table_id: &TableId) -> Result<bool, Error> {
+    pub async fn check_table_exists(&self, table_id: &TableId) -> Result<bool, Error> {
src/flow/src/expr.rs (1)

25-25: Document the 'utils' module purpose.

A brief explanation of what functionalities or helpers reside in utils would improve clarity and discoverability for future maintainers.

src/flow/src/server.rs (2)

443-446: Add doc comments for FrontendInvoker.

Consider adding a high-level doc comment on the struct describing its responsibilities (insertion, deletion, statement execution) to help new contributors quickly understand its role and usage.


554-591: Return early if catalogs are empty.

In get_all_flow_ids, if no catalogs exist, returning an empty list immediately might simplify logic and improve readability, though this code already handles it gracefully by skipping the loop. Just a minor readability suggestion if you anticipate empty-catalog scenarios.

 if all_catalogs.is_empty() {
+    return Ok(vec![]);
 }
src/flow/src/expr/scalar.rs (1)

314-316: Expand unit test coverage for single-row eval cases.

The TODO comment mentions aligning eval_batch with eval. Consider adding specific tests that evaluate scalar expressions on single-value slices to verify consistent behaviors.

Do you want me to open a GitHub issue or provide a PR snippet with test coverage for these single-row scenarios?

src/flow/src/adapter.rs (1)

807-808: Avoid Non-Atomic Insertions Into Shared State

In high-throughput scenarios, directly invoking node_ctx.add_flow_plan(flow_id, flow_plan.clone()) immediately before any subsequent state changes may risk inconsistent state if errors occur later. Consider storing these changes in preliminary state and only updating shared state once all validations pass.

tests/runner/src/env.rs (2)

485-485: Clarify Stoppage Intent

It's not evident why a partial restart scenario (is_full_restart == false) stops these processes. A clarifying comment could help future readers understand the reason if continuing them isn't an option.

 if let Some(server_process) = db.server_processes.clone() {
     let mut server_processes = server_process.lock().unwrap();
     for server_process in server_processes.iter_mut() {
+        // Explain why these processes need to be stopped even if is_full_restart is false
         Env::stop_server(server_process);
     }
 }

499-502: Avoid Shadowing in if let Some(...)

The pattern if let Some(mut flownode_process) = db.flownode_process.lock()... may shadow the value held behind the lock, which makes the code harder to maintain. Consider separating the lock guard from the bound variable for clarity.

-let flownode_process_option = db.flownode_process.lock().expect("poisoned lock").take();
-if let Some(mut flownode_process) = flownode_process_option {
+let mut lock = db.flownode_process.lock().expect("poisoned lock");
+if let Some(mut flownode_process) = lock.take() {
     Env::stop_server(&mut flownode_process);
 }
src/flow/src/expr/linear.rs (1)

97-119: Use a Tail-Recursive Approach or Early Return

In get_nth_expr, the loop re-evaluates indices. A tail recursion or direct early return of the final expression can reduce iteration overhead. Automatic tail-call optimization might not always happen in Rust, but refactoring for clarity can help avoid confusion.

src/flow/src/error.rs (1)

262-267: Correctness of Conversion from EvalError

The EvalError -> Error conversion wraps the error in a Result, attaches context, and immediately unwraps it. This is correct, but building the error directly from the context selector is clearer. For example, with snafu's IntoError trait in scope (use snafu::IntoError):

impl From<EvalError> for Error {
    fn from(e: EvalError) -> Self {
-       Err::<(), _>(e).context(EvalSnafu).unwrap_err()
+       EvalSnafu.into_error(e)
    }
}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 513569e and aedb680.

📒 Files selected for processing (14)
  • src/flow/src/adapter.rs (1 hunks)
  • src/flow/src/adapter/node_context.rs (8 hunks)
  • src/flow/src/adapter/table_source.rs (2 hunks)
  • src/flow/src/error.rs (2 hunks)
  • src/flow/src/expr.rs (2 hunks)
  • src/flow/src/expr/linear.rs (1 hunks)
  • src/flow/src/expr/scalar.rs (1 hunks)
  • src/flow/src/expr/utils.rs (1 hunks)
  • src/flow/src/lib.rs (1 hunks)
  • src/flow/src/plan.rs (2 hunks)
  • src/flow/src/plan/reduce.rs (2 hunks)
  • src/flow/src/server.rs (2 hunks)
  • src/flow/src/test_utils.rs (1 hunks)
  • tests/runner/src/env.rs (3 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/flow/src/lib.rs
🔇 Additional comments (24)
src/flow/src/plan.rs (6)

21-22: No issues with new imports.

These lines add the BTreeSet import and a blank separator for clarity. No changes needed.


24-24: Imports for extended expression usage look good.

The additional imports (ScalarExpr, SafeMfpPlan, etc.) are necessary for referencing and manipulating expression types in the newly added plan methods. No concerns here.


195-203: Implementation of get_nth_expr appears straightforward.

This method delegates to either Mfp or Reduce for expression retrieval and returns None otherwise. The design is clear, and performance or correctness issues are not evident.


205-215: get_first_input_plan method is well-structured.

It follows a consistent match across plan variants, returning the first input where applicable. The approach is concise and functionally correct.


217-227: get_mut_first_input_plan method aligns with the immutable version.

Granting mutable access for the first input plan is consistent with get_first_input_plan. The match arms are parallel, and no additional concerns arise.


229-267: find_used_collection effectively gathers global IDs from nested plans.

The recursive approach using a BTreeSet ensures each GlobalId is collected once. Implementation is straightforward, and performance should be acceptable for typical plan sizes.
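
As an illustration of the recursive collection described above, here is a minimal sketch over a toy plan type. The Plan enum, its variants, and the GlobalId alias below are simplified stand-ins invented for this example, not the types in src/flow/src/plan.rs.

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for the real GlobalId type.
type GlobalId = u64;

// Toy plan tree; the real Plan enum has more variants and richer payloads.
enum Plan {
    Get { id: GlobalId },
    Constant,
    Mfp { input: Box<Plan> },
    Union { inputs: Vec<Plan> },
}

/// Collects every GlobalId referenced anywhere in the plan tree.
/// The BTreeSet deduplicates ids and keeps them in sorted order.
fn find_used_collection(plan: &Plan) -> BTreeSet<GlobalId> {
    fn walk(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
        match plan {
            Plan::Get { id } => {
                used.insert(*id);
            }
            Plan::Constant => {}
            Plan::Mfp { input } => walk(input, used),
            Plan::Union { inputs } => {
                for p in inputs {
                    walk(p, used);
                }
            }
        }
    }
    let mut used = BTreeSet::new();
    walk(plan, &mut used);
    used
}
```

Passing a single mutable set down the recursion avoids allocating and merging a set per node, which is one reasonable way to keep the traversal cheap for deep plans.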

src/flow/src/expr/utils.rs (4)

1-34: License and module header verified.

No functional code in these lines. The license header and high-level documentation comments are standard.


35-81: Check multiple references to time index for future expansions.

The find_plan_time_window_expr_lower_bound logic iterates upward until it finds an expression that is not a column reference. It correctly handles the possibility that the time index is absent or that the plan references lower-level input. Consider verifying behavior if multiple columns serve as time indexes in future expansions, though it's likely out of scope now.


82-207: Monotonic expression assumption is a potential risk.

The find_time_window_lower_bound function assumes a monotonic expression, returning an error if the expression unexpectedly increases. This is correct for time-window operations but might cause unexpected behavior if new expression types appear. The exponential probe combined with binary search is an efficient approach for typical scenarios.
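
The exponential probe plus binary search mentioned above can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: probe_lower_bound is a hypothetical name, the window expression is modeled as a closure over i64 timestamps instead of a ScalarExpr, and a monotonicity violation yields None rather than an error.

```rust
/// Hypothetical sketch: finds the smallest `t <= current` with
/// `f(t) == f(current)`, assuming `f` is monotonically non-decreasing.
/// Returns `None` on overflow or if `f` is observed to decrease going forward.
fn probe_lower_bound<F: Fn(i64) -> i64>(f: F, current: i64) -> Option<i64> {
    let target = f(current);
    // Invariant: f(hi) == target.
    let mut hi = current;
    let mut step: i64 = 1;
    // Exponential probe: double the step until we leave the window.
    let mut lo = loop {
        let probe = current.checked_sub(step)?;
        let v = f(probe);
        if v > target {
            return None; // an earlier timestamp mapped to a later window
        }
        if v < target {
            break probe; // f(probe) != target, so the bound lies in (probe, hi]
        }
        hi = probe;
        step = step.checked_mul(2)?;
    };
    // Binary search between lo (outside the window) and hi (inside it).
    while hi - lo > 1 {
        let mid = lo + (hi - lo) / 2;
        let v = f(mid);
        if v > target {
            return None;
        } else if v == target {
            hi = mid;
        } else {
            lo = mid;
        }
    }
    Some(hi)
}
```

The probe needs a number of evaluations logarithmic in the distance to the window start, and the binary search adds a similar number, so the expression is evaluated only a few dozen times even for wide windows.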


208-347: Tests provide good coverage.

These tests confirm that the functions return the correct lower-bound timestamps under various scenarios, including scenarios with and without explicit time indexes. The coverage appears thorough.

src/flow/src/adapter/node_context.rs (5)

Line range hint 21-69: New flow_plans field integration looks appropriate.

The additions to imports, plus the pub flow_plans: BTreeMap<FlowId, TypedPlan> field in the constructor, allow FlownodeContext to manage flow plans. Initializing with Default::default() is consistent with the rest of the struct's design.


186-201: send_record_batch method is consistent with existing buffering approach.

This method resembles send_rows but processes a RecordBatch. The code uses the broadcast buffer mechanism and updates the row count atomically. The approach is aligned with the existing design.


223-232: send_rb method complements send_record_batch.

This is a straightforward forwarding wrapper, retrieving the SourceSender by table ID and invoking send_record_batch. The error handling is consistent with the rest of the code.


267-275: Addition of add_flow_plan and get_flow_plan methods.

These methods provide a clear and accessible way to attach or retrieve flow plans by ID. They integrate cohesively with FlownodeContext.


288-288: Ensuring flow_plans removal upon remove_flow.

Removing the plan entry in remove_flow prevents stale plan references. The code properly aligns with the rest of the cleanup logic.

src/flow/src/plan/reduce.rs (2)

15-15: Additional ScalarExpr import is valid.

This aligns with the usage of get_nth_expr within KeyValPlan. No concerns.


26-34: get_nth_expr method for KeyValPlan.

The code attempts retrieval from key_plan then val_plan, adjusting indices appropriately. This mirrors the approach used in Plan::get_nth_expr, ensuring consistency.
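
One way to read the index adjustment described above is sketched below. This assumes that key expressions are numbered first and value expressions follow; the Expr and KeyValPlan types here are simplified stand-ins for this example, not the definitions in src/flow/src/plan/reduce.rs.

```rust
// Hypothetical stand-in for a scalar expression.
#[derive(Debug, PartialEq)]
struct Expr(&'static str);

// Simplified stand-in: the real KeyValPlan holds two MFP plans.
struct KeyValPlan {
    key_exprs: Vec<Expr>,
    val_exprs: Vec<Expr>,
}

impl KeyValPlan {
    /// Looks up the n-th output expression, trying the key expressions first
    /// and then the value expressions with the index shifted by the key count.
    fn get_nth_expr(&self, n: usize) -> Option<&Expr> {
        self.key_exprs.get(n).or_else(|| {
            n.checked_sub(self.key_exprs.len())
                .and_then(|i| self.val_exprs.get(i))
        })
    }
}
```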

src/flow/src/test_utils.rs (4)

1-44: Module boilerplate and imports are appropriate.

This section sets the stage for test utilities with relevant crates and usage definitions.


45-74: create_test_ctx function effectively seeds a FlownodeContext.

It initializes a dummy table mapping, sets the query context, and populates the local ID-to-name structures. This function is a helpful utility for consistent test setups.


76-124: create_test_query_engine seeds a working environment with in-memory tables.

The approach of registering the “numbers” and "numbers_with_ts" tables ensures straightforward queries in tests. The use of MemTable is well-suited here.


126-142: sql_to_substrait function is a useful bridging utility.

Encoding logical plans into Substrait and then decoding them is a robust validation approach. The function’s error handling is consistent, and it integrates seamlessly with test scenarios.

src/flow/src/adapter/table_source.rs (1)

85-108: Validate timestamp index schema before usage.

Before accessing raw_schema.timestamp_index, consider adding logic or documentation clarifying the conditions under which the timestamp index may be undefined or invalid, to prevent potential runtime issues.

src/flow/src/expr.rs (1)

58-66: Ensure handling of row diffs in conversion logic.

When constructing a Batch via From<common_recordbatch::RecordBatch>, the diffs field is set to None. If diffs are relevant to your system’s logic (for handling insertion/deletion markers), consider adding a comment or mechanism to set or update diffs where appropriate.

tests/runner/src/env.rs (1)

537-542: Ensure FlowNode Is Actually Replaced

Because you are stopping and spawning a new flownode, consider verifying that the new process is in a healthy state before replacing the old reference in db.flownode_process. A short wait or explicit health check may help avoid race conditions.

tests/runner/src/env.rs (resolved)
@waynexia waynexia requested a review from Copilot January 6, 2025 06:44


Copilot reviewed 5 out of 14 changed files in this pull request and generated no comments.

Files not reviewed (9)
  • tests/runner/src/env.rs: Evaluated as low risk
  • src/flow/src/lib.rs: Evaluated as low risk
  • src/flow/src/expr/scalar.rs: Evaluated as low risk
  • src/flow/src/adapter.rs: Evaluated as low risk
  • src/flow/src/expr.rs: Evaluated as low risk
  • src/flow/src/test_utils.rs: Evaluated as low risk
  • src/flow/src/plan/reduce.rs: Evaluated as low risk
  • src/flow/src/error.rs: Evaluated as low risk
  • src/flow/src/adapter/node_context.rs: Evaluated as low risk
src/flow/src/adapter/node_context.rs (resolved)
src/flow/src/adapter/table_source.rs (outdated, resolved)
src/flow/src/adapter/table_source.rs (outdated, resolved)
src/flow/src/expr.rs (outdated, resolved)
Collaborator

@fengjiachun fengjiachun left a comment


Rest LGTM

Comment on lines +94 to +110
if !all_ref_columns.contains(&ts_col_idx) {
    UnexpectedSnafu {
        reason: format!(
            "Expected column {} to be referenced in expression {expr:?}",
            ts_col_idx
        ),
    }
    .fail()?
}
if all_ref_columns.len() > 1 {
    UnexpectedSnafu {
        reason: format!(
            "Expect only one column to be referenced in expression {expr:?}, found {all_ref_columns:?}"
        ),
    }
    .fail()?
}
Contributor


Use ensure! instead
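
A rough sketch of what the ensure!-style rewrite could look like is given below. To stay self-contained it defines a small local ensure! macro as a stand-in for snafu::ensure! and uses String as the error type; in the actual code the second argument would presumably be the UnexpectedSnafu { reason } context selector. The function name check_ref_columns is hypothetical.

```rust
use std::collections::BTreeSet;

// Local stand-in for `snafu::ensure!` (assumption for this sketch):
// returns early with the given error when the condition is false.
macro_rules! ensure {
    ($cond:expr, $err:expr) => {
        if !$cond {
            return Err($err);
        }
    };
}

/// Hypothetical helper mirroring the two checks in the quoted snippet.
fn check_ref_columns(
    all_ref_columns: &BTreeSet<usize>,
    ts_col_idx: usize,
) -> Result<(), String> {
    ensure!(
        all_ref_columns.contains(&ts_col_idx),
        format!("Expected column {ts_col_idx} to be referenced in expression")
    );
    ensure!(
        all_ref_columns.len() == 1,
        format!(
            "Expect only one column to be referenced in expression, found {all_ref_columns:?}"
        )
    );
    Ok(())
}
```

Each check becomes a single statement that states the condition that must hold, instead of an inverted if block wrapped around .fail()?.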

];
let engine = create_test_query_engine();

for (sql, current, expected) in &testcases[3..] {
Contributor


Suggested change
-for (sql, current, expected) in &testcases[3..] {
+for (sql, current, expected) in &testcases {

Labels
docs-not-required This change does not impact docs.
3 participants