diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 12276c7b8d..201b3c53f0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -30,6 +30,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; +import rx.concurrency.ImmediateScheduler; import rx.concurrency.Schedulers; import rx.util.functions.Func1; @@ -50,7 +51,12 @@ public ObserveOn(Observable source, Scheduler scheduler) { @Override public Subscription call(final Observer observer) { - return source.subscribe(new ScheduledObserver(observer, scheduler)); + if (scheduler instanceof ImmediateScheduler) { + // do nothing if we request ImmediateScheduler so we don't invoke overhead + return source.subscribe(observer); + } else { + return source.subscribe(new ScheduledObserver(observer, scheduler)); + } } } diff --git a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java index 7273a29f5c..c491760a1b 100644 --- a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,18 +18,13 @@ import rx.Notification; import rx.Observer; import rx.Scheduler; +import rx.concurrency.Schedulers; import rx.util.functions.Action0; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; - /* package */class ScheduledObserver implements Observer { private final Observer underlying; private final Scheduler scheduler; - private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - private final AtomicInteger counter = new AtomicInteger(0); - public ScheduledObserver(Observer underlying, Scheduler scheduler) { this.underlying = underlying; this.scheduler = scheduler; @@ -46,47 +41,38 @@ public void onError(final Exception e) { } @Override - public void onNext(final T args) { - enqueue(new Notification(args)); + public void onNext(final T v) { + enqueue(new Notification(v)); } - private void enqueue(Notification notification) { - int count = counter.getAndIncrement(); - - queue.offer(notification); + private void enqueue(final Notification notification) { - if (count == 0) { - processQueue(); - } - } - - private void processQueue() { - scheduler.schedule(new Action0() { + Schedulers.currentThread().schedule(new Action0() { @Override public void call() { - Notification not = queue.poll(); - - switch (not.getKind()) { - case OnNext: - underlying.onNext(not.getValue()); - break; - case OnError: - underlying.onError(not.getException()); - break; - case OnCompleted: - underlying.onCompleted(); - break; - default: - throw new IllegalStateException("Unknown kind of notification " + not); - - } - - int count = counter.decrementAndGet(); - if (count > 0) { - scheduler.schedule(this); - } + scheduler.schedule(new Action0() { + @Override + public void call() { + switch (notification.getKind()) { + case OnNext: + underlying.onNext(notification.getValue()); + break; + case OnError: + underlying.onError(notification.getException()); + break; + case OnCompleted: + underlying.onCompleted(); + break; + default: + throw new IllegalStateException("Unknown kind of notification " + notification); + + } + } + }); } + }); - } + }; + }