-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Operation: throttle #258
Operation: throttle #258
Conversation
Awesome. (Btw I'm not entirely convinced about implementation and usage of |
Yes I had some doubts about that too. I think the easiest way to fix this is to make three distinct |
RxJava-pull-requests #123 SUCCESS |
…ationThrottle class.
RxJava-pull-requests #124 FAILURE |
Hmm that's strange. Locally I don't have any build failures, but when looking at the console output of the failed CloudBees build, I can see that the |
RxJava-pull-requests #125 FAILURE |
Afaik, a few of the tests fail sporadically (quite often, though). This is one of them. |
Yes CloudBees (the CI server) has odd thread scheduling it seems (very slow machines?). We have unit tests that rely on Thread.sleep timing instead of using latches and barriers to deterministically control when threads are released, waited upon, etc. |
if (!timerHasExpired()) { | ||
subscription.get().unsubscribe(); | ||
} | ||
subscription.set(scheduler.schedule(action, timeout, unit)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This always scheduled in the future with timeout
. Shouldn't it be the time until next timeout?
Let's say timeout is 1000ms and I get an onNext call every 50ms. This code seems to schedule each action to execute 1000ms in the future even if it comes in 950ms since the last onNext was permitted through.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, I hadn't thought of it that way. Excellent catch! I'll try to fix this.
In general I also question whether we even need a Scheduler for this (other than getting Instead we just remember the last value and time since we last sent it on and each time we pass That way we skip all the scheduling overhead. |
Yes but what happens that when events are few and far apart? Without a Scheduler (or some sort of timer) wouldn't it take extremely long for events to propagate? |
I envision a scenario like this: Timeout = 1000ms The only time something is skipped is when it's within 1000ms of the previously sent value. So the question is ... are we trying to delivery the latest value within each time window, or just deliver a single value per each time window? If we choose latest then the above would now look like this: Timeout = 1000ms I don't know that this behavior is necessary for functionality (what we're doing is by definition a race condition anyways) and it means that each value waits for timeout to pass before delivery which I think is worse. Using a timer for every onNext also means we get all the non-determinism of the thread-scheduler (which on a busy system can be all over the map .... 10s or 100s of milliseconds of drift easily ... timers are not very accurate). Even if "latest in window" is the required behavior I'd like to look at making it less costly and just changing the value to be delivered in each scheduled Before those implementation details though, is there strong documentation somewhere dictating whether throttle give the first or last value of a time window? |
The links to MSDN in the issue state the following: "Ignores the values from an observable sequence which are followed by another value before due time with the specified source and dueTime.". What I'm reading here is that any value which is proceeded (so after) with another value within a specified period of time will not be published. If the value is proceeded after this period of time, it is published. Although both throttle events, they each do it in another way. I'm all for better performance, but I think this breaks a bit too far away from the original specification. I believe I'm having a hackathon with Erik Meijer next week, so I can try to pick his brain on this subject too. But I'd like to hear more thoughts on this. |
I'll hold off on making changes to this pull request until we have a better view on what we want here. |
Feedback from Erik would be great, particularly with the use cases in this discussion shared with him and the performance impact of the decision. |
I like your thoughts on this. I need to spend more time on this than I have right now so will delay a little more. We definitely need different implementations, the trick is the naming, as |
I appreciate some other input on the design and naming choices for this before proceeding. Anyone else able to get involved? |
since this thread is a bit long, it would be easier for folks to jump in and help review if the current design and naming choices were summarized in the PR description. |
I've updated the PR description with the current state. We're basically trying to figure out if we need to support additional/different schemes of throttling |
We could use some help nailing down the naming convention for different throttle schemes. @mairbek @jmhofer @mttkay would you mind weighing in please? The naming convention being proposed is:
This addresses the point that 'throttle' is ambiguous and there are different approaches that can be applied with very different behavior. These could be given names instead like I also think an alias for |
I find these operations and names difficult. That 2nd alternative doesn't look useful enough to me to include it at all. But maybe that's just me. Any already established names for these schemes from people already using them would be great to hear. Personally, I'd call the 1st alternative |
First, I'd just like to add that the .NET version as illustrated in the first marble diagram (if I understand the marble diagram correctly, I haven't actually worked with Rx.NET before) is extremely useful when dealing with user input events. It's a pattern I've used plenty of times before whenever an asynchronous operation needs to be triggered based on user input that changes rapidly. A good example is a suggest-as-you-type input field: every key stroke triggers an event scheduled for execution, but may become obsolete with the next keystroke. By cancelling out events that are superseded by subsequent keystrokes, and at the same time rescheduling a new one with the same delay t, as soon as there are no more keystrokes arriving in t time, the next event emitted will reflect the most recent state from the perspective of the user. Which brings me to the naming. In a way, I see variant 0 (the .NET implementation) to carry "most recent" semantics, which sort of conflicts with the naming of alternative 2. If I understand correctly, variant 2 works on an absolute time scale rather than a relative one, so perhaps that should be reflected in the name? I think Replacing the potential source of unexpected delays in TimerTasks with a source Observable that may emit events at random points in time, the problems bear a lot of similarities. I have to say I don't really understand alternative 1 or what it accomplishes. So I guess my vote would be:
Hope I'm not adding further to the confusion since this seems to go in a different direction in terms of naming and might not align well with alternative 1 either. Take it for what it's worth :-) [0] http://docs.oracle.com/javase/6/docs/api/java/util/Timer.html |
I think the use of enums for giving the schema is awkward considering how different that is compared with the patterns everywhere else in the library, though I understand how it could be useful. I prefer the I agree with @jmhofer and think alternative 1 is likely not worth pursuing at this time unless someone can give a real reason for its existence. I suggest the following:
Since this is very subjective and arbitrary, can folks weigh in one last time with voting on the following options?
This allows the last value through for a defined time window, delaying the delivery until the window is completed to ensure it is the last and provide a regular rhythm of delivery. This uses a scheduler. a) throttleLast
This will allow the first value through for the defined time window without any delay. This does not require a scheduler. a) throttleFirst |
I'd vote for 1b and 2c. |
* Add response predicate to retry sync and async for enhancement ReactiveX#259 * ReactiveX#258 add the support to the webflux types in the circuit breaker annotation AOP * ReactiveX#258 review comments * ReactiveX#258 review comments
The throttle operation is a filtering operator which is meant to combat receiving bursts of events in short periods of time. The original Rx .Net implementation of this operator works as followed:
Upon receiving an event A, it waits a certain specified amount of time Z before propagating it to the
Observer
. If another event B is received within this period of time, the propagation of A is cancelled, and B will be propagated in stead as soon as a Z amount of time has passed since B was received.As @benjchristensen already mentioned, the use of threads in this way is quite inefficient. To this end I proposed two additional variations on this scheme of throttling here. The question now is which schemes do we want (to support)?
Any thoughts on this is highly appreciated!