Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interval #228

Merged
merged 7 commits into from
Apr 18, 2013
Merged

Interval #228

merged 7 commits into from
Apr 18, 2013

Conversation

jmhofer
Copy link
Contributor

@jmhofer jmhofer commented Apr 8, 2013

I have attempted to implement the "Interval" operator (#55) here. As far as I can see, it seems to work. This is based on the quite fresh work on schedulers.

It doesn't work with the currentThread or newThread schedulers, I guess due to the simple SleepingAction. It does work with a ScheduledExecutorService, though.

For testing this conveniently, I also wrote a test scheduler with adjustable time.

Looking forward to any review comments.

@cloudbees-pull-request-builder

RxJava-pull-requests #83 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #84 SUCCESS
This pull request looks good

@mairbek
Copy link
Contributor

mairbek commented Apr 11, 2013

Let's consider adding new method to the Scheduler

schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) 

It would allow to leverage ScheduledExecutorService#scheduleAtFixedRate for the ExecutorScheduler.
For other schedulers we could use recursive Action scheduling from pull request #229.

@jmhofer
Copy link
Contributor Author

jmhofer commented Apr 12, 2013

I agree, schedulePeriodically would be helpful here.

@mairbek
Copy link
Contributor

mairbek commented Apr 12, 2013

@jmhofer do you plan to work on this feature? I'm thinking about implementing Buffer operator #16 and this one will be very helpful for me.

@jmhofer
Copy link
Contributor Author

jmhofer commented Apr 12, 2013

I'll have a look at it.

@jmhofer
Copy link
Contributor Author

jmhofer commented Apr 12, 2013

Maybe we should also combine all the longs and units into a Timespan class? - It doesn't feel right to always have them as separate parameters.

jmhofer pushed a commit to jmhofer/RxJava that referenced this pull request Apr 12, 2013
@benjchristensen
Copy link
Member

Anyone know why .Net wouldn't have a schedulePeriodically concept?

If we were to add that it would only really work with ScheduledExecutorService and thus most Scheduler implementations would have to rely upon the generic ScheduledExecutorService that I've added in #235 called GenericScheduledExecutorService.

This suggests to me that it isn't the right solution to add another interface method as it seems very tied to a particular implementation. Can't this be achieved with recursive schedulers which now work after the changes you did in #229?

@benjchristensen
Copy link
Member

I'm curious about this:

It doesn't work with the currentThread or newThread schedulers, I guess due to the simple SleepingAction. It does work with a ScheduledExecutorService, though.

It seems that this operator needs to be async and must default to using something like Schedulers.threadPoolForComputation().

Documentation at MSDN (http://msdn.microsoft.com/en-us/library/hh228911(v=vs.103).aspx) suggests that this defaults to using a thread-pool scheduler:

The following example code uses the Interval operator to generate a sequence of long integers staring at zero. Each integer in the sequence is generated after the two second period has expired. Each integer is written to the console window along with the current time to show the period parameter in effect. The generation of the integer sequence is scheduled to run on a thread from the .Net thread pool so the main thread is not blocked and can process the enter key being pressed to stop the interval.

Perhaps NewThreadScheduler will work as of #235 because it now uses GenericScheduledExecutorService when a delay is passed in instead of SleepingAction.

What happens in .Net if someone tries to use ImmediateScheduler or CurrentThreadScheduler? Anyone have access to .Net to try that?

@benjchristensen
Copy link
Member

Maybe we should also combine all the longs and units into a Timespan class? - It doesn't feel right to always have them as separate parameters.

If we're going to consider doing this then we should do it as part of #235 before I release another version since this would change the Schedulers interface.

Java itself always keeps the 2 separate though so perhaps the idiomatic thing to do in Java is to keep them separate? I don't have a strong opinion on this one but the decision made now will last a very long time.

@jmhofer
Copy link
Contributor Author

jmhofer commented Apr 17, 2013

Actually, .Net does have SchedulePeriodic. See here: https://rx.codeplex.com/SourceControl/changeset/view/e24677887e1727fb3b5dd614d996aa6d113b3834#Rx/NET/Source/System.Reactive.Interfaces/Reactive/Concurrency/ISchedulerPeriodic.cs

Imho it's useful to avoid recursive scheduling where the schedulers support this directly, as well as to do the slightly messy recursive scheduling ourselves for other types of schedulers, so that the user doesn't have to.

@benjchristensen
Copy link
Member

Ah interesting, thanks for educating me on that.

@benjchristensen
Copy link
Member

This doesn't look like it's been integrated into Observable, but I'm going to merge as it looks useful to get this in ... the TestScheduler especially while Interval continues getting work done.

benjchristensen added a commit that referenced this pull request Apr 18, 2013
@benjchristensen benjchristensen merged commit 6507abc into ReactiveX:master Apr 18, 2013
@jmhofer
Copy link
Contributor Author

jmhofer commented Apr 19, 2013

Great, thanks (this still needs a bit of work if/when we do periodic scheduling).

@benjchristensen
Copy link
Member

Are you okay with me releasing current code on the master branch and then the rest of interval coming later?

Is there anything else about master branch as it stands that should be changed before I release? I'd like to do so today.

@jmhofer
Copy link
Contributor Author

jmhofer commented Apr 19, 2013

That's alright with me.

@zsxwing
Copy link
Member

zsxwing commented Oct 11, 2013

@benjchristensen, now the interval operators in RxJava and Rx.Net have different behaviors when the scheduler is CurrentThread.

In Rx.Net, the following codes

            var o = Observable.Interval(TimeSpan.FromMilliseconds(100), Scheduler.CurrentThread);
            o.Take(5).Subscribe(
                x => Console.WriteLine(x)
            );
            o.Take(4).Subscribe(
                x => Console.WriteLine(x)
            );
            Console.ReadLine();

output

0
1
2
3
4
0
1
2
3


In RxJava, the following codes

            Observable<Long> o =  Observable.interval(100, TimeUnit.MILLISECONDS,
                    Schedulers.currentThread());
            o.take(5).subscribe(new Action1<Long>() {
                @Override
                public void call(Long t1) {
                    System.out.println(t1);
                }
            });
            o.take(4).subscribe(new Action1<Long>() {
                @Override
                public void call(Long t1) {
                    System.out.println(t1);
                }
            });

output

0
1
2
3
4

and the current thread is blocked at the first subscribe.

Is it OK, or the interval operator should not be used with CurrentThread?

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants