diff --git a/src/main/scala/reactor/core/scala/Scannable.scala b/src/main/scala/reactor/core/scala/Scannable.scala index 137792fa..d60e2432 100644 --- a/src/main/scala/reactor/core/scala/Scannable.scala +++ b/src/main/scala/reactor/core/scala/Scannable.scala @@ -10,7 +10,7 @@ import scala.language.implicitConversions * Created by winarto on 17/6/17. */ trait Scannable { - def jScannable: JScannable + private[scala] def jScannable: JScannable def actuals(): Stream[_ <: Scannable] = jScannable.actuals().iterator().asScala.map(js => js: Scannable).toStream @@ -91,8 +91,15 @@ trait Scannable { } object Scannable { - def from(any: AnyRef): Scannable = new Scannable { - override def jScannable: JScannable = JScannable.from(any) + def from(any: Option[AnyRef]): Scannable = new Scannable { + override def jScannable: JScannable = { + any match { + case None => JScannable.from(None.orNull) + case Some(s: Scannable) => JScannable.from(s.jScannable) + case Some(js: JScannable) => JScannable.from(js) + case Some(other) => JScannable.from(other) + } + } } implicit def JScannable2Scannable(js: JScannable): Scannable = new Scannable { diff --git a/src/main/scala/reactor/core/scala/publisher/Flux.scala b/src/main/scala/reactor/core/scala/publisher/Flux.scala index 03ef5b2d..0d3ebf52 100644 --- a/src/main/scala/reactor/core/scala/publisher/Flux.scala +++ b/src/main/scala/reactor/core/scala/publisher/Flux.scala @@ -8,9 +8,11 @@ import java.util.logging.Level import java.util.{Comparator, stream, List => JList} import org.reactivestreams.{Publisher, Subscriber, Subscription} -import reactor.core.Disposable +import reactor.core +import reactor.core.{Disposable, Scannable => JScannable} import reactor.core.publisher.FluxSink.OverflowStrategy import reactor.core.publisher.{BufferOverflowStrategy, FluxSink, Signal, SignalType, SynchronousSink, Flux => JFlux, GroupedFlux => JGroupedFlux} +import reactor.core.scala.Scannable import reactor.core.scheduler.{Scheduler, Schedulers} import reactor.util.Logger import reactor.util.context.Context @@ -43,7 +45,10 @@ import scala.concurrent.duration.Duration * @see [[Mono]] */ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T]) - extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with FluxLike[T] with Filter [T] { + extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with FluxLike[T] with Filter [T] with Scannable { + + override def jScannable: JScannable = JScannable.from(jFlux) + override def subscribe(s: Subscriber[_ >: T]): Unit = jFlux.subscribe(s) /** @@ -2104,7 +2109,7 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T]) * @param name a name for the sequence * @return the same sequence, but bearing a name */ - final def name(name: String) = Flux(jFlux.name(name)) + final def name(name: String): Flux[T] = Flux(jFlux.name(name)) /** * Emit only the first item emitted by this [[Flux]]. diff --git a/src/test/scala/reactor/core/scala/publisher/FluxTest.scala b/src/test/scala/reactor/core/scala/publisher/FluxTest.scala index b7615598..2dc3f220 100644 --- a/src/test/scala/reactor/core/scala/publisher/FluxTest.scala +++ b/src/test/scala/reactor/core/scala/publisher/FluxTest.scala @@ -13,6 +13,7 @@ import org.reactivestreams.{Publisher, Subscription} import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.{FreeSpec, Matchers} import reactor.core.publisher.{Flux => JFlux, _} +import reactor.core.scala.Scannable import reactor.core.scheduler.Schedulers import reactor.test.StepVerifier import reactor.test.scheduler.VirtualTimeScheduler @@ -1574,10 +1575,10 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit } ".name should call the underlying Flux.name method" in { - val jFlux = spy(JFlux.just(1, 2, 3)) - val flux = Flux(jFlux) - flux.name("flux-integer") - verify(jFlux).name("flux-integer") + val name = "flux integer" + val flux = Flux.just(1, 2, 3, 4).name(name) + val scannable: Scannable = Scannable.from(Option(flux)) + scannable.name shouldBe name } ".ofType should filter the value emitted by this flux according to the class" in { @@ -1810,7 +1811,7 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit .verifyComplete() } "with initial value should scan with provided initial value" in { - val flux = Flux.just(1, 2, 3, 4).scan[Int](2, { (a, b) => a * b }) + val flux = Flux.just[Int](1, 2, 3, 4).scan[Int](2, { (a: Int, b: Int) => a * b }) StepVerifier.create(flux) .expectNext(2, 2, 4, 12, 48) .verifyComplete()