Skip to content

Commit

Permalink
abandon ReadWriteReentrantLock and simplify locking to a single lock …
Browse files Browse the repository at this point in the history
…that is used for subscription and disconnect
  • Loading branch information
davidmoten committed Sep 28, 2014
1 parent 055057d commit eb6ae37
Showing 1 changed file with 44 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;

import rx.Observable.OnSubscribe;
import rx.Subscriber;
Expand All @@ -43,10 +42,9 @@ public final class OnSubscribeRefCount<T> implements OnSubscribe<T> {
private final AtomicInteger subscriptionCount = new AtomicInteger(0);

/**
* Ensures that subscribers wait for the first subscription to be assigned
* to baseSubcription before being subscribed themselves.
* Use this lock for every subscription and disconnect action.
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantLock lock = new ReentrantLock();

/**
* Constructor.
Expand All @@ -61,48 +59,26 @@ public OnSubscribeRefCount(ConnectableObservable<? extends T> source) {
@Override
public void call(final Subscriber<? super T> subscriber) {

lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {

// ensure secondary subscriptions wait for baseSubscription to be
// set by first subscription
lock.writeLock().lock();

final AtomicBoolean writeLocked = new AtomicBoolean(true);

try {
// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a synchronous
// Observable
source.connect(new Action1<Subscription>() {
@Override
public void call(Subscription subscription) {

try {
baseSubscription.add(subscription);

// handle unsubscribing from the base subscription
subscriber.add(disconnect());

// ready to subscribe to source so do it
source.unsafeSubscribe(subscriber);
} finally {
// release the write lock
lock.writeLock().unlock();
writeLocked.set(false);
}
}
});
// baseSubscription is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
} finally {
// need to cover the case where the source is subscribed to
// outside of this class thus preventing the above Action1
// outside of this class thus preventing the above Action1
// being called
if (writeLocked.get()) {
// Action1 was not called
lock.writeLock().unlock();
lock.unlock();
}
}
} else {
lock.readLock().lock();
try {
// handle unsubscribing from the base subscription
subscriber.add(disconnect());
Expand All @@ -111,22 +87,49 @@ public void call(Subscription subscription) {
source.unsafeSubscribe(subscriber);
} finally {
// release the read lock
lock.readLock().unlock();
lock.unlock();
}
}

}

private Action1<Subscription> onSubscribe(final Subscriber<? super T> subscriber,
final AtomicBoolean writeLocked) {
return new Action1<Subscription>() {
@Override
public void call(Subscription subscription) {

try {
baseSubscription.add(subscription);

// handle unsubscribing from the base subscription
subscriber.add(disconnect());

// ready to subscribe to source so do it
source.unsafeSubscribe(subscriber);
} finally {
// release the write lock
lock.unlock();
writeLocked.set(false);
}
}
};
}

private Subscription disconnect() {
return Subscriptions.create(new Action0() {
@Override
public void call() {
if (subscriptionCount.decrementAndGet() == 0) {
baseSubscription.unsubscribe();

// need a new baseSubscription because once unsubscribed
// stays that way
baseSubscription = new CompositeSubscription();
lock.lock();
try {
if (subscriptionCount.decrementAndGet() == 0) {
baseSubscription.unsubscribe();
// need a new baseSubscription because once
// unsubscribed stays that way
baseSubscription = new CompositeSubscription();
}
} finally {
lock.unlock();
}
}
});
Expand Down

0 comments on commit eb6ae37

Please sign in to comment.