Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SerialSubscription & From #621

Merged
merged 3 commits into from
Dec 23, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,15 @@ object SerialSubscription {
/**
* Represents a [[rx.lang.scala.Subscription]] that can be checked for status.
*/
class SerialSubscription private[scala] (serial: rx.subscriptions.SerialSubscription) extends Subscription {
class SerialSubscription private[scala] (override val asJavaSubscription: rx.subscriptions.SerialSubscription) extends Subscription {

/*
* As long as rx.subscriptions.SerialSubscription has no isUnsubscribed,
* we need to intercept and do it ourselves.
*/
override val asJavaSubscription: rx.subscriptions.SerialSubscription = new rx.subscriptions.SerialSubscription() {
override def unsubscribe(): Unit = {
if(unsubscribed.compareAndSet(false, true)) { serial.unsubscribe() }
}
override def setSubscription(subscription: rx.Subscription): Unit = serial.setSubscription(subscription)
override def getSubscription(): rx.Subscription = serial.getSubscription()
}
override def unsubscribe(): Unit = asJavaSubscription.unsubscribe()
override def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed

def subscription_=(value: Subscription): this.type = { asJavaSubscription.setSubscription(value.asJavaSubscription); this }
def subscription_=(value: Subscription): this.type = {
asJavaSubscription.setSubscription(value.asJavaSubscription)
this
}
def subscription: Subscription = Subscription(asJavaSubscription.getSubscription)

}
Expand Down
50 changes: 29 additions & 21 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ public static <T> Observable<T> error(Throwable exception, Scheduler scheduler)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
*/
public static <T> Observable<T> from(Iterable<? extends T> iterable) {
return create(OperationToObservableIterable.toObservableIterable(iterable));
return from(iterable, Schedulers.currentThread());
}

/**
Expand All @@ -751,7 +751,7 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
*/
public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
return from(iterable).observeOn(scheduler);
return create(OperationToObservableIterable.toObservableIterable(iterable, scheduler));
}

/**
Expand All @@ -764,14 +764,35 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler s
* {@link Subscription} is returned, it is not possible to unsubscribe from
* the sequence before it completes.
*
* @param items the source sequence
* @param items the source array
* @param <T> the type of items in the Array and the type of items to be
* emitted by the resulting Observable
* @return an Observable that emits each item in the source Array
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
*/
public static <T> Observable<T> from(T[] items) {
return create(OperationToObservableIterable.toObservableIterable(Arrays.asList(items)));
return from(Arrays.asList(items));
}

/**
* Converts an Array into an Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.png">
* <p>
* Note: the entire array is immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the
* {@link Subscription} is returned, it is not possible to unsubscribe from
* the sequence before it completes.
*
* @param items the source array
* @param scheduler the scheduler to emit the items of the array
* @param <T> the type of items in the Array and the type of items to be
* emitted by the resulting Observable
* @return an Observable that emits each item in the source Array
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
*/
public static <T> Observable<T> from(T[] items, Scheduler scheduler) {
return from(Arrays.asList(items), scheduler);
}

/**
Expand Down Expand Up @@ -828,7 +849,7 @@ public static <T> Observable<T> from(T t1, T t2) {
* subscribes. Since this occurs before the {@link Subscription} is
* returned, it is not possible to unsubscribe from the sequence before it
* completes.
*
*
* @param t1 first item
* @param t2 second item
* @param t3 third item
Expand Down Expand Up @@ -1013,11 +1034,6 @@ public static <T> Observable<T> from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.png">
* <p>
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
* returned, it is not possible to unsubscribe from the sequence before it
* completes.
*
* @param t1 first item
* @param t2 second item
* @param t3 third item
Expand Down Expand Up @@ -1045,11 +1061,6 @@ public static <T> Observable<T> from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/range.png">
* <p>
* Note: the entire range is immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the
* {@link Subscription} is returned, it is not possible to unsubscribe from
* the sequence before it completes.
*
* @param start the value of the first Integer in the sequence
* @param count the number of sequential Integers to generate
* @return an Observable that emits a range of sequential Integers
Expand All @@ -1074,7 +1085,7 @@ public static Observable<Integer> range(int start, int count) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896.aspx">Observable.Range Method (Int32, Int32, IScheduler)</a>
*/
public static Observable<Integer> range(int start, int count, Scheduler scheduler) {
return range(start, count).observeOn(scheduler);
return from(Range.createWithCount(start, count), scheduler);
}

/**
Expand Down Expand Up @@ -1121,10 +1132,7 @@ public static <T> Observable<T> defer(Func0<? extends Observable<? extends T>> o
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#just">RxJava Wiki: just()</a>
*/
public static <T> Observable<T> just(T value) {
List<T> list = new ArrayList<T>();
list.add(value);

return from(list);
return from(Arrays.asList((value)));
}

/**
Expand All @@ -1143,7 +1151,7 @@ public static <T> Observable<T> just(T value) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#just">RxJava Wiki: just()</a>
*/
public static <T> Observable<T> just(T value, Scheduler scheduler) {
return just(value).observeOn(scheduler);
return from(Arrays.asList((value)), scheduler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@

import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

import java.util.Iterator;

/**
* Converts an Iterable sequence into an Observable.
Expand All @@ -30,24 +36,42 @@
*/
public final class OperationToObservableIterable<T> {

public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
return new ToObservableIterable<T>(list, scheduler);
}

public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list) {
return new ToObservableIterable<T>(list);
return toObservableIterable(list, Schedulers.currentThread());
}

private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
public ToObservableIterable(Iterable<? extends T> list) {

public ToObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
this.iterable = list;
this.scheduler = scheduler;
}

public Iterable<? extends T> iterable;

public Subscription onSubscribe(Observer<? super T> observer) {
for (T item : iterable) {
observer.onNext(item);
}
observer.onCompleted();
Scheduler scheduler;
final Iterable<? extends T> iterable;

return Subscriptions.empty();
public Subscription onSubscribe(final Observer<? super T> observer) {
final Iterator<? extends T> iterator = iterable.iterator();
return scheduler.schedule(new Action1<Action0>() {
@Override
public void call(Action0 self) {
try {
if (iterator.hasNext()) {
T x = iterator.next();
observer.onNext(x);
self.call();
} else {
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
}
}
}
26 changes: 12 additions & 14 deletions rxjava-core/src/test/java/rx/ObservableWindowTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,21 @@ public class ObservableWindowTests {
@Test
public void testWindow() {
final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>();
Observable.from(1, 2, 3, 4, 5, 6)
.window(3).map(new Func1<Observable<Integer>, List<Integer>>() {

@Override
public List<Integer> call(Observable<Integer> o) {
return o.toList().toBlockingObservable().single();
}
Observable.concat(Observable.from(1, 2, 3, 4, 5, 6).window(3).map(new Func1<Observable<Integer>, Observable<List<Integer>>>() {
@Override
public Observable<List<Integer>> call(Observable<Integer> xs) {
return xs.toList();
}
})).toBlockingObservable().forEach(new Action1<List<Integer>>() {

}).toBlockingObservable().forEach(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> xs) {
lists.add(xs);
}
});

@Override
public void call(List<Integer> t) {
lists.add(t);
}
});

assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[] { 1, 2, 3 });
assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[]{1, 2, 3});
assertArrayEquals(lists.get(1).toArray(new Integer[3]), new Integer[] { 4, 5, 6 });
assertEquals(2, lists.size());

Expand Down
26 changes: 12 additions & 14 deletions rxjava-core/src/test/java/rx/operators/OperationWindowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
Expand All @@ -44,21 +45,18 @@ public void before() {
scheduler = new TestScheduler();
}

private static <T> List<List<T>> toLists(Observable<Observable<T>> observable) {
final List<T> list = new ArrayList<T>();
final List<List<T>> lists = new ArrayList<List<T>>();
private static <T> List<List<T>> toLists(Observable<Observable<T>> observables) {

observable.subscribe(new Action1<Observable<T>>() {
final List<List<T>> lists = new ArrayList<List<T>>();
Observable.concat(observables.map(new Func1<Observable<T>, Observable<List<T>>>() {
@Override
public void call(Observable<T> tObservable) {
tObservable.subscribe(new Action1<T>() {
@Override
public void call(T t) {
list.add(t);
}
});
lists.add(new ArrayList<T>(list));
list.clear();
public Observable<List<T>> call(Observable<T> xs) {
return xs.toList();
}
})).toBlockingObservable().forEach(new Action1<List<T>>() {
@Override
public void call(List<T> xs) {
lists.add(xs);
}
});
return lists;
Expand Down Expand Up @@ -90,7 +88,7 @@ public void testSkipAndCountGaplessEindows() {

@Test
public void testOverlappingWindows() {
Observable<String> subject = Observable.from("zero", "one", "two", "three", "four", "five");
Observable<String> subject = Observable.from(new String[]{"zero", "one", "two", "three", "four", "five"}, Schedulers.currentThread());
Observable<Observable<String>> windowed = Observable.create(window(subject, 3, 1));

List<List<String>> windows = toLists(windowed);
Expand Down