Skip to content
This repository has been archived by the owner on Mar 2, 2022. It is now read-only.

Commit

Permalink
Fixing #18
Browse files Browse the repository at this point in the history
  • Loading branch information
sinwe committed Jun 16, 2018
1 parent 409a959 commit b5b0fcc
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
13 changes: 10 additions & 3 deletions src/main/scala/reactor/core/scala/Scannable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import scala.language.implicitConversions
* Created by winarto on 17/6/17.
*/
trait Scannable {
def jScannable: JScannable
private[scala] def jScannable: JScannable

def actuals(): Stream[_ <: Scannable] = jScannable.actuals().iterator().asScala.map(js => js: Scannable).toStream

Expand Down Expand Up @@ -91,8 +91,15 @@ trait Scannable {
}

object Scannable {
def from(any: AnyRef): Scannable = new Scannable {
override def jScannable: JScannable = JScannable.from(any)
def from(any: Option[AnyRef]): Scannable = new Scannable {
override def jScannable: JScannable = {
any match {
case None => JScannable.from(None.orNull)
case Some(s: Scannable) => JScannable.from(s.jScannable)
case Some(js: JScannable) => JScannable.from(js)
case Some(other) => JScannable.from(other)
}
}
}

implicit def JScannable2Scannable(js: JScannable): Scannable = new Scannable {
Expand Down
11 changes: 8 additions & 3 deletions src/main/scala/reactor/core/scala/publisher/Flux.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import java.util.logging.Level
import java.util.{Comparator, stream, List => JList}

import org.reactivestreams.{Publisher, Subscriber, Subscription}
import reactor.core.Disposable
import reactor.core
import reactor.core.{Disposable, Scannable => JScannable}
import reactor.core.publisher.FluxSink.OverflowStrategy
import reactor.core.publisher.{BufferOverflowStrategy, FluxSink, Signal, SignalType, SynchronousSink, Flux => JFlux, GroupedFlux => JGroupedFlux}
import reactor.core.scala.Scannable
import reactor.core.scheduler.{Scheduler, Schedulers}
import reactor.util.Logger
import reactor.util.context.Context
Expand Down Expand Up @@ -43,7 +45,10 @@ import scala.concurrent.duration.Duration
* @see [[Mono]]
*/
class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T])
extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with FluxLike[T] with Filter [T] {
extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with FluxLike[T] with Filter [T] with Scannable {

override def jScannable: JScannable = JScannable.from(jFlux)

override def subscribe(s: Subscriber[_ >: T]): Unit = jFlux.subscribe(s)

/**
Expand Down Expand Up @@ -2104,7 +2109,7 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T])
* @param name a name for the sequence
* @return the same sequence, but bearing a name
*/
final def name(name: String) = Flux(jFlux.name(name))
final def name(name: String): Flux[T] = Flux(jFlux.name(name))

/**
* Emit only the first item emitted by this [[Flux]].
Expand Down
11 changes: 6 additions & 5 deletions src/test/scala/reactor/core/scala/publisher/FluxTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.reactivestreams.{Publisher, Subscription}
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.{FreeSpec, Matchers}
import reactor.core.publisher.{Flux => JFlux, _}
import reactor.core.scala.Scannable
import reactor.core.scheduler.Schedulers
import reactor.test.StepVerifier
import reactor.test.scheduler.VirtualTimeScheduler
Expand Down Expand Up @@ -1574,10 +1575,10 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit
}

".name should call the underlying Flux.name method" in {
val jFlux = spy(JFlux.just(1, 2, 3))
val flux = Flux(jFlux)
flux.name("flux-integer")
verify(jFlux).name("flux-integer")
val name = "flux integer"
val flux = Flux.just(1, 2, 3, 4).name(name)
val scannable: Scannable = Scannable.from(Option(flux))
scannable.name shouldBe name
}

".ofType should filter the value emitted by this flux according to the class" in {
Expand Down Expand Up @@ -1810,7 +1811,7 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit
.verifyComplete()
}
"with initial value should scan with provided initial value" in {
val flux = Flux.just(1, 2, 3, 4).scan[Int](2, { (a, b) => a * b })
val flux = Flux.just[Int](1, 2, 3, 4).scan[Int](2, { (a: Int, b: Int) => a * b })
StepVerifier.create(flux)
.expectNext(2, 2, 4, 12, 48)
.verifyComplete()
Expand Down

0 comments on commit b5b0fcc

Please sign in to comment.