Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented the delay operator #384

Closed
wants to merge 7 commits into from
Closed

Conversation

jmhofer
Copy link
Contributor

@jmhofer jmhofer commented Sep 14, 2013

This implements the operator from #36 in all four variants.

The tests also found a bug in interval that I fixed.

Maybe this is a bug of map though - map throws exceptions in onNext, not sure if it should be allowed to do that - the SafeObserver that it relies on comes too late for scheduled actions. However, this can be discussed and fixed independently.

@cloudbees-pull-request-builder

RxJava-pull-requests #281 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

Maybe this is a bug of map though - map throws exceptions in onNext, not sure if it should be allowed to do that - the SafeObserver that it relies on comes too late for scheduled actions. However, this can be discussed and fixed independently.

The idea is that Func implementations from internal code functions correctly and does not break the Rx contract (that onNext should not throw). Thus, we shouldn't need the SafeObserver around anything internal that complies with the Rx contract.

The SafeObserver only wraps functions and observers that are passed in from outside the RxJava codebase as those are untrusted. This was discussed further at #216 and then more recently changes to map were made for error handling: #314

* Delays the observable sequence by the given time interval.
*/
public static <T> OnSubscribeFunc<T> delay(final Observable<? extends T> source, long delay, TimeUnit unit) {
return delay(source, delay, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to use Schedulers.threadPoolForComputation() so we reuse the existing ScheduledExecutorService rather than creating a new one for every single call to delay.

@cloudbees-pull-request-builder

RxJava-pull-requests #282 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #283 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

Only errors emitted by the source Observable are not delayed.

Why was this decision made? Is that how Rx.Net works (the MSDN docs don't say).

This means we'd ignore onNext events scheduled to be delivered since we'd push onError before them. That breaks the Rx contract so don't know why we'd want to do this.

It seems that onError and onCompleted should be delayed just like onNext - all notifications scheduled for future delivery.

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delay, TimeUnit unit) {
long newDelay = unit.toNanos(delay) + this.unit.toNanos(this.delay);
return underlying.schedule(state, action, newDelay, TimeUnit.NANOSECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this will inject non-determinism ... notifications will be capable of interleaving and being out of order.

I think we need to combine this with ScheduledObserver which maintains a queue and event loop for handling each notification sequentially on the given scheduler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, can anyone verify what Rx.Net does here related to order?

Can events become out of order when using delay or does it retain order guarantees (as I expect it would)?

@jmhofer
Copy link
Contributor Author

jmhofer commented Sep 22, 2013

Concerning the question whether Rx.NET emits errors without delay: Here it says that they get emitted without delay. MSDN doesn't say anything about it.

So I tried it out in F#, and it actually doesn't get delayed:

#light
open Core.Printf
open System
open System.Reactive.Linq

module Program =
  exception ObsvError

  [<EntryPoint>]
  let Main(args) =
    let mapToError = fun (x: int64) -> if x > 4L then raise ObsvError else x
    let obsv = Observable.Interval(TimeSpan.FromSeconds 1.0).Select(mapToError).Delay(TimeSpan.FromSeconds 4.5)
    let sub = obsv.Subscribe(onNext = (fun x -> printfn "%d" x), 
                             onError = (fun exn -> Console.WriteLine "Error!"))            
    Console.ReadLine() |> ignore
    sub.Dispose()
    0

Will output:

0
1
Error!

If the error weren't delayed at all, however, it should happen before 1. If it were delayed fully, it should happen after 4. So that's a bit strange.

@jmhofer
Copy link
Contributor Author

jmhofer commented Sep 22, 2013

I guess all that means that we'll have to introduce a specific timestamped queue here.

@benjchristensen
Copy link
Member

@headinthebox Erik, can you provide guidance on what we should do here? It seems that onError should be delayed if any onNext are still outstanding, but that isn't what happens in .Net and that seems to break the Rx contract.

@benjchristensen
Copy link
Member

Erik has confirmed that 'onError' should emit immediately and that if the 'onNext' events should not be lost then onErrorResumeNext or something similar should be used before delay or materialize can be used before passing into delay to treat onError as an event rather than exception.

@benjchristensen
Copy link
Member

@jmhofer

Based on Erik's confirmation is this good to merge?

@jmhofer
Copy link
Contributor Author

jmhofer commented Oct 22, 2013

Unfortunately, I didn't get around to another close look here yet.

I'm afraid that this might still need a queue and some synchronization in order to be clean concurrency-wise. Imho the current implementation should work for sane use cases, though.

@headinthebox
Copy link
Contributor

I must say that I am surprised that in .NET

xs.Delay(t)

does not behave the same as

xs.Select(x => Observable.Timer(t).Select(_ => x)).Concat()

Erik

On Oct 22, 2013, at 10:05 PM, Joachim Hofer notifications@github.com wrote:

Unfortunately, I didn't get around to another close look here yet.

I'm afraid that this might still need a queue and some synchronization in order to be clean concurrency-wise. Imho the current implementation should work for sane use cases, though.


Reply to this email directly or view it on GitHub.

@benjchristensen benjchristensen mentioned this pull request Oct 22, 2013
@samuelgruetter
Copy link
Contributor

Could we implement delay with something like

xs.map(x => Observable.timer(t).map(_ => x).cache()).concat()

@headinthebox
Copy link
Contributor

That’s what I effectively do below; except I am not sure why you add the extra call to cache.

I am digging into the .NET implementation as we speak
since I am not sure that implementation is correct.

On Oct 23, 2013, at 11:44 AM, samuelgruetter notifications@github.com wrote:

Could we implement delay with something like

xs.map(x => Observable.timer(t).map(_ => x).cache()).concat()

Reply to this email directly or view it on GitHub.

@samuelgruetter
Copy link
Contributor

concat(Observable<Observable<T>>) has never more than two subscriptions at the same time: one to the outer Observable<Observable<T>> and one to the current inner Observable<T>. For delay however, if the time distance between two elements is smaller than the delay, both TimerObservables created for delaying these two elements must have a subscriber as soon as they're created to make sure that they start counting time. Comcat doesn't do these necessary subscriptions, and that's why I added cache().

(Note that I'm making these claims without having access to a computer where I can do tests, so this risks being complete nonsense ;-) )

@headinthebox
Copy link
Contributor

The .NET implementation has a bug. The version using the Delay operator
delivers one value, after the input has experienced an OnError and waits
for 10 seconds to deliver the answer and fail. The values in X~~>Y don't
matter.

[0] 31~~>44
[Exception of type 'System.Exception' was thrown.]

If you draw the marble diagram, the correct answer is to call onError after
4 seconds.

{Exception of type 'System.Exception' was thrown.}

Erik

var xs = Observable.Interval(TimeSpan.FromSeconds(1))

   .Select(x => 

     { 

      if(x == 4)  throw new Exception();

        return "["+x+"] "+System.DateTime.Now.Millisecond; 

     })

     .Delay(TimeSpan.FromSeconds(10))

     .Select(x => x+"~~>"+System.DateTime.Now.Millisecond);

xs.Subscribe

( x => Console.WriteLine(x)

, e => Console.WriteLine("["+e.Message+"]")

, () => Console.WriteLine("!")

);

var zs = Observable.Interval(TimeSpan.FromSeconds(1))

   .Select(x => 

     { 

      if(x == 4)  throw new Exception();

      return "{"+x+"} "+System.DateTime.Now.Millisecond;

      })

     .Select(x => Observable.Timer(TimeSpan.FromSeconds(10)).Select(_ =>

x)).Concat()

     .Select(x => x+"~~>"+System.DateTime.Now.Millisecond);

zs.Subscribe

( x => Console.WriteLine(x)

, e => Console.WriteLine("{"+e.Message+"}")

, () => Console.WriteLine("!")

);

From: erik meijer [mailto:erik.meijer@meijcrosoft.com]
Sent: Wednesday, October 23, 2013 2:52 AM
To: Netflix/RxJava
Cc: Netflix/RxJava; Erik Meijer
Subject: Re: [RxJava] Implemented the delay operator (#384)

That's what I effectively do below; except I am not sure why you add the
extra call to cache.

I am digging into the .NET implementation as we speak

since I am not sure that implementation is correct.

On Oct 23, 2013, at 11:44 AM, samuelgruetter notifications@github.com
wrote:

Could we implement delay with something like

xs.map(x => Observable.timer(t).map(_ => x).cache()).concat()

Reply to this email directly or view it on GitHub
#384 (comment) .
<https://github.com/notifications/beacon/f5Np9-JVjG56Yyi3j3ToPDeurdEmqyCr9bz
IdcgJ4T5obnlPCkNxbsmS18XcQofG.gif>

@benjchristensen
Copy link
Member

@jmhofer Can you rebase this with master so it can be merged and ensure it matches the final answers we got from @headinthebox ?

@jonnolen
Copy link

this is a +1 on this PR.

@jonnolen
Copy link

I have rebased this onto netflix/rxjava/master and issued a PR to @jmhofer... here is my fork: https://github.com/jonnolen/RxJava/tree/delay

@benjchristensen
Copy link
Member

Completed in #576

jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants