From eb6ae379ea83de6e1049ecf745e7fc97d1881ef0 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sun, 28 Sep 2014 11:15:46 +1000 Subject: [PATCH] abandon ReadWriteReentrantLock and simplify locking to a single lock that is used for subscription and disconnect --- .../operators/OnSubscribeRefCount.java | 85 ++++++++++--------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRefCount.java b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRefCount.java index 7a28197174..df77d7db3b 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRefCount.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRefCount.java @@ -17,8 +17,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import rx.Observable.OnSubscribe; import rx.Subscriber; @@ -43,10 +42,9 @@ public final class OnSubscribeRefCount implements OnSubscribe { private final AtomicInteger subscriptionCount = new AtomicInteger(0); /** - * Ensures that subscribers wait for the first subscription to be assigned - * to baseSubcription before being subscribed themselves. + * Use this lock for every subscription and disconnect action. */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReentrantLock lock = new ReentrantLock(); /** * Constructor. @@ -61,48 +59,26 @@ public OnSubscribeRefCount(ConnectableObservable source) { @Override public void call(final Subscriber subscriber) { + lock.lock(); if (subscriptionCount.incrementAndGet() == 1) { - // ensure secondary subscriptions wait for baseSubscription to be - // set by first subscription - lock.writeLock().lock(); - final AtomicBoolean writeLocked = new AtomicBoolean(true); - + try { // need to use this overload of connect to ensure that - // baseSubscription is set in the case that source is a synchronous - // Observable - source.connect(new Action1() { - @Override - public void call(Subscription subscription) { - - try { - baseSubscription.add(subscription); - - // handle unsubscribing from the base subscription - subscriber.add(disconnect()); - - // ready to subscribe to source so do it - source.unsafeSubscribe(subscriber); - } finally { - // release the write lock - lock.writeLock().unlock(); - writeLocked.set(false); - } - } - }); + // baseSubscription is set in the case that source is a + // synchronous Observable + source.connect(onSubscribe(subscriber, writeLocked)); } finally { // need to cover the case where the source is subscribed to - // outside of this class thus preventing the above Action1 + // outside of this class thus preventing the above Action1 // being called if (writeLocked.get()) { // Action1 was not called - lock.writeLock().unlock(); + lock.unlock(); } } } else { - lock.readLock().lock(); try { // handle unsubscribing from the base subscription subscriber.add(disconnect()); @@ -111,22 +87,49 @@ public void call(Subscription subscription) { source.unsafeSubscribe(subscriber); } finally { // release the read lock - lock.readLock().unlock(); + lock.unlock(); } } } + private Action1 onSubscribe(final Subscriber subscriber, + final AtomicBoolean writeLocked) { + return new Action1() { + @Override + public void call(Subscription subscription) { + + try { + baseSubscription.add(subscription); + + // handle unsubscribing from the base subscription + subscriber.add(disconnect()); + + // ready to subscribe to source so do it + source.unsafeSubscribe(subscriber); + } finally { + // release the write lock + lock.unlock(); + writeLocked.set(false); + } + } + }; + } + private Subscription disconnect() { return Subscriptions.create(new Action0() { @Override public void call() { - if (subscriptionCount.decrementAndGet() == 0) { - baseSubscription.unsubscribe(); - - // need a new baseSubscription because once unsubscribed - // stays that way - baseSubscription = new CompositeSubscription(); + lock.lock(); + try { + if (subscriptionCount.decrementAndGet() == 0) { + baseSubscription.unsubscribe(); + // need a new baseSubscription because once + // unsubscribed stays that way + baseSubscription = new CompositeSubscription(); + } + } finally { + lock.unlock(); } } });