Skip to content

Commit

Permalink
2.x: Coverage improvements, logical fixes and cleanups 03/08 (#5905)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 9, 2018
1 parent 4e50ea4 commit 7646371
Show file tree
Hide file tree
Showing 11 changed files with 411 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,24 @@ public FlowableCache(Flowable<T> source, int capacityHint) {
protected void subscribeActual(Subscriber<? super T> t) {
// we can connect first because we replay everything anyway
ReplaySubscription<T> rp = new ReplaySubscription<T>(t, state);
state.addChild(rp);

t.onSubscribe(rp);

boolean doReplay = true;
if (state.addChild(rp)) {
if (rp.requested.get() == ReplaySubscription.CANCELLED) {
state.removeChild(rp);
doReplay = false;
}
}

// we ensure a single connection here to save an instance field of AtomicBoolean in state.
if (!once.get() && once.compareAndSet(false, true)) {
state.connect();
}

// no need to call rp.replay() here because the very first request will trigger it anyway
if (doReplay) {
rp.replay();
}
}

/**
Expand Down Expand Up @@ -122,22 +130,23 @@ static final class CacheState<T> extends LinkedArrayList implements FlowableSubs
/**
* Adds a ReplaySubscription to the subscribers array atomically.
* @param p the target ReplaySubscription wrapping a downstream Subscriber with state
* @return true if the ReplaySubscription was added or false if the cache is already terminated
*/
public void addChild(ReplaySubscription<T> p) {
public boolean addChild(ReplaySubscription<T> p) {
// guarding by connection to save on allocating another object
// thus there are two distinct locks guarding the value-addition and child come-and-go
for (;;) {
ReplaySubscription<T>[] a = subscribers.get();
if (a == TERMINATED) {
return;
return false;
}
int n = a.length;
@SuppressWarnings("unchecked")
ReplaySubscription<T>[] b = new ReplaySubscription[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = p;
if (subscribers.compareAndSet(a, b)) {
return;
return true;
}
}
}
Expand Down Expand Up @@ -240,12 +249,16 @@ static final class ReplaySubscription<T>
extends AtomicInteger implements Subscription {

private static final long serialVersionUID = -2557562030197141021L;
private static final long CANCELLED = -1;
private static final long CANCELLED = Long.MIN_VALUE;
/** The actual child subscriber. */
final Subscriber<? super T> child;
/** The cache state object. */
final CacheState<T> state;

/**
* Number of items requested and also the cancelled indicator if
* it contains {@link #CANCELLED}.
*/
final AtomicLong requested;

/**
Expand All @@ -263,6 +276,9 @@ static final class ReplaySubscription<T>
*/
int index;

/** Number of items emitted so far. */
long emitted;

ReplaySubscription(Subscriber<? super T> child, CacheState<T> state) {
this.child = child;
this.state = state;
Expand All @@ -271,17 +287,8 @@ static final class ReplaySubscription<T>
@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
for (;;) {
long r = requested.get();
if (r == CANCELLED) {
return;
}
long u = BackpressureHelper.addCap(r, n);
if (requested.compareAndSet(r, u)) {
replay();
return;
}
}
BackpressureHelper.addCancel(requested, n);
replay();
}
}

Expand All @@ -303,12 +310,13 @@ public void replay() {
int missed = 1;
final Subscriber<? super T> child = this.child;
AtomicLong rq = requested;
long e = emitted;

for (;;) {

long r = rq.get();

if (r < 0L) {
if (r == CANCELLED) {
return;
}

Expand All @@ -326,9 +334,8 @@ public void replay() {
final int n = b.length - 1;
int j = index;
int k = currentIndexInBuffer;
int valuesProduced = 0;

while (j < s && r > 0) {
while (j < s && e != r) {
if (rq.get() == CANCELLED) {
return;
}
Expand All @@ -344,15 +351,14 @@ public void replay() {

k++;
j++;
r--;
valuesProduced++;
e++;
}

if (rq.get() == CANCELLED) {
return;
}

if (r == 0) {
if (r == e) {
Object o = b[k];
if (NotificationLite.isComplete(o)) {
child.onComplete();
Expand All @@ -364,15 +370,12 @@ public void replay() {
}
}

if (valuesProduced != 0) {
BackpressureHelper.producedCancel(rq, valuesProduced);
}

index = j;
currentIndexInBuffer = k;
currentBuffer = b;
}

emitted = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,7 @@ public void clear() {

@Override
public boolean isEmpty() {
Iterator<? extends R> it = current;
if (it == null) {
return queue.isEmpty();
}
return !it.hasNext();
return current == null && queue.isEmpty();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,8 @@ public void subscribe(Subscriber<? super T> child) {
buf = bufferFactory.call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
EmptySubscription.error(ex, child);
return;
}
// create a new subscriber to source
ReplaySubscriber<T> u = new ReplaySubscriber<T>(buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,11 @@ static final class PublishObserver<T>
@SuppressWarnings("unchecked")
@Override
public void dispose() {
if (observers.get() != TERMINATED) {
InnerDisposable[] ps = observers.getAndSet(TERMINATED);
if (ps != TERMINATED) {
current.compareAndSet(PublishObserver.this, null);
InnerDisposable[] ps = observers.getAndSet(TERMINATED);
if (ps != TERMINATED) {
current.compareAndSet(PublishObserver.this, null);

DisposableHelper.dispose(s);
}
DisposableHelper.dispose(s);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,4 +419,124 @@ public void error() {
.test(0L)
.assertFailure(TestException.class);
}

@Test
public void cancelledUpFrontConnectAnyway() {
final AtomicInteger call = new AtomicInteger();
Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return call.incrementAndGet();
}
})
.cache()
.test(1L, true)
.assertNoValues();

assertEquals(1, call.get());
}

@Test
public void cancelledUpFront() {
final AtomicInteger call = new AtomicInteger();
Flowable<Object> f = Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return call.incrementAndGet();
}
}).concatWith(Flowable.never())
.cache();

f.test().assertValuesOnly(1);

f.test(1L, true)
.assertEmpty();

assertEquals(1, call.get());
}

@Test
public void subscribeSubscribeRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final Flowable<Integer> cache = Flowable.range(1, 500).cache();

final TestSubscriber<Integer> to1 = new TestSubscriber<Integer>();
final TestSubscriber<Integer> to2 = new TestSubscriber<Integer>();

Runnable r1 = new Runnable() {
@Override
public void run() {
cache.subscribe(to1);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
cache.subscribe(to2);
}
};

TestHelper.race(r1, r2);

to1
.awaitDone(5, TimeUnit.SECONDS)
.assertSubscribed()
.assertValueCount(500)
.assertComplete()
.assertNoErrors();

to2
.awaitDone(5, TimeUnit.SECONDS)
.assertSubscribed()
.assertValueCount(500)
.assertComplete()
.assertNoErrors();
}
}

@Test
public void subscribeCompleteRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final PublishProcessor<Integer> ps = PublishProcessor.<Integer>create();

final Flowable<Integer> cache = ps.cache();

cache.test();

final TestSubscriber<Integer> to = new TestSubscriber<Integer>();

Runnable r1 = new Runnable() {
@Override
public void run() {
cache.subscribe(to);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
ps.onComplete();
}
};

TestHelper.race(r1, r2);

to
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
}
}

@Test
public void backpressure() {
Flowable.range(1, 5)
.cache()
.test(0)
.assertEmpty()
.requestMore(2)
.assertValuesOnly(1, 2)
.requestMore(3)
.assertResult(1, 2, 3, 4, 5);
}
}
Loading

0 comments on commit 7646371

Please sign in to comment.