2.x: window(timespan, unit, count) always emits an empty Observable if the max count is not reached #5104

Closed
xiemeilong opened this issue Feb 17, 2017 · 11 comments

Comments

@xiemeilong

xiemeilong commented Feb 17, 2017

This code never prints anything.

PublishSubject<String> vehicleToFetch = PublishSubject.create();
vehicleToFetch
        .delay(2, TimeUnit.SECONDS)
        .window(10, TimeUnit.SECONDS, 5)
        .observeOn(Schedulers.io())
        .subscribe(w -> w.toList().subscribe(ws -> {
            ws.forEach(v -> {
                System.out.println(String.format("%s %d", v, Thread.currentThread().getId()));
                vehicleToFetch.onNext(v);
            });
        }));

Observable.just("v1", "v2", "v3", "v4")
        .subscribe(v -> {
            vehicleToFetch.onNext(v);
        });

I am using rxjava:2.0.6.

@akarnokd
Member

There are two problems with your code:

  • You are using toList, which requires a finite stream. Since you don't call onComplete on vehicleToFetch, the sequence above won't print anything.
  • You are probably not waiting long enough to see the results. The default RxJava schedulers use daemon threads, so when the "main" thread ends, the whole application stops.

PublishSubject<String> vehicleToFetch = PublishSubject.create();
vehicleToFetch
        .delay(2,TimeUnit.SECONDS)
        .window(10, TimeUnit.SECONDS, 5)
        .observeOn(Schedulers.io())
        .subscribe(w-> w.toList().subscribe(ws-> {
            ws.forEach(v -> {
                System.out.println(String.format("%s %d", v, Thread.currentThread().getId()));
                vehicleToFetch.onNext(v);
            });
        }));


Observable.just("v1","v2","v3","v4").subscribe(vehicleToFetch);

Thread.sleep(3000);

@xiemeilong
Author

xiemeilong commented Feb 17, 2017

@akarnokd Thanks. I am calling toList() on a window Observable, not on vehicleToFetch. It does print if I change .window(10, TimeUnit.SECONDS, 5) to .window(10, TimeUnit.SECONDS, 4).

I am using a CountDownLatch (not shown in the code) to wait for a long time.
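
For readers who want to see the waiting pattern mentioned here, the following is a minimal sketch in plain java.util.concurrent, not RxJava. The class DaemonWait, the method runAndAwait, and the thread name are hypothetical names chosen for illustration. The daemon thread stands in for a default RxJava scheduler thread, and the latch keeps the caller alive until the delayed work has run or the timeout expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DaemonWait {
    /**
     * Schedules a task on a daemon thread after delayMillis and waits up to
     * timeoutMillis for it. Returns true only if the task ran in time.
     */
    static boolean runAndAwait(long delayMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "daemon-worker");
            t.setDaemon(true); // assumption: stands in for a daemon scheduler thread
            return t;
        });
        CountDownLatch done = new CountDownLatch(1);
        scheduler.schedule(done::countDown, delayMillis, TimeUnit.MILLISECONDS);
        try {
            // Without this await, the caller could finish before the task runs.
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(100, 2000)); // waited long enough
        System.out.println(runAndAwait(2000, 100)); // gave up before the task ran
    }
}
```

The second call shows the failure mode described in the earlier reply: if the wait is shorter than the delay, the work never becomes visible.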

@akarnokd
Member

Yes, because the window with limit 4 will call onComplete for you on the window that toList() consumes. With 5, there is no one to call onComplete. If you had a "v5" in that case, you'd see the printout again.
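
The dependence on onComplete can be illustrated with a small stand-in written in plain Java. This is only a sketch of the buffering idea, not RxJava's toList implementation; ToListModel and its methods are hypothetical names. Items are collected silently, and the list reaches the downstream consumer only once completion is signalled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ToListModel<T> {
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> downstream;
    private boolean done;

    ToListModel(Consumer<List<T>> downstream) {
        this.downstream = downstream;
    }

    /** Buffers the item; nothing is delivered downstream yet. */
    void onNext(T item) {
        if (!done) {
            buffer.add(item);
        }
    }

    /** Delivers the whole buffer exactly once. */
    void onComplete() {
        if (done) {
            return;
        }
        done = true;
        downstream.accept(new ArrayList<>(buffer));
    }

    public static void main(String[] args) {
        List<List<String>> delivered = new ArrayList<>();
        ToListModel<String> window = new ToListModel<>(delivered::add);
        for (String v : new String[] {"v1", "v2", "v3", "v4"}) {
            window.onNext(v);
        }
        System.out.println(delivered); // prints [] because the window has not completed
        window.onComplete();
        System.out.println(delivered); // prints [[v1, v2, v3, v4]]
    }
}
```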

@xiemeilong
Author

Why did it not call onComplete when the timeout was reached? Doesn't this method mean that a window is emitted when either the timeout or the max count is reached?

@akarnokd
Member

Oh, I see it now. There is a bug in the operator: it doesn't complete the old window when a new one is due because of the timeout. I'll post a fix for it.
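
The intended behavior, where a window closes on whichever comes first, the max count or the timeout, can be modeled with virtual timestamps. This is a simplified sketch and not the operator's actual implementation. WindowModel, Timed, and closedWindows are hypothetical names, times are in seconds, and the timer is assumed to tick at fixed multiples of the timespan without restarting when a window closes on count.

```java
import java.util.ArrayList;
import java.util.List;

class WindowModel {
    /** A value tagged with the virtual time (in seconds) at which it arrives. */
    static final class Timed {
        final long time;
        final String value;

        Timed(long time, String value) {
            this.time = time;
            this.value = value;
        }
    }

    /**
     * Returns the windows that have been closed by endTime. A window closes
     * when it holds maxCount items or when a timer tick fires, whichever
     * happens first. Items must be sorted by time.
     */
    static List<List<String>> closedWindows(List<Timed> items, long timespan, int maxCount, long endTime) {
        List<List<String>> closed = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long nextTick = timespan;
        for (Timed item : items) {
            if (item.time > endTime) {
                break;
            }
            // Fire every timer tick that is due before this item arrives.
            while (nextTick <= item.time) {
                closed.add(current);
                current = new ArrayList<>();
                nextTick += timespan;
            }
            current.add(item.value);
            if (current.size() == maxCount) {
                closed.add(current); // closed by count
                current = new ArrayList<>();
            }
        }
        // Fire the remaining ticks up to endTime; this closes a partly filled window.
        while (nextTick <= endTime) {
            closed.add(current);
            current = new ArrayList<>();
            nextTick += timespan;
        }
        return closed;
    }

    public static void main(String[] args) {
        List<Timed> items = new ArrayList<>();
        for (String v : new String[] {"v1", "v2", "v3", "v4"}) {
            items.add(new Timed(2, v)); // all four arrive after the 2-second delay
        }
        System.out.println(closedWindows(items, 10, 5, 3));  // prints []
        System.out.println(closedWindows(items, 10, 5, 11)); // prints [[v1, v2, v3, v4]]
        System.out.println(closedWindows(items, 10, 4, 3));  // prints [[v1, v2, v3, v4]]
    }
}
```

With a limit of 5, the four items sit in an open window until the 10-second tick closes it; with a limit of 4, the count closes it immediately. The bug reported in this issue corresponds to the tick-driven close never reaching the old window.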

@akarnokd
Member

Closing via #5106.

@xiemeilong
Author

It works after upgrading to 2.0.7, but after three timeouts, no further timeouts occur.

@akarnokd
Member

Do you have a new unit test for it?

@xiemeilong
Author

You can use the same code; it blocks after three new windows have been emitted.

@akarnokd
Member

Odd. I'll look into it again.

@akarnokd
Member

Found a couple of remaining issues. See PR #5213; the example works with it properly for me.
