backpressure aware buffer/window with predicate #5567

Closed
MartinNowak opened this issue Aug 24, 2017 · 1 comment

@MartinNowak

The use-case for this is aggregating consecutive values separated by a boundary, e.g. aggregating a sorted timeseries from a database query by hour.
I tried quite a few different things, but there doesn't seem to be an optimal solution to this.

An almost working approach is to use window/buffer with a boundaryIndicator.

// Share the source so it can feed both window() and its boundary publisher.
source = source.publish().autoConnect(2);
source.window(source.filter(row -> predicate(row)));

None of the window/buffer overloads that take a publisher to determine window boundaries support backpressure, though. Because the boundaries are meant to act as temporal operators, the boundary publisher is consumed unbounded.
#1828 (comment)

groupBy isn't a good alternative: there is no overlap between groups, so the use of a hash table is unnecessary. It also requires manually closing the previous GroupedFlowable when the next one is opened, and using a stateful keySelector that keeps returning the same key until the next separator.
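To make the stateful keySelector idea concrete, here is a minimal sketch in plain Java, without RxJava. The names `RunKeySelector` and `group` are hypothetical, and the map stands in for the hash table that groupBy maintains internally; closing earlier groups, which groupBy would additionally require, is not modelled here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stateful key selector: returns the same key until the next boundary item. */
final class RunKeySelector<T> implements Function<T, Integer> {
    private final Predicate<? super T> boundary;
    private int key; // index of the current run, bumped on every boundary item

    RunKeySelector(Predicate<? super T> boundary) {
        this.boundary = boundary;
    }

    @Override
    public Integer apply(T item) {
        if (boundary.test(item)) {
            key++;
        }
        return key;
    }

    /** Groups items by run key; every item pays a map lookup although only the latest key is live. */
    static <E> Map<Integer, List<E>> group(Iterable<E> items, Predicate<? super E> boundary) {
        Function<E, Integer> keyOf = new RunKeySelector<>(boundary);
        Map<Integer, List<E>> groups = new LinkedHashMap<>();
        for (E item : items) {
            groups.computeIfAbsent(keyOf.apply(item), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }
}
```

With the items 1, 2, 10, 11, 20 and the boundary `x -> x % 10 == 0`, `group` returns `{0=[1, 2], 1=[10, 11], 2=[20]}`. The selector only works if items are visited strictly in order, which is part of what makes this workaround fragile.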

window(long count) isn't applicable as the number of elements per group isn't fixed.

Writing a custom operator and using lift to perform the aggregation is possible, but fairly complex.
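For a sense of the bookkeeping involved, below is a rough, single-threaded sketch of such an operator written against the JDK's `java.util.concurrent.Flow` interfaces rather than RxJava's `lift`. Everything here is an assumption made for illustration: the names `BufferAtBoundary` and `Harness`, the choice that a boundary item opens the next buffer, and the synchronous test source. It is not thread-safe and skips several Reactive Streams rules, so treat it as an outline of the demand tracking, not as a drop-in operator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

/** Hypothetical operator: emits a List whenever `boundary` matches; the matching item opens the next List. */
final class BufferAtBoundary<T> implements Flow.Subscriber<T>, Flow.Subscription {
    private final Flow.Subscriber<? super List<T>> downstream;
    private final Predicate<? super T> boundary;
    private Flow.Subscription upstream;
    private List<T> buffer = new ArrayList<>();
    private long demand;             // buffers requested downstream but not yet delivered
    private boolean upstreamPending; // one item requested upstream and not yet received
    private boolean upstreamDone, terminated;

    BufferAtBoundary(Flow.Subscriber<? super List<T>> downstream, Predicate<? super T> boundary) {
        this.downstream = downstream;
        this.boundary = boundary;
    }
    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(this); }
    @Override public void request(long n) {
        if (n <= 0 || terminated) return; // a full implementation would signal onError for n <= 0
        demand = demand + n < 0 ? Long.MAX_VALUE : demand + n; // cap on overflow
        if (upstreamDone) finish(); else pull();
    }
    @Override public void cancel() { terminated = true; upstream.cancel(); }
    @Override public void onNext(T item) {
        if (terminated) return;
        upstreamPending = false;
        if (boundary.test(item) && !buffer.isEmpty()) {
            List<T> full = buffer;
            buffer = new ArrayList<>();
            buffer.add(item);         // the boundary item opens the next buffer
            demand--;                 // demand > 0 here: items are only pulled while it is
            downstream.onNext(full);
        } else {
            buffer.add(item);
        }
        pull();
    }
    @Override public void onError(Throwable t) { if (!terminated) { terminated = true; downstream.onError(t); } }
    @Override public void onComplete() { upstreamDone = true; finish(); }

    // Request exactly one more item, and only while downstream still wants a buffer.
    private void pull() {
        if (!terminated && !upstreamDone && !upstreamPending && demand > 0) {
            upstreamPending = true;
            upstream.request(1);
        }
    }
    // After upstream completes, hold the trailing buffer until downstream asks for it.
    private void finish() {
        if (terminated || (!buffer.isEmpty() && demand == 0)) return;
        terminated = true;
        if (!buffer.isEmpty()) downstream.onNext(buffer);
        downstream.onComplete();
    }
}

/** Hypothetical synchronous harness: requests one buffer at a time, at most `limit` buffers. */
final class Harness {
    static int pulled; // items handed out by the source during the last run

    static <T> List<List<T>> run(List<T> items, Predicate<? super T> boundary, int limit) {
        List<List<T>> received = new ArrayList<>();
        pulled = 0;
        BufferAtBoundary<T> op = new BufferAtBoundary<T>(new Flow.Subscriber<List<T>>() {
            Flow.Subscription s;
            public void onSubscribe(Flow.Subscription s) { this.s = s; if (limit > 0) s.request(1); }
            public void onNext(List<T> b) { received.add(b); if (received.size() < limit) s.request(1); }
            public void onError(Throwable t) { throw new IllegalStateException(t); }
            public void onComplete() { }
        }, boundary);
        Iterator<T> it = items.iterator();
        op.onSubscribe(new Flow.Subscription() {
            boolean done;
            public void request(long n) {
                for (long i = 0; i < n && !done; i++) {
                    if (it.hasNext()) { pulled++; op.onNext(it.next()); }
                    else { done = true; op.onComplete(); }
                }
            }
            public void cancel() { done = true; }
        });
        return received;
    }
}
```

Running `Harness.run(Arrays.asList(1, 2, 3, 10, 11, 20), x -> x % 10 == 0, 1)` delivers only `[1, 2, 3]` and pulls four items from the source: the three buffered ones plus the boundary item that closed the buffer. Upstream consumption therefore follows downstream demand. With a synchronous source the calls nest recursively, so a production operator would need a drain loop and atomic state on top of this.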

Adding a new window(boundaryPredicate) overload, and possibly a window(openPredicate, closePredicate) overload, seems to be the best solution for this use-case. The existing window(count) and window(count, skip) overloads could then be refactored on top of the predicate overloads.
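As a rough illustration of how a count-based split could be expressed through a boundary predicate, the sketch below uses plain Java collections instead of Flowable. `PredicateWindows`, `splitAt` and `everyNth` are hypothetical names, and the semantics (a matching item opens the next group) are an assumption, not an existing RxJava API. Overlapping windows, as in the count, skip case, would need the open/close predicate pair and are not covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class PredicateWindows {
    /** Splits items into consecutive groups; an item matching `boundary` opens a new group. */
    static <T> List<List<T>> splitAt(Iterable<T> items, Predicate<? super T> boundary) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            if (boundary.test(item) && !current.isEmpty()) {
                groups.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            groups.add(current); // trailing group that was never closed by a boundary
        }
        return groups;
    }

    /** Stateful predicate that is true for the 1st, (n+1)th, (2n+1)th, ... item it is asked about. */
    static <T> Predicate<T> everyNth(int n) {
        int[] seen = {0}; // mutable counter captured by the lambda
        return item -> seen[0]++ % n == 0;
    }
}
```

For example, `PredicateWindows.splitAt(Arrays.asList(1, 2, 3, 4, 5, 6, 7), PredicateWindows.everyNth(3))` yields `[[1, 2, 3], [4, 5, 6], [7]]`, the same grouping a fixed count of 3 would produce.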

akarnokd added the 2.x label Aug 24, 2017
@akarnokd
Member

The extensions project contains three operators that use predicates: bufferWhile, bufferUntil and bufferSplit. There are no window variants with the same properties that I know of.
