Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge 0.20.5 into 1.x #1714

Merged
merged 6 commits into from
Oct 2, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class BufferUntilSubscriberTest {

@Test
public void testIssue1677() throws InterruptedException {
final AtomicLong counter = new AtomicLong();
final Integer[] numbers = new Integer[5000];
for (int i = 0; i < numbers.length; i++)
numbers[i] = i + 1;
final int NITERS = 250;
final CountDownLatch latch = new CountDownLatch(NITERS);
for (int iters = 0; iters < NITERS; iters++) {
final CountDownLatch innerLatch = new CountDownLatch(1);
final PublishSubject s = PublishSubject.create();
final AtomicBoolean completed = new AtomicBoolean();
Observable.from(numbers)
.takeUntil(s)
.window(50)
.flatMap(new Func1<Observable<Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> call(Observable<Integer> integerObservable) {
return integerObservable
.subscribeOn(Schedulers.computation())
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
if (integer >= 5 && completed.compareAndSet(false, true)) {
s.onCompleted();
}
// do some work
Math.pow(Math.random(), Math.random());
return integer * 2;
}
});
}
})
.toList()
.doOnNext(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
counter.incrementAndGet();
latch.countDown();
innerLatch.countDown();
}
})
.subscribe();
if (!innerLatch.await(30, TimeUnit.SECONDS))
Assert.fail("Failed inner latch wait, iteration " + iters);
}
if (!latch.await(30, TimeUnit.SECONDS))
Assert.fail("Incomplete! Went through " + latch.getCount() + " iterations");
else
Assert.assertEquals(NITERS, counter.get());
}
}
8 changes: 5 additions & 3 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,17 @@ public void call(Subscriber<? super R> o) {
* @return the source Observable, transformed by the transformer function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
public <R> Observable<R> compose(Transformer<? super T, R> transformer) {
return transformer.call(this);
@SuppressWarnings("unchecked")
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
// Casting to Observable<R> is type-safe because we know Observable is covariant.
return (Observable<R>) transformer.call(this);
}

/**
* Transformer function used by {@link #compose}.
* @warn more complete description needed
*/
public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<R>> {
public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<? extends R>> {
// cover for generics insanity
}

Expand Down
184 changes: 81 additions & 103 deletions src/main/java/rx/internal/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
package rx.internal.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.EmptyObserver;
import rx.observers.Subscribers;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -51,6 +51,9 @@
*/
public class BufferUntilSubscriber<T> extends Subject<T, T> {

@SuppressWarnings("rawtypes")
private final static Observer EMPTY_OBSERVER = new EmptyObserver();

/**
* @warn create() undescribed
* @return
Expand All @@ -62,25 +65,22 @@ public static <T> BufferUntilSubscriber<T> create() {

/** The common state. */
static final class State<T> {
/** The first observer or the one which buffers until the first arrives. */
volatile Observer<? super T> observerRef = new BufferedObserver<T>();
/** Allow a single subscriber only. */
volatile int first;
volatile Observer<? super T> observerRef = null;
/** Field updater for observerRef. */
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<State, Observer> OBSERVER_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef");
/** Field updater for first. */
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<State> FIRST_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(State.class, "first");

boolean casFirst(int expected, int next) {
return FIRST_UPDATER.compareAndSet(this, expected, next);
}
void setObserverRef(Observer<? super T> o) {
observerRef = o;

boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
}

Object guard = new Object();
/* protected by guard */
boolean emitting = false;

final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
final NotificationLite<T> nl = NotificationLite.instance();
}

static final class OnSubscribeAction<T> implements OnSubscribe<T> {
Expand All @@ -92,122 +92,100 @@ public OnSubscribeAction(State<T> state) {

@Override
public void call(final Subscriber<? super T> s) {
if (state.casFirst(0, 1)) {
final NotificationLite<T> nl = NotificationLite.instance();
// drain queued notifications before subscription
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef;
Object o;
while ((o = buffered.buffer.poll()) != null) {
nl.accept(s, o);
}
// register real observer for pass-thru ... and drain any further events received on first notification
state.setObserverRef(new PassThruObserver<T>(s, buffered.buffer, state));
if (state.casObserverRef(null, s)) {
s.add(Subscriptions.create(new Action0() {
@Override
public void call() {
state.setObserverRef(Subscribers.empty());
state.observerRef = EMPTY_OBSERVER;
}
}));
boolean win = false;
synchronized (state.guard) {
if (!state.emitting) {
state.emitting = true;
win = true;
}
}
if (win) {
final NotificationLite<T> nl = NotificationLite.instance();
while(true) {
Object o;
while ((o = state.buffer.poll()) != null) {
nl.accept(state.observerRef, o);
}
synchronized (state.guard) {
if (state.buffer.isEmpty()) {
// Although the buffer is empty, there is still a chance
// that further events may be put into the `buffer`.
// `emit(Object v)` should handle it.
state.emitting = false;
break;
}
}
}
}
} else {
s.onError(new IllegalStateException("Only one subscriber allowed!"));
}
}

}
final State<T> state;


private boolean forward = false;

private BufferUntilSubscriber(State<T> state) {
super(new OnSubscribeAction<T>(state));
this.state = state;
}

@Override
public void onCompleted() {
state.observerRef.onCompleted();
}

@Override
public void onError(Throwable e) {
state.observerRef.onError(e);
private void emit(Object v) {
synchronized (state.guard) {
state.buffer.add(v);
if (state.observerRef != null && !state.emitting) {
// Have an observer and nobody is emitting,
// should drain the `buffer`
forward = true;
state.emitting = true;
}
}
if (forward) {
Object o;
while ((o = state.buffer.poll()) != null) {
state.nl.accept(state.observerRef, o);
}
// Because `emit(Object v)` will be called in sequence,
// no event will be put into `buffer` after we drain it.
}
}

@Override
public void onNext(T t) {
state.observerRef.onNext(t);
}

/**
* This is a temporary observer between buffering and the actual that gets into the line of notifications
* from the producer and will drain the queue of any items received during the race of the initial drain and
* switching this.
*
* It will then immediately swap itself out for the actual (after a single notification), but since this is
* now being done on the same producer thread no further buffering will occur.
*/
private static final class PassThruObserver<T> extends Subscriber<T> {

private final Observer<? super T> actual;
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
private final ConcurrentLinkedQueue<Object> buffer;
private final State<T> state;

PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer,
State<T> state) {
this.actual = actual;
this.buffer = buffer;
this.state = state;
public void onCompleted() {
if (forward) {
state.observerRef.onCompleted();
}

@Override
public void onCompleted() {
drainIfNeededAndSwitchToActual();
actual.onCompleted();
else {
emit(state.nl.completed());
}
}

@Override
public void onError(Throwable e) {
drainIfNeededAndSwitchToActual();
actual.onError(e);
@Override
public void onError(Throwable e) {
if (forward) {
state.observerRef.onError(e);
}

@Override
public void onNext(T t) {
drainIfNeededAndSwitchToActual();
actual.onNext(t);
else {
emit(state.nl.error(e));
}

private void drainIfNeededAndSwitchToActual() {
final NotificationLite<T> nl = NotificationLite.instance();
Object o;
while ((o = buffer.poll()) != null) {
nl.accept(this, o);
}
// now we can safely change over to the actual and get rid of the pass-thru
// but only if not unsubscribed
state.setObserverRef(actual);
}

}

private static final class BufferedObserver<T> extends Subscriber<T> {
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
private static final NotificationLite<Object> nl = NotificationLite.instance();

@Override
public void onCompleted() {
buffer.add(nl.completed());
}

@Override
public void onError(Throwable e) {
buffer.add(nl.error(e));
@Override
public void onNext(T t) {
if (forward) {
state.observerRef.onNext(t);
}

@Override
public void onNext(T t) {
buffer.add(nl.next(t));
else {
emit(state.nl.next(t));
}

}
}
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ public void onError(Throwable e) {
boolean sendOnComplete = false;
synchronized (this) {
wip--;
if (wip == 0 && completed) {
if ((wip == 0 && completed) || (wip < 0)) {
sendOnComplete = true;
}
}
Expand Down
19 changes: 1 addition & 18 deletions src/main/java/rx/internal/operators/OperatorTakeUntil.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,7 @@ public OperatorTakeUntil(final Observable<? extends E> other) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final Subscriber<T> parent = new SerializedSubscriber<T>(child) {

@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
child.onNext(t);
}

};
final Subscriber<T> parent = new SerializedSubscriber<T>(child);

other.unsafeSubscribe(new Subscriber<E>(child) {

Expand Down
Loading