Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix refCount() connect/subscribe/cancel deadlock #5975

Merged
merged 5 commits into from
Apr 29, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;

import org.reactivestreams.*;

import io.reactivex.FlowableSubscriber;
import io.reactivex.disposables.*;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.schedulers.Schedulers;

/**
* Returns an observable sequence that stays connected to the source as long as
Expand All @@ -31,199 +33,199 @@
* @param <T>
* the value type
*/
public final class FlowableRefCount<T> extends AbstractFlowableWithUpstream<T, T> {
public final class FlowableRefCount<T> extends Flowable<T> {

final ConnectableFlowable<T> source;
volatile CompositeDisposable baseDisposable = new CompositeDisposable();
final AtomicInteger subscriptionCount = new AtomicInteger();

/**
* Use this lock for every subscription and disconnect action.
*/
final ReentrantLock lock = new ReentrantLock();

final class ConnectionSubscriber
extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Subscription {

private static final long serialVersionUID = 152064694420235350L;
final Subscriber<? super T> subscriber;
final CompositeDisposable currentBase;
final Disposable resource;

final AtomicLong requested;

ConnectionSubscriber(Subscriber<? super T> subscriber,
CompositeDisposable currentBase, Disposable resource) {
this.subscriber = subscriber;
this.currentBase = currentBase;
this.resource = resource;
this.requested = new AtomicLong();
}

@Override
public void onSubscribe(Subscription s) {
SubscriptionHelper.deferredSetOnce(this, requested, s);
}
final int n;

@Override
public void onError(Throwable e) {
cleanup();
subscriber.onError(e);
}
final long timeout;

@Override
public void onNext(T t) {
subscriber.onNext(t);
}
final TimeUnit unit;

@Override
public void onComplete() {
cleanup();
subscriber.onComplete();
}
final Scheduler scheduler;

@Override
public void request(long n) {
SubscriptionHelper.deferredRequest(this, requested, n);
RefConnection connection;

public FlowableRefCount(ConnectableFlowable<T> source) {
this(source, 1, 0L, TimeUnit.NANOSECONDS, Schedulers.trampoline());
}

public FlowableRefCount(ConnectableFlowable<T> source, int n, long timeout, TimeUnit unit,
Scheduler scheduler) {
this.source = source;
this.n = n;
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {

RefConnection conn;

boolean connect = false;
synchronized (this) {
conn = connection;
if (conn == null || conn.terminated) {
conn = new RefConnection(this);
connection = conn;
}

long c = conn.subscriberCount;
if (c == 0L && conn.timer != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this condition is only actual for resubscribeBeforeTimeout test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strange to see logic and tests for something that user cannot use in RxJava

As you mentioned in PR description, let's maybe expose additional time related refCount api or try to shrink unused code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: #5986.

conn.timer.dispose();
}
conn.subscriberCount = c + 1;
if (!conn.connected && c + 1 == n) {
connect = true;
conn.connected = true;
}
}

@Override
public void cancel() {
SubscriptionHelper.cancel(this);
resource.dispose();
source.subscribe(new RefCountSubscriber<T>(s, this, conn));

if (connect) {
source.connect(conn);
}
}

void cleanup() {
// on error or completion we need to dispose the base CompositeDisposable
// and set the subscriptionCount to 0
lock.lock();
try {
if (baseDisposable == currentBase) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
}
baseDisposable.dispose();
baseDisposable = new CompositeDisposable();
subscriptionCount.set(0);
}
} finally {
lock.unlock();
void cancel(RefConnection rc) {
SequentialDisposable sd;
synchronized (this) {
if (rc.terminated) {
return;
}
long c = rc.subscriberCount - 1;
rc.subscriberCount = c;
if (c != 0L || !rc.connected) {
return;
}
if (timeout == 0L) {
timeout(rc);
return;
}
sd = new SequentialDisposable();
rc.timer = sd;
}
}

/**
* Constructor.
*
* @param source
* observable to apply ref count to
*/
public FlowableRefCount(ConnectableFlowable<T> source) {
super(source);
this.source = source;
sd.replace(scheduler.scheduleDirect(rc, timeout, unit));
}

@Override
public void subscribeActual(final Subscriber<? super T> subscriber) {

lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {

final AtomicBoolean writeLocked = new AtomicBoolean(true);

try {
// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
} finally {
// need to cover the case where the source is subscribed to
// outside of this class thus preventing the Consumer passed
// to source.connect above being called
if (writeLocked.get()) {
// Consumer passed to source.connect was not called
lock.unlock();
void terminated(RefConnection rc) {
synchronized (this) {
if (!rc.terminated) {
rc.terminated = true;
if (source instanceof Disposable) {
((Disposable)source).dispose();
}
}
} else {
try {
// ready to subscribe to source so do it
doSubscribe(subscriber, baseDisposable);
} finally {
// release the read lock
lock.unlock();
connection = null;
}
}

}

private Consumer<Disposable> onSubscribe(final Subscriber<? super T> subscriber,
final AtomicBoolean writeLocked) {
return new DisposeConsumer(subscriber, writeLocked);
void timeout(RefConnection rc) {
synchronized (this) {
if (rc.subscriberCount == 0 && rc == connection) {
DisposableHelper.dispose(rc);
if (source instanceof Disposable) {
((Disposable)source).dispose();
}
connection = null;
}
}
}

void doSubscribe(final Subscriber<? super T> subscriber, final CompositeDisposable currentBase) {
// handle disposing from the base subscription
Disposable d = disconnect(currentBase);
static final class RefConnection extends AtomicReference<Disposable>
implements Runnable, Consumer<Disposable> {

ConnectionSubscriber connection = new ConnectionSubscriber(subscriber, currentBase, d);
subscriber.onSubscribe(connection);
private static final long serialVersionUID = -4552101107598366241L;

source.subscribe(connection);
}
final FlowableRefCount<?> parent;

Disposable timer;

long subscriberCount;

private Disposable disconnect(final CompositeDisposable current) {
return Disposables.fromRunnable(new DisposeTask(current));
boolean connected;

boolean terminated;

RefConnection(FlowableRefCount<?> parent) {
this.parent = parent;
}

@Override
public void run() {
parent.timeout(this);
}

@Override
public void accept(Disposable t) throws Exception {
DisposableHelper.replace(this, t);
}
}

final class DisposeConsumer implements Consumer<Disposable> {
private final Subscriber<? super T> subscriber;
private final AtomicBoolean writeLocked;
static final class RefCountSubscriber<T>
extends AtomicBoolean implements FlowableSubscriber<T>, Subscription {

private static final long serialVersionUID = -7419642935409022375L;

final Subscriber<? super T> actual;

final FlowableRefCount<T> parent;

final RefConnection connection;

Subscription upstream;

DisposeConsumer(Subscriber<? super T> subscriber, AtomicBoolean writeLocked) {
this.subscriber = subscriber;
this.writeLocked = writeLocked;
RefCountSubscriber(Subscriber<? super T> actual, FlowableRefCount<T> parent, RefConnection connection) {
this.actual = actual;
this.parent = parent;
this.connection = connection;
}

@Override
public void accept(Disposable subscription) {
try {
baseDisposable.add(subscription);
// ready to subscribe to source so do it
doSubscribe(subscriber, baseDisposable);
} finally {
// release the write lock
lock.unlock();
writeLocked.set(false);
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable t) {
if (compareAndSet(false, true)) {
parent.terminated(connection);
}
actual.onError(t);
}

@Override
public void onComplete() {
if (compareAndSet(false, true)) {
parent.terminated(connection);
}
actual.onComplete();
}
}

final class DisposeTask implements Runnable {
private final CompositeDisposable current;
@Override
public void request(long n) {
upstream.request(n);
}

DisposeTask(CompositeDisposable current) {
this.current = current;
@Override
public void cancel() {
upstream.cancel();
if (compareAndSet(false, true)) {
parent.cancel(connection);
}
}

@Override
public void run() {
lock.lock();
try {
if (baseDisposable == current) {
if (subscriptionCount.decrementAndGet() == 0) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
}

baseDisposable.dispose();
// need a new baseDisposable because once
// disposed stays that way
baseDisposable = new CompositeDisposable();
}
}
} finally {
lock.unlock();
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(upstream, s)) {
this.upstream = s;

actual.onSubscribe(this);
}
}
}
Expand Down
Loading