Skip to content

Commit

Permalink
Merge pull request #964 from akarnokd/SubjectSubscriptionManagerFix
Browse files Browse the repository at this point in the history
SubjectSubscriptionManager fix.
  • Loading branch information
benjchristensen committed Mar 13, 2014
2 parents da289ec + 2c82d48 commit 52c388b
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ public void call() {
}
}
}));

if (subscription.isUnsubscribed()) {
addedObserver = false;
break;
}
// on subscribe add it to the map of outbound observers to notify
newState = current.addObserver(subscription, observer);
}
Expand Down Expand Up @@ -202,15 +205,21 @@ public State<T> removeObserver(Subscription s) {
// we are empty, nothing to remove
if (this.observers.length == 0) {
return this;
} else
if (this.observers.length == 1) {
if (this.subscriptions[0].equals(s)) {
return createNewWith(EMPTY_S, EMPTY_O);
}
return this;
}
int n = Math.max(this.observers.length - 1, 1);
int n = this.observers.length - 1;
int copied = 0;
Subscription[] newsubscriptions = Arrays.copyOf(this.subscriptions, n);
SubjectObserver[] newobservers = Arrays.copyOf(this.observers, n);
Subscription[] newsubscriptions = new Subscription[n];
SubjectObserver[] newobservers = new SubjectObserver[n];

for (int i = 0; i < this.subscriptions.length; i++) {
Subscription s0 = this.subscriptions[i];
if (s0 != s) {
if (!s0.equals(s)) {
if (copied == n) {
// if s was not found till the end of the iteration
// we return ourselves since no modification should
Expand All @@ -229,7 +238,13 @@ public State<T> removeObserver(Subscription s) {
// if somehow copied less than expected, truncate the arrays
// if s is unique, this should never happen
if (copied < n) {
return createNewWith(Arrays.copyOf(newsubscriptions, copied), Arrays.copyOf(newobservers, copied));
Subscription[] newsubscriptions2 = new Subscription[copied];
System.arraycopy(newsubscriptions, 0, newsubscriptions2, 0, copied);

SubjectObserver[] newobservers2 = new SubjectObserver[copied];
System.arraycopy(newobservers, 0, newobservers2, 0, copied);

return createNewWith(newsubscriptions2, newobservers2);
}
return createNewWith(newsubscriptions, newobservers);
}
Expand Down
42 changes: 42 additions & 0 deletions rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import rx.Observable;

import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;

public class BehaviorSubjectTest {

Expand Down Expand Up @@ -237,4 +239,44 @@ public void testCompletedAfterErrorIsNotSent3() {
verify(o2, never()).onNext(any());
verify(observer, never()).onError(any(Throwable.class));
}
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
BehaviorSubject<String> src = BehaviorSubject.create((String)null);

for (int i = 0; i < 10; i++) {
@SuppressWarnings("unchecked")
final Observer<Object> o = mock(Observer.class);
InOrder inOrder = inOrder(o);
String v = "" + i;
src.onNext(v);
System.out.printf("Turn: %d%n", i);
src.first()
.flatMap(new Func1<String, Observable<String>>() {

@Override
public Observable<String> call(String t1) {
return Observable.from(t1 + ", " + t1);
}
})
.subscribe(new Observer<String>() {
@Override
public void onNext(String t) {
o.onNext(t);
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onCompleted() {
o.onCompleted();
}
});
inOrder.verify(o).onNext(v + ", " + v);
inOrder.verify(o).onCompleted();
verify(o, never()).onError(any(Throwable.class));
}
}
}
42 changes: 41 additions & 1 deletion rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.subjects;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.ArrayList;
Expand Down Expand Up @@ -299,4 +298,45 @@ public void testReSubscribe() {

private final Throwable testException = new Throwable();

@Test(timeout = 1000)
public void testUnsubscriptionCase() {
PublishSubject<String> src = PublishSubject.create();

for (int i = 0; i < 10; i++) {
@SuppressWarnings("unchecked")
final Observer<Object> o = mock(Observer.class);
InOrder inOrder = inOrder(o);
String v = "" + i;
System.out.printf("Turn: %d%n", i);
src.first()
.flatMap(new rx.util.functions.Func1<String, Observable<String>>() {

@Override
public Observable<String> call(String t1) {
return Observable.from(t1 + ", " + t1);
}
})
.subscribe(new Observer<String>() {
@Override
public void onNext(String t) {
o.onNext(t);
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onCompleted() {
o.onCompleted();
}
});
src.onNext(v);

inOrder.verify(o).onNext(v + ", " + v);
inOrder.verify(o).onCompleted();
verify(o, never()).onError(any(Throwable.class));
}
}
}
45 changes: 43 additions & 2 deletions rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.subjects;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CountDownLatch;
Expand All @@ -25,10 +24,12 @@
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import rx.Observable;

import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class ReplaySubjectTest {
Expand Down Expand Up @@ -356,4 +357,44 @@ public void testSubscriptionLeak() {

assertEquals(0, replaySubject.subscriberCount());
}
}
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
ReplaySubject<String> src = ReplaySubject.create();

for (int i = 0; i < 10; i++) {
@SuppressWarnings("unchecked")
final Observer<Object> o = mock(Observer.class);
InOrder inOrder = inOrder(o);
String v = "" + i;
src.onNext(v);
System.out.printf("Turn: %d%n", i);
src.first()
.flatMap(new Func1<String, Observable<String>>() {

@Override
public Observable<String> call(String t1) {
return Observable.from(t1 + ", " + t1);
}
})
.subscribe(new Observer<String>() {
@Override
public void onNext(String t) {
System.out.println(t);
o.onNext(t);
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onCompleted() {
o.onCompleted();
}
});
inOrder.verify(o).onNext("0, 0");
inOrder.verify(o).onCompleted();
verify(o, never()).onError(any(Throwable.class));
}
}}

0 comments on commit 52c388b

Please sign in to comment.