diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 2874cf9daf..88335ef03e 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5425,52 +5425,6 @@ public final Observable onExceptionResumeNext(final Observable r return lift(new OperatorOnExceptionResumeNextViaObservable(resumeSequence)); } - /** - * Performs work on the source Observable in parallel by sharding it on a {@link Schedulers#computation()} - * {@link Scheduler}, and returns the resulting Observable. - *

- * - *

- *
Scheduler:
- *
{@code parallel} operates by default on the {@code computation} {@link Scheduler}.
- *
- * - * @param f - * a {@link Func1} that applies Observable Operators to {@code Observable} in parallel and - * returns an {@code Observable} - * @return an Observable that emits the results of applying {@code f} to the items emitted by the source - * Observable - * @see RxJava wiki: parallel - * @see RxJava Threading Examples - */ - public final Observable parallel(Func1, Observable> f) { - return parallel(f, Schedulers.computation()); - } - - /** - * Performs work on the source Observable in parallel by sharding it on a {@link Scheduler}, and returns - * the resulting Observable. - *

- * - *

- *
Scheduler:
- *
you specify which {@link Scheduler} this operator will use
- *
- * - * @param f - * a {@link Func1} that applies Observable Operators to {@code Observable} in parallel and - * returns an {@code Observable} - * @param s - * a {@link Scheduler} to perform the work on - * @return an Observable that emits the results of applying {@code f} to the items emitted by the source - * Observable - * @see RxJava wiki: parallel - * @see RxJava Threading Examples - */ - public final Observable parallel(final Func1, Observable> f, final Scheduler s) { - return lift(new OperatorParallel(f, s)); - } - /** * Returns a {@link ConnectableObservable}, which waits until its * {@link ConnectableObservable#connect connect} method is called before it begins emitting items to those diff --git a/src/main/java/rx/internal/operators/OperatorParallel.java b/src/main/java/rx/internal/operators/OperatorParallel.java deleted file mode 100644 index 109bb0d410..0000000000 --- a/src/main/java/rx/internal/operators/OperatorParallel.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.operators; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Observable.Operator; -import rx.Producer; -import rx.Scheduler; -import rx.Subscriber; -import rx.functions.Func1; - -/** - * Identifies unit of work that can be executed in parallel on a given Scheduler. - */ -public final class OperatorParallel implements Operator { - - private final Scheduler scheduler; - private final Func1, Observable> f; - private final int degreeOfParallelism; - - public OperatorParallel(Func1, Observable> f, Scheduler scheduler) { - this.scheduler = scheduler; - this.f = f; - this.degreeOfParallelism = scheduler.parallelism(); - } - - @Override - public Subscriber call(final Subscriber child) { - @SuppressWarnings("unchecked") - final Observable[] os = new Observable[degreeOfParallelism]; - @SuppressWarnings("unchecked") - final Subscriber[] ss = new Subscriber[degreeOfParallelism]; - final ParentSubscriber subscriber = new ParentSubscriber(child, ss); - for (int i = 0; i < os.length; i++) { - final int index = i; - Observable o = Observable.create(new OnSubscribe() { - - @Override - public void call(Subscriber inner) { - ss[index] = inner; - child.add(inner); // unsubscribe chain - inner.setProducer(new Producer() { - - @Override - public void request(long n) { - // as we receive requests from observeOn propagate upstream to the parent Subscriber - subscriber.requestMore(n); - } - - }); - } - - }); - os[i] = f.call(o.observeOn(scheduler)); - } - - // subscribe BEFORE receiving data so everything is hooked up - Observable.merge(os).unsafeSubscribe(child); - return subscriber; - } - - private class ParentSubscriber extends Subscriber { - - final Subscriber child; - final Subscriber[] ss; - int index = 0; - final AtomicLong initialRequest = new AtomicLong(); - final AtomicBoolean started = new AtomicBoolean(); - - private ParentSubscriber(Subscriber child, Subscriber[] ss) { - super(child); - this.child = child; - this.ss = ss; - } - - public void requestMore(long n) { - if (started.get()) { - request(n); - } else { - initialRequest.addAndGet(n); - } - } - - @Override - public void onStart() { - if (started.compareAndSet(false, true)) { - // if no request via requestMore has been sent yet, we start with 0 (rather than default Long.MAX_VALUE). - request(initialRequest.get()); - } - } - - @Override - public void onCompleted() { - for (Subscriber s : ss) { - s.onCompleted(); - } - } - - @Override - public void onError(Throwable e) { - child.onError(e); - } - - @Override - public void onNext(T t) { - /* - * There is a possible bug here ... we could get a MissingBackpressureException - * if the processing on each of the threads is unbalanced. In other words, if 1 of the - * observeOn queues has space, but another is full, this could try emitting to one that - * is full and get a MissingBackpressureException. - * - * To solve that we'd need to check the outstanding request per Subscriber, which will - * need a more complicated mechanism to expose a type that has both the requested + the - * Subscriber to emit to. - */ - ss[index++].onNext(t); - if (index >= degreeOfParallelism) { - index = 0; - } - } - - }; - -} diff --git a/src/test/java/rx/internal/operators/OperatorParallelTest.java b/src/test/java/rx/internal/operators/OperatorParallelTest.java deleted file mode 100644 index d6a6e1ac79..0000000000 --- a/src/test/java/rx/internal/operators/OperatorParallelTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.operators; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.internal.util.RxRingBuffer; -import rx.observers.TestSubscriber; -import rx.schedulers.Schedulers; - -public class OperatorParallelTest { - - @Test(timeout = 20000) - public void testParallel() { - int NUM = 1000; - final AtomicInteger count = new AtomicInteger(); - final AtomicInteger innerCount = new AtomicInteger(); - Observable.range(1, NUM).parallel( - new Func1, Observable>() { - - @Override - public Observable call(Observable o) { - return o.map(new Func1() { - - @Override - public Integer[] call(Integer t) { - try { - // randomize to try and force non-determinism - // if we see these tests fail randomly then we have a problem with merging it all back together - Thread.sleep((int) (Math.random() * 10)); - } catch (InterruptedException e) { - System.out.println("*********** error!!!!!!!"); - e.printStackTrace(); - // TODO why is this exception not being thrown? - throw new RuntimeException(e); - } - // System.out.println("V: " + t + " Thread: " + Thread.currentThread()); - innerCount.incrementAndGet(); - return new Integer[] { t, t * 99 }; - } - - }); - } - }) - .toBlocking().forEach(new Action1() { - - @Override - public void call(Integer[] v) { - count.incrementAndGet(); - // System.out.println("V: " + v[0] + " R: " + v[1] + " Thread: " + Thread.currentThread()); - } - - }); - System.out.println("parallel test completed ----------"); - - // just making sure we finish and get the number we expect - assertEquals("innerCount", NUM, innerCount.get()); - assertEquals("finalCount", NUM, count.get()); - } - - @Test(timeout = 10000) - public void testParallelWithNestedAsyncWork() { - int NUM = 20; - final AtomicInteger count = new AtomicInteger(); - Observable.range(1, NUM).parallel( - new Func1, Observable>() { - - @Override - public Observable call(Observable o) { - return o.flatMap(new Func1>() { - - @Override - public Observable call(Integer t) { - return Observable.just(String.valueOf(t)).delay(100, TimeUnit.MILLISECONDS); - } - - }); - } - }).toBlocking().forEach(new Action1() { - - @Override - public void call(String v) { - count.incrementAndGet(); - } - - }); - - // just making sure we finish and get the number we expect - assertEquals(NUM, count.get()); - } - - @Test - public void testBackpressureViaOuterObserveOn() { - final AtomicInteger emitted = new AtomicInteger(); - TestSubscriber ts = new TestSubscriber(); - Observable.range(1, 100000).doOnNext(new Action1() { - - @Override - public void call(Integer t1) { - emitted.incrementAndGet(); - } - - }).parallel(new Func1, Observable>() { - - @Override - public Observable call(Observable t1) { - return t1.map(new Func1() { - - @Override - public String call(Integer t) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return String.valueOf(t); - } - - }); - } - - }).observeOn(Schedulers.newThread()).take(20000).subscribe(ts); - ts.awaitTerminalEvent(); - ts.assertNoErrors(); - System.out.println("testBackpressureViaObserveOn emitted => " + emitted.get()); - assertTrue(emitted.get() < 20000 + (RxRingBuffer.SIZE * Schedulers.computation().parallelism())); // should have no more than the buffer size beyond the 20000 in take - assertEquals(20000, ts.getOnNextEvents().size()); - } - - @Test - public void testBackpressureOnInnerObserveOn() { - final AtomicInteger emitted = new AtomicInteger(); - TestSubscriber ts = new TestSubscriber(); - Observable.range(1, 100000).doOnNext(new Action1() { - - @Override - public void call(Integer t1) { - emitted.incrementAndGet(); - } - - }).parallel(new Func1, Observable>() { - - @Override - public Observable call(Observable t1) { - return t1.map(new Func1() { - - @Override - public String call(Integer t) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return String.valueOf(t); - } - - }); - } - - }).take(20000).subscribe(ts); - ts.awaitTerminalEvent(); - ts.assertNoErrors(); - System.out.println("testBackpressureViaObserveOn emitted => " + emitted.get()); - assertTrue(emitted.get() < 20000 + (RxRingBuffer.SIZE * Schedulers.computation().parallelism())); // should have no more than the buffer size beyond the 20000 in take - assertEquals(20000, ts.getOnNextEvents().size()); - } - - @Test(timeout = 10000) - public void testBackpressureViaSynchronousTake() { - final AtomicInteger emitted = new AtomicInteger(); - TestSubscriber ts = new TestSubscriber(); - Observable.range(1, 100000).doOnNext(new Action1() { - - @Override - public void call(Integer t1) { - emitted.incrementAndGet(); - } - - }).parallel(new Func1, Observable>() { - - @Override - public Observable call(Observable t1) { - return t1.map(new Func1() { - - @Override - public String call(Integer t) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return String.valueOf(t); - } - - }); - } - - }).take(2000).subscribe(ts); - ts.awaitTerminalEvent(); - ts.assertNoErrors(); - System.out.println("emitted: " + emitted.get()); - // we allow buffering inside each parallel Observable - assertEquals(RxRingBuffer.SIZE * Schedulers.computation().parallelism(), emitted.get()); // no async, so should be perfect - assertEquals(2000, ts.getOnNextEvents().size()); - } -}