feat(flow): flow refill state (Part 1) #5295
base: main
Walkthrough
This pull request introduces several enhancements to the flow management system, focusing on improving flow plan tracking, error handling, and utility functions across multiple components.
Codecov Report
@@            Coverage Diff             @@
##             main    #5295      +/-   ##
==========================================
- Coverage   84.08%   83.79%   -0.30%
==========================================
  Files        1199     1201       +2
  Lines      224585   225608    +1023
==========================================
+ Hits       188852   189053     +201
- Misses      35733    36555     +822
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 1
🧹 Nitpick comments (11)
src/flow/src/adapter/table_source.rs (2)
85-108: Consider revising “couldn't found” messages for clarity.
At lines 96 and 97, the error messages read “couldn't found table info,” which is grammatically incorrect. Consider updating them to “couldn't find table info” or “unable to find table info” for consistency and clarity.
- reason: format!("Table id = {:?}, couldn't found table info", table_id),
+ reason: format!("Table id = {:?}, couldn't find table info", table_id),
197-205: Encourage consistent naming: "check_table_exist" → "check_table_exists".
Renaming this method to `check_table_exists` might better describe its purpose and align with commonly used naming patterns. If any references in the codebase use `check_table_exist` directly, those should also be renamed for consistency.
- pub async fn check_table_exist(&self, table_id: &TableId) -> Result<bool, Error> {
+ pub async fn check_table_exists(&self, table_id: &TableId) -> Result<bool, Error> {
src/flow/src/expr.rs (1)
25-25: Document the 'utils' module purpose.
A brief explanation of what functionalities or helpers reside in `utils` would improve clarity and discoverability for future maintainers.
src/flow/src/server.rs (2)
443-446: Add doc comments for `FrontendInvoker`.
Consider adding a high-level doc comment on the struct describing its responsibilities (insertion, deletion, statement execution) to help new contributors quickly understand its role and usage.
554-591: Return early if catalogs are empty.
In `get_all_flow_ids`, if no catalogs exist, returning an empty list immediately might simplify logic and improve readability, though this code already handles it gracefully by skipping the loop. Just a minor readability suggestion if you anticipate empty-catalog scenarios.
  if all_catalogs.is_empty() {
+     return Ok(vec![]);
  }
src/flow/src/expr/scalar.rs (1)
314-316: Expand unit test coverage for single-row `eval` cases.
The TODO comment mentions aligning `eval_batch` with `eval`. Consider adding specific tests that evaluate scalar expressions on single-value slices to verify consistent behaviors.
Do you want me to open a GitHub issue or provide a PR snippet with test coverage for these single-row scenarios?
src/flow/src/adapter.rs (1)
807-808: Avoid Non-Atomic Insertions Into Shared State
In high-throughput scenarios, directly invoking `node_ctx.add_flow_plan(flow_id, flow_plan.clone())` immediately before any subsequent state changes may risk inconsistent state if errors occur later. Consider storing these changes in preliminary state and only updating shared state once all validations pass.
tests/runner/src/env.rs (2)
485-485: Clarify Stoppage Intent
It's not evident why a partial restart scenario (`is_full_restart == false`) stops these processes. A clarifying comment could help future readers understand the reason if continuing them isn't an option.
  if let Some(server_process) = db.server_processes.clone() {
      let mut server_processes = server_process.lock().unwrap();
      for server_process in server_processes.iter_mut() {
+         // Explain why these processes need to be stopped even if is_full_restart is false
          Env::stop_server(server_process);
      }
  }
499-502: Avoid Shadowing in `if let Some(...)`
The pattern `if let Some(mut flownode_process) = db.flownode_process.lock()...` might shadow the encapsulated reference, making code less maintainable. Consider separating the lock and the bound variable for clarity.
- let flownode_process_option = db.flownode_process.lock().expect("poisoned lock").take();
- if let Some(mut flownode_process) = flownode_process_option {
+ let mut lock = db.flownode_process.lock().expect("poisoned lock");
+ if let Some(mut flownode_process) = lock.take() {
      Env::stop_server(&mut flownode_process);
  }
src/flow/src/expr/linear.rs (1)
97-119: Use a Tail-Recursive Approach or Early Return
In `get_nth_expr`, the loop re-evaluates indices. A tail recursion or direct early return of the final expression can reduce iteration overhead. Automatic tail-call optimization might not always happen in Rust, but refactoring for clarity can help avoid confusion.
src/flow/src/error.rs (1)
262-267: Correctness of Conversion from `EvalError`
The `EvalError -> Error` conversion uses a contextual wrap and immediately unwraps the error; this is correct, but consider a direct construction for clarity, for example:
  impl From<EvalError> for Error {
      fn from(e: EvalError) -> Self {
-         Err::<(), _>(e).context(EvalSnafu).unwrap_err()
+         Error::Eval { source: e }.build()
      }
  }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
src/flow/src/adapter.rs (1 hunk)
src/flow/src/adapter/node_context.rs (8 hunks)
src/flow/src/adapter/table_source.rs (2 hunks)
src/flow/src/error.rs (2 hunks)
src/flow/src/expr.rs (2 hunks)
src/flow/src/expr/linear.rs (1 hunk)
src/flow/src/expr/scalar.rs (1 hunk)
src/flow/src/expr/utils.rs (1 hunk)
src/flow/src/lib.rs (1 hunk)
src/flow/src/plan.rs (2 hunks)
src/flow/src/plan/reduce.rs (2 hunks)
src/flow/src/server.rs (2 hunks)
src/flow/src/test_utils.rs (1 hunk)
tests/runner/src/env.rs (3 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/flow/src/lib.rs
🔇 Additional comments (24)
src/flow/src/plan.rs (6)
21-22: No issues with new imports.
These lines add the `BTreeSet` import and a blank separator for clarity. No changes needed.
24-24: Imports for extended expression usage look good.
The additional imports (`ScalarExpr`, `SafeMfpPlan`, etc.) are necessary for referencing and manipulating expression types in the newly added plan methods. No concerns here.
195-203: Implementation of `get_nth_expr` appears straightforward.
This method delegates to either `Mfp` or `Reduce` for expression retrieval and returns `None` otherwise. The design is clear, and performance or correctness issues are not evident.
205-215: `get_first_input_plan` method is well-structured.
It follows a consistent match across plan variants, returning the first input where applicable. The approach is concise and functionally correct.
217-227: `get_mut_first_input_plan` method aligns with the immutable version.
Granting mutable access for the first input plan is consistent with `get_first_input_plan`. The match arms are parallel, and no additional concerns arise.
229-267: `find_used_collection` effectively gathers global IDs from nested plans.
The recursive approach using a `BTreeSet` ensures each `GlobalId` is collected once. Implementation is straightforward, and performance should be acceptable for typical plan sizes.
src/flow/src/expr/utils.rs (4)
1-34: License and module header verified.
No functional code in these lines. The license header and high-level documentation comments are standard.
35-81: Check multiple references to time index for future expansions.
The `find_plan_time_window_expr_lower_bound` logic iterates upward until it finds an expression that is not a column reference. It correctly handles the possibility that the time index is absent or that the plan references lower-level input. Consider verifying behavior if multiple columns serve as time indexes in future expansions, though it's likely out of scope now.
82-207: Monotonic expression assumption is a potential risk.
The `find_time_window_lower_bound` function assumes a monotonic expression, returning an error if the expression unexpectedly increases. This is correct for time-window operations but might cause unexpected behavior if new expression types appear. The exponential probe combined with binary search is an efficient approach for typical scenarios.
208-347: Tests provide good coverage.
These tests confirm that the functions return the correct lower-bound timestamps under various scenarios, including scenarios with and without explicit time indexes. The coverage appears thorough.
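The "exponential probe combined with binary search" mentioned for `find_time_window_lower_bound` can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: `date_bin` here is a hypothetical integer stand-in for `date_bin(INTERVAL ..., ts)` working on millisecond timestamps, `find_lower_bound` is an invented name, and a monotonicity violation is reported as `None` rather than as an error.

```rust
/// Hypothetical stand-in for a monotonic time-window expression:
/// floors a millisecond timestamp to the start of its `width_ms` bin.
fn date_bin(width_ms: i64, ts_ms: i64) -> i64 {
    ts_ms.div_euclid(width_ms) * width_ms
}

/// Finds the smallest `t <= current` with `expr(t) == expr(current)`,
/// assuming `expr` is monotonically non-decreasing.
/// Returns `None` on arithmetic overflow or if monotonicity is violated.
fn find_lower_bound(expr: impl Fn(i64) -> i64, current: i64) -> Option<i64> {
    let target = expr(current);

    // Exponential probe: step back by 1, 2, 4, ... until the value drops below `target`.
    let mut step: i64 = 1;
    let mut hi = current; // invariant: expr(hi) == target
    let mut lo = loop {
        let probe = current.checked_sub(step)?;
        let value = expr(probe);
        if value > target {
            // An earlier timestamp produced a larger value: not monotonic.
            return None;
        }
        if value < target {
            break probe; // invariant: expr(lo) < target
        }
        hi = probe;
        step = step.checked_mul(2)?;
    };

    // Binary search on (lo, hi]: shrink until `hi` is the first timestamp in the window.
    while hi - lo > 1 {
        let mid = lo + (hi - lo) / 2;
        if expr(mid) < target {
            lo = mid;
        } else {
            hi = mid;
        }
    }
    Some(hi)
}
```

With a 5-minute bin (`300_000` ms), calling `find_lower_bound(|t| date_bin(300_000, t), current)` needs only a logarithmic number of evaluations of the expression relative to the window width, which is the reason for probing exponentially before bisecting.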
src/flow/src/adapter/node_context.rs (5)
Line range hint 21-69: New `flow_plans` field integration looks appropriate.
The additions to imports, plus the `pub flow_plans: BTreeMap<FlowId, TypedPlan>` field in the constructor, allow `FlownodeContext` to manage flow plans. Initializing with `Default::default()` is consistent with the rest of the struct's design.
186-201: `send_record_batch` method is consistent with existing buffering approach.
This method resembles `send_rows` but processes a `RecordBatch`. The code uses the broadcast buffer mechanism and updates the row count atomically. The approach is aligned with the existing design.
223-232: `send_rb` method complements `send_record_batch`.
This is a straightforward forwarding wrapper, retrieving the `SourceSender` by table ID and invoking `send_record_batch`. The error handling is consistent with the rest of the code.
267-275: Addition of `add_flow_plan` and `get_flow_plan` methods.
These methods provide a clear and accessible way to attach or retrieve flow plans by ID. They integrate cohesively with `FlownodeContext`.
288-288: Ensuring `flow_plans` removal upon `remove_flow`.
Removing the plan entry in `remove_flow` prevents stale plan references. The code properly aligns with the rest of the cleanup logic.
src/flow/src/plan/reduce.rs (2)
15-15: Additional `ScalarExpr` import is valid.
This aligns with the usage of `get_nth_expr` within `KeyValPlan`. No concerns.
26-34: `get_nth_expr` method for `KeyValPlan`.
The code attempts retrieval from `key_plan` then `val_plan`, adjusting indices appropriately. This mirrors the approach used in `Plan::get_nth_expr`, ensuring consistency.
src/flow/src/test_utils.rs (4)
1-44: Module boilerplate and imports are appropriate.
This section sets the stage for test utilities with relevant crates and usage definitions.
45-74: `create_test_ctx` function effectively seeds a `FlownodeContext`.
It initializes a dummy table mapping, sets the query context, and populates the local ID-to-name structures. This function is a helpful utility for consistent test setups.
76-124: `create_test_query_engine` seeds a working environment with in-memory tables.
The approach of registering the "numbers" and "numbers_with_ts" tables ensures straightforward queries in tests. The use of `MemTable` is well-suited here.
126-142: `sql_to_substrait` function is a useful bridging utility.
Encoding logical plans into Substrait and then decoding them is a robust validation approach. The function's error handling is consistent, and it integrates seamlessly with test scenarios.
src/flow/src/adapter/table_source.rs (1)
85-108: Validate timestamp index schema before usage.
Before accessing `raw_schema.timestamp_index`, consider adding logic or documentation clarifying the conditions under which the timestamp index may be undefined or invalid, to prevent potential runtime issues.
src/flow/src/expr.rs (1)
58-66: Ensure handling of row diffs in conversion logic.
When constructing a `Batch` via `From<common_recordbatch::RecordBatch>`, the `diffs` field is set to `None`. If diffs are relevant to your system's logic (for handling insertion/deletion markers), consider adding a comment or mechanism to set or update `diffs` where appropriate.
tests/runner/src/env.rs (1)
537-542: Ensure `FlowNode` Is Actually Replaced
Because you are stopping and spawning a new flownode, consider verifying that the new process is in a healthy state before replacing the old reference in `db.flownode_process`. A short wait or explicit health check may help avoid race conditions.
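One way to read the "short wait or explicit health check" suggestion is a bounded polling loop that only swaps the stored process once a probe succeeds. The sketch below is generic and hypothetical: `wait_until_healthy`, `swap_if_healthy`, the retry count, and the delay are invented for illustration and are not part of the test runner.

```rust
use std::sync::Mutex;
use std::thread::sleep;
use std::time::Duration;

/// Polls `is_healthy` up to `max_attempts` times, sleeping `delay` between attempts.
fn wait_until_healthy(
    mut is_healthy: impl FnMut() -> bool,
    max_attempts: u32,
    delay: Duration,
) -> bool {
    for attempt in 0..max_attempts {
        if is_healthy() {
            return true;
        }
        // Skip the sleep after the final failed attempt.
        if attempt + 1 < max_attempts {
            sleep(delay);
        }
    }
    false
}

/// Stores `new_process` in `slot` only after `probe` reports it healthy.
/// Returns the previously stored value on success, or hands `new_process`
/// back to the caller if it never became healthy.
fn swap_if_healthy<P>(
    slot: &Mutex<Option<P>>,
    new_process: P,
    mut probe: impl FnMut(&P) -> bool,
) -> Result<Option<P>, P> {
    // Hypothetical limits: 5 attempts, 10 ms apart.
    if !wait_until_healthy(|| probe(&new_process), 5, Duration::from_millis(10)) {
        return Err(new_process);
    }
    let mut guard = slot.lock().expect("poisoned lock");
    Ok(guard.replace(new_process))
}
```

Keeping the old value in the slot until the probe passes means a failed spawn leaves the previous reference untouched, and the caller decides what to do with the unhealthy process it gets back.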
Copilot reviewed 5 out of 14 changed files in this pull request and generated no comments.
Files not reviewed (9)
- tests/runner/src/env.rs: Evaluated as low risk
- src/flow/src/lib.rs: Evaluated as low risk
- src/flow/src/expr/scalar.rs: Evaluated as low risk
- src/flow/src/adapter.rs: Evaluated as low risk
- src/flow/src/expr.rs: Evaluated as low risk
- src/flow/src/test_utils.rs: Evaluated as low risk
- src/flow/src/plan/reduce.rs: Evaluated as low risk
- src/flow/src/error.rs: Evaluated as low risk
- src/flow/src/adapter/node_context.rs: Evaluated as low risk
Rest LGTM
if !all_ref_columns.contains(&ts_col_idx) {
    UnexpectedSnafu {
        reason: format!(
            "Expected column {} to be referenced in expression {expr:?}",
            ts_col_idx
        ),
    }
    .fail()?
}
if all_ref_columns.len() > 1 {
    UnexpectedSnafu {
        reason: format!(
            "Expect only one column to be referenced in expression {expr:?}, found {all_ref_columns:?}"
        ),
    }
    .fail()?
}
Use `ensure!` instead
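For context on this suggestion: snafu's `ensure!(condition, context_selector)` returns early with an error when the condition is false, which collapses each `if ... { Selector { .. }.fail()? }` block into a single statement. The sketch below keeps to the standard library, so it uses a small local macro as a stand-in for `snafu::ensure!`. The simplified `Error` enum and the `check_ref_columns` function are hypothetical; in the real code the second argument would be the `UnexpectedSnafu { reason: ... }` selector.

```rust
use std::collections::BTreeSet;

/// Simplified, hypothetical stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
enum Error {
    Unexpected { reason: String },
}

/// Minimal stand-in for `snafu::ensure!`: return `Err(..)` early when the condition is false.
macro_rules! ensure {
    ($cond:expr, $err:expr) => {
        if !$cond {
            return Err($err);
        }
    };
}

/// Checks that the timestamp column is the only column referenced.
fn check_ref_columns(all_ref_columns: &BTreeSet<usize>, ts_col_idx: usize) -> Result<(), Error> {
    ensure!(
        all_ref_columns.contains(&ts_col_idx),
        Error::Unexpected {
            reason: format!("Expected column {ts_col_idx} to be referenced"),
        }
    );
    ensure!(
        all_ref_columns.len() <= 1,
        Error::Unexpected {
            reason: format!(
                "Expect only one column to be referenced, found {all_ref_columns:?}"
            ),
        }
    );
    Ok(())
}
```

Note that the condition is stated positively (what must hold), so `len() > 1` in the original `if` becomes `len() <= 1` inside `ensure!`.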
];
let engine = create_test_query_engine();

for (sql, current, expected) in &testcases[3..] {
- for (sql, current, expected) in &testcases[3..] {
+ for (sql, current, expected) in &testcases {
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
`find_plan_time_window_expr_lower_bound` in `expr/utils.rs`, which finds a time window's lower bound given a timestamp and a scalar expression. For example, for the time window expression `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`, it returns `Some("2021-07-01 00:00:00.000")` (the lower bound of the time window).
PR Checklist
Please convert it to a draft if some of the following conditions are not met.