From 457533ced4eca729bda542ec97b64af1d6c04af6 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Wed, 24 Feb 2016 09:25:03 +1100 Subject: [PATCH] scan should pass upstream a request of Long.MAX_VALUE (it should not decrement it) --- .../rx/internal/operators/OperatorScan.java | 6 +++++- .../internal/operators/OperatorScanTest.java | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorScan.java b/src/main/java/rx/internal/operators/OperatorScan.java index ccf7a74c07..547edf5c1b 100644 --- a/src/main/java/rx/internal/operators/OperatorScan.java +++ b/src/main/java/rx/internal/operators/OperatorScan.java @@ -268,8 +268,12 @@ public void setProducer(Producer p) { if (producer != null) { throw new IllegalStateException("Can't set more than one Producer!"); } + mr = missedRequested; // request one less because of the initial value, this happens once - mr = missedRequested - 1; + // and is performed only if the request is not at MAX_VALUE already + if (mr != Long.MAX_VALUE) { + mr -= 1; + } missedRequested = 0L; producer = p; } diff --git a/src/test/java/rx/internal/operators/OperatorScanTest.java b/src/test/java/rx/internal/operators/OperatorScanTest.java index 20e53668a6..e45f32f92c 100644 --- a/src/test/java/rx/internal/operators/OperatorScanTest.java +++ b/src/test/java/rx/internal/operators/OperatorScanTest.java @@ -451,4 +451,22 @@ public void onNext(Integer t) { } }); } + + @Test + public void scanShouldPassUpstreamARequestForMaxValue() { + final List requests = new ArrayList(); + Observable.just(1,2,3).doOnRequest(new Action1() { + @Override + public void call(Long n) { + requests.add(n); + } + }) + .scan(new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return 0; + }}).count().subscribe(); + + assertEquals(Arrays.asList(Long.MAX_VALUE), requests); + } }