Skip to content

Commit

Permalink
2.x: fix window() with time+size emission problems
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Mar 22, 2017
1 parent cd91a9f commit d3936da
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ static final class WindowExactBoundedSubscriber<T>
final int bufferSize;
final boolean restartTimerOnMaxSize;
final long maxSize;
final Scheduler.Worker worker;

long count;

Expand All @@ -290,8 +291,6 @@ static final class WindowExactBoundedSubscriber<T>

UnicastProcessor<T> window;

Scheduler.Worker worker;

volatile boolean terminated;

final SequentialDisposable timer = new SequentialDisposable();
Expand All @@ -307,6 +306,11 @@ static final class WindowExactBoundedSubscriber<T>
this.bufferSize = bufferSize;
this.maxSize = maxSize;
this.restartTimerOnMaxSize = restartTimerOnMaxSize;
if (restartTimerOnMaxSize) {
worker = scheduler.createWorker();
} else {
worker = null;
}
}

@Override
Expand Down Expand Up @@ -342,10 +346,7 @@ public void onSubscribe(Subscription s) {
Disposable d;
ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this);
if (restartTimerOnMaxSize) {
Scheduler.Worker sw = scheduler.createWorker();
worker = sw;
sw.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
d = sw;
d = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
} else {
d = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit);
}
Expand Down Expand Up @@ -451,6 +452,10 @@ public void cancel() {

public void dispose() {
DisposableHelper.dispose(timer);
Worker w = worker;
if (w != null) {
w.dispose();
}
}

void drainLoop() {
Expand Down Expand Up @@ -495,9 +500,9 @@ void drainLoop() {

if (isHolder) {
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
if (producerIndex == consumerIndexHolder.index) {
if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
w.onComplete();

count = 0;
w = UnicastProcessor.<T>create(bufferSize);
window = w;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ static final class WindowExactBoundedObserver<T>
final boolean restartTimerOnMaxSize;
final long maxSize;

final Scheduler.Worker worker;

long count;

long producerIndex;
Expand All @@ -262,7 +264,6 @@ static final class WindowExactBoundedObserver<T>

UnicastSubject<T> window;

Scheduler.Worker worker;

volatile boolean terminated;

Expand All @@ -279,6 +280,11 @@ static final class WindowExactBoundedObserver<T>
this.bufferSize = bufferSize;
this.maxSize = maxSize;
this.restartTimerOnMaxSize = restartTimerOnMaxSize;
if (restartTimerOnMaxSize) {
worker = scheduler.createWorker();
} else {
worker = null;
}
}

@Override
Expand All @@ -302,10 +308,7 @@ public void onSubscribe(Disposable s) {
Disposable d;
ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this);
if (restartTimerOnMaxSize) {
Scheduler.Worker sw = scheduler.createWorker();
worker = sw;
sw.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
d = sw;
d = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
} else {
d = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit);
}
Expand Down Expand Up @@ -394,6 +397,10 @@ public boolean isDisposed() {

void disposeTimer() {
DisposableHelper.dispose(timer);
Worker w = worker;
if (w != null) {
w.dispose();
}
}

void drainLoop() {
Expand Down Expand Up @@ -438,9 +445,9 @@ void drainLoop() {

if (isHolder) {
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
if (producerIndex == consumerIndexHolder.index) {
if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
w.onComplete();

count = 0;
w = UnicastSubject.create(bufferSize);
window = w;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,4 +704,107 @@ public void sizeTimeTimeout() {

ts.values().get(0).test().assertResult();
}

@Test
public void periodicWindowCompletion() {
TestScheduler scheduler = new TestScheduler();
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();

TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, false)
.test();

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(21)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void periodicWindowCompletionRestartTimer() {
TestScheduler scheduler = new TestScheduler();
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();

TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, true)
.test();

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(21)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void periodicWindowCompletionBounded() {
TestScheduler scheduler = new TestScheduler();
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();

TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, false)
.test();

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(21)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void periodicWindowCompletionRestartTimerBounded() {
TestScheduler scheduler = new TestScheduler();
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();

TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true)
.test();

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(21)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void periodicWindowCompletionRestartTimerBoundedSomeData() {
TestScheduler scheduler = new TestScheduler();
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();

TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 2, true)
.test();

ps.onNext(1);
ps.onNext(2);

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(22)
.assertNoErrors()
.assertNotComplete();
}
@Test
public void countRestartsOnTimeTick() {
TestScheduler scheduler = new TestScheduler();
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();

TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true)
.test();

// window #1
ps.onNext(1);
ps.onNext(2);

scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS);

// window #2
ps.onNext(3);
ps.onNext(4);
ps.onNext(5);
ps.onNext(6);

ts.assertValueCount(2)
.assertNoErrors()
.assertNotComplete();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -603,4 +603,137 @@ public void sizeTimeTimeout() {

ts.values().get(0).test().assertResult();
}

@Test
public void periodicWindowCompletion() {
TestScheduler scheduler = new TestScheduler();
Subject<Integer> ps = PublishSubject.<Integer>create();

TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, false)
.test();

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(21)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void periodicWindowCompletionRestartTimer() {
TestScheduler scheduler = new TestScheduler();
Subject<Integer> ps = PublishSubject.<Integer>create();

TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, true)
.test();

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(21)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void periodicWindowCompletionBounded() {
TestScheduler scheduler = new TestScheduler();
Subject<Integer> ps = PublishSubject.<Integer>create();

TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, false)
.test();

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(21)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void periodicWindowCompletionRestartTimerBounded() {
TestScheduler scheduler = new TestScheduler();
Subject<Integer> ps = PublishSubject.<Integer>create();

TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true)
.test();

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(21)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void periodicWindowCompletionRestartTimerBoundedSomeData() {
TestScheduler scheduler = new TestScheduler();
Subject<Integer> ps = PublishSubject.<Integer>create();

TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 2, true)
.test();

ps.onNext(1);
ps.onNext(2);

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

ts.assertValueCount(22)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void countRestartsOnTimeTick() {
TestScheduler scheduler = new TestScheduler();
Subject<Integer> ps = PublishSubject.<Integer>create();

TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true)
.test();

// window #1
ps.onNext(1);
ps.onNext(2);

scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS);

// window #2
ps.onNext(3);
ps.onNext(4);
ps.onNext(5);
ps.onNext(6);

ts.assertValueCount(2)
.assertNoErrors()
.assertNotComplete();
}

@Test
public void test() throws Exception {
final Subject<String> vehicleToFetch = PublishSubject.<String>create().toSerialized();
vehicleToFetch
.delay(2,TimeUnit.SECONDS)
.window(10, TimeUnit.SECONDS, 5, true)
.observeOn(Schedulers.io())
.subscribe(new Consumer<Observable<String>>() {
@Override
public void accept(Observable<String> w) throws Exception {
w.toList().subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> ws) throws Exception {
for (String v : ws) {
System.out.println(String.format("%s %d", v, Thread.currentThread().getId()));
vehicleToFetch.onNext(v);
};
}
});
}
});


Observable.just("v1","v2","v3","v4")
.concatWith(Observable.<String>never())
.subscribe(vehicleToFetch);

Thread.sleep(600000);
}
}

0 comments on commit d3936da

Please sign in to comment.