Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operators BO.chunkify, BO.collect, O.forIterable #636

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2460,6 +2460,21 @@ public Boolean call(T first, T second) {
}
});
}

/**
* Return an Observable which concatenates the observable sequences obtained by running the
* resultSelector for each element in the given Iterable source.
* @param <T> the Iterable sequence value type
* @param <R> the result type
* @param source the source iterable
* @param resultSelector the selector function that returns an Observable
* sequence for each value of the {@code source} iterable sequence
* @return an Observable which concatenates the observable sequences obtained by running the
* resultSelector for each element in the given Iterable source.
*/
public static <T, R> Observable<R> forIterable(Iterable<? extends T> source, Func1<? super T, ? extends Observable<? extends R>> resultSelector) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for this? I'm not understanding the need for this signature.

We can already create an Observable out of an Iterable and then perform the many different operators on it.

What does this relate to on the Rx.Net side?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is called For in Rx.NET. It behaves differently from the regular iterable as it iterates to the next value only when the previous one's selected Observable completes. For example, if source is large or infinite, it won't try to read through it completely like concat does.

return create(OperationConcat.forIterable(source, resultSelector));
}

/**
* Returns an Observable that emits a Boolean value that indicates whether
Expand Down
28 changes: 28 additions & 0 deletions rxjava-core/src/main/java/rx/observables/BlockingObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@
package rx.observables;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.OperationCollect;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationToFuture;
import rx.operators.OperationToIterator;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* An extension of {@link Observable} that provides blocking operators.
Expand Down Expand Up @@ -354,4 +358,28 @@ public Iterator<T> iterator() {
}
};
}

/**
* Return an Iterable sequence that returns elements
* collected/aggregated from the source sequence between consecutive next calls.
* @param <U> the type of the collector
* @param initialCollector factory function to create the initial collector
* @param merger function that merges the current collector with the observed value and returns a (new) collector
* @param replaceCollector function that replaces the current collector with a new collector when the current collector is consumed by an Iterator.next()
* @return an Iterable sequence that returns elements
* collected/aggregated from the source sequence between
* consecutive next calls.
*/
public <U> Iterable<U> collect(Func0<? extends U> initialCollector,
Func2<? super U, ? super T, ? extends U> merger,
Func1<? super U, ? extends U> replaceCollector) {
return OperationCollect.collect(o, initialCollector, merger, replaceCollector);
}
/**
* Return an Iterable that produces a sequence of consecutive (possibly empty) chunks of this Observable sequence.
* @return an Iterable that produces a sequence of consecutive (possibly empty) chunks of this Observable sequence.
*/
public Iterable<List<T>> chunkify() {
return OperationCollect.chunkify(o);
}
}
252 changes: 252 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationCollect.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.SingleAssignmentSubscription;
import rx.util.Exceptions;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Observable to iterable mapping by using custom collector, merger and
* collector-replacer functions.
*/
public final class OperationCollect {
/** Utility class. */
private OperationCollect() { throw new IllegalStateException("No instances!"); }

/**
* Produces an Iterable sequence that returns elements
* collected/aggregated from the source sequence between consecutive next calls.
* @param <T> the source value type
* @param <U> the aggregation type
* @param source the source Observable
* @param initialCollector the factory to create the initial collector
* @param merger the merger that combines the current collector with the observed value and returns a (new) collector
* @param replaceCollector the function that replaces the current collector with a new collector.
* @return the iterable sequence
*/
public static <T, U> Iterable<U> collect(
final Observable<? extends T> source,
final Func0<? extends U> initialCollector,
final Func2<? super U, ? super T, ? extends U> merger,
final Func1<? super U, ? extends U> replaceCollector) {
return new Iterable<U>() {
@Override
public Iterator<U> iterator() {
SingleAssignmentSubscription sas = new SingleAssignmentSubscription();
Collect<T, U> collect = new Collect<T, U>(initialCollector, merger, replaceCollector, sas);
if (!sas.isUnsubscribed()) {
sas.set(source.subscribe(collect));
}
return collect;
}
};
}

/**
* Produces an Iterable sequence of consecutive (possibly empty) chunks of the source sequence.
* @param <T> the source value type
* @param source the source Observable
* @return an iterable sequence of the chunks
*/
public static <T> Iterable<List<T>> chunkify(final Observable<? extends T> source) {
ListManager<T> na = new ListManager<T>();
return collect(source, na, na, na);
}
/** Creates a new ArrayList and manages its content. */
private static final class ListManager<T> implements Func1<List<T>, List<T>>, Func0<List<T>>, Func2<List<T>, T, List<T>> {
@Override
public List<T> call() {
return new ArrayList<T>();
}

@Override
public List<T> call(List<T> t1) {
return call();
}
@Override
public List<T> call(List<T> t1, T t2) {
t1.add(t2);
return t1;
}
}

/** The observer and iterator. */
private static final class Collect<T, U> implements Observer<T>, Iterator<U> {
final Func2<? super U, ? super T, ? extends U> merger;
final Func1<? super U, ? extends U> replaceCollector;
final Subscription cancel;
final Lock lock = new ReentrantLock();
U current;
boolean hasDone;
boolean hasError;
Throwable error;
/** Iterator's current collector. */
U iCurrent;
/** Iterator has unclaimed collector. */
boolean iHasValue;
/** Iterator completed. */
boolean iDone;
/** Iterator error. */
Throwable iError;

public Collect(final Func0<? extends U> initialCollector,
final Func2<? super U, ? super T, ? extends U> merger,
final Func1<? super U, ? extends U> replaceCollector,
final Subscription cancel) {
this.merger = merger;
this.replaceCollector = replaceCollector;
this.cancel = cancel;
try {
current = initialCollector.call();
} catch (Throwable t) {
hasError = true;
error = t;
cancel.unsubscribe();
}
}

@Override
public void onNext(T args) {
boolean unsubscribe = false;
lock.lock();
try {
if (hasDone || hasError) {
return;
}
try {
current = merger.call(current, args);
} catch (Throwable t) {
error = t;
hasError = true;
unsubscribe = true;
}
} finally {
lock.unlock();
}
if (unsubscribe) {
cancel.unsubscribe();
}
}

@Override
public void onCompleted() {
boolean unsubscribe = false;
lock.lock();
try {
if (hasDone || hasError) {
return;
}
hasDone = true;
unsubscribe = true;
} finally {
lock.unlock();
}
if (unsubscribe) {
cancel.unsubscribe();
}
}

@Override
public void onError(Throwable e) {
boolean unsubscribe = false;
lock.lock();
try {
if (hasDone || hasError) {
return;
}
hasError = true;
error = e;
unsubscribe = true;
} finally {
lock.unlock();
}
if (unsubscribe) {
cancel.unsubscribe();
}
}

@Override
public boolean hasNext() {
if (iError != null) {
throw Exceptions.propagate(iError);
}
if (!iHasValue) {
if (!iDone) {
lock.lock();
try {
if (hasError) {
iError = error;
iDone = true;
current = null;
iCurrent = null;
} else {
iCurrent = current;
iHasValue = true;
if (hasDone) {
current = null;
iDone = true;
} else {
try {
current = replaceCollector.call(iCurrent);
} catch (Throwable t) {
iError = t;
iDone = true;
}
}
}
} finally {
lock.unlock();
}
if (iDone && iError != null) {
cancel.unsubscribe();
throw Exceptions.propagate(iError);
}
return true;
}
return false;
}
return true;
}

@Override
public U next() {
if (hasNext()) {
U value = iCurrent;
iCurrent = null;
iHasValue = false;
return value;
}
throw new NoSuchElementException();
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read-only sequence");
}
}
}
Loading