Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(functions): manually drop state in function eval_aggr function #5080

Merged
merged 5 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
}

unsafe fn drop_state(&self, place: StateAddr) {
self.nested.drop_state(place)
self.nested.drop_state(self.nested_place(place))
}

fn convert_const_to_full(&self) -> bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl<const NULLABLE_RESULT: bool, const STKIP_NULL: bool> AggregateFunction
}

unsafe fn drop_state(&self, place: StateAddr) {
self.nested.drop_state(place)
self.nested.drop_state(self.nested_place(place))
}

fn convert_const_to_full(&self) -> bool {
Expand Down
24 changes: 19 additions & 5 deletions common/functions/src/aggregates/aggregator_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,30 @@ pub fn eval_aggr(
let cols: Vec<ColumnRef> = columns.iter().map(|c| c.column().clone()).collect();

let func = factory.get(name, params, arguments)?;
let data_type = func.return_type()?;

let arena = Bump::new();
let place = arena.alloc_layout(func.state_layout());
let addr = place.into();
func.init_state(addr);
func.accumulate(addr, &cols, None, rows)?;

let data_type = func.return_type()?;
let mut builder = data_type.create_mutable(1024);
func.merge_result(addr, builder.as_mut())?;
let f = func.clone();
// we need a temporary function to catch the errors
let apply = || -> Result<ColumnRef> {
func.accumulate(addr, &cols, None, rows)?;
let mut builder = data_type.create_mutable(1024);
func.merge_result(addr, builder.as_mut())?;

Ok(builder.to_column())
};

Ok(builder.to_column())
let result = apply();

if f.need_manual_drop_state() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need exit_guard

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apply function is used to catch the whole errors.

exit_guard can not catch the Bump.

Copy link
Member

@zhang2014 zhang2014 Apr 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if panic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think exit_guard can't catch the panic either.

Copy link
Member

@zhang2014 zhang2014 Apr 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe {
f.drop_state(addr);
}
}
drop(arena);
result
}