This repository has been archived by the owner on Mar 2, 2022. It is now read-only.

Commit b2c1a6d
Merge branch 'master' into sreactor
sinwe committed Jun 12, 2019
2 parents 169a303 + ef7fc3c
Showing 2 changed files with 202 additions and 4 deletions.
131 changes: 129 additions & 2 deletions src/main/scala/reactor/core/scala/publisher/SParallelFlux.scala
@@ -3,11 +3,13 @@ package reactor.core.scala.publisher
import java.util
import java.util.function.Supplier

-import org.reactivestreams.Publisher
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
import reactor.core.Disposable
import reactor.core.publisher.{ParallelFlux => JParallelFlux}
import reactor.core.scheduler.Scheduler
import reactor.util.concurrent.Queues

-class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) {
+class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) extends Publisher[T] {

/**
* Perform a fluent transformation to a value via a converter function which receives
@@ -19,6 +21,131 @@ class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) {
*/
final def as[U](converter: SParallelFlux[T] => U): U = jParallelFlux.as((t: JParallelFlux[T]) => converter(SParallelFlux(t)))

/**
* Filters the source values on each 'rail'.
* <p>
* Note that the same predicate may be called from multiple threads concurrently.
*
* @param predicate the function returning true to keep a value or false to drop a
* value
* @return the new [[SParallelFlux]] instance
*/
final def filter(predicate: SPredicate[T]) = SParallelFlux(jParallelFlux.filter(predicate))
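
// Illustrative sketch (mirrors the filter test in SParallelFluxTest): keep
// only even values across all rails:
//   Flux.just(1, 2, 3).parallel().filter((i: Int) => i % 2 == 0)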

/**
* Maps the source values on each 'rail' to another value.
* <p>
* Note that the same mapper function may be called from multiple threads
* concurrently.
*
* @tparam U the output value type
* @param mapper the mapper function turning Ts into Us.
* @return the new [[SParallelFlux]] instance
*/
final def map[U](mapper: T => _ <: U) = SParallelFlux(jParallelFlux.map[U](mapper))
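
// Illustrative sketch: map each rail's Ints to Strings:
//   Flux.just(1, 2, 3).parallel().map(i => i.toString)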

/**
* Reduces all values within a 'rail' and across 'rails' with a reducer function into
* a single sequential value.
* <p>
* Note that the same reducer function may be called from multiple threads
* concurrently.
*
* @param reducer the function to reduce two values into one.
* @return the new Mono instance emitting the reduced value or empty if the
* [[SParallelFlux]] was empty
*/
final def reduce(reducer: (T, T) => T) = Mono(jParallelFlux.reduce(reducer))
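
// Illustrative sketch: sum all values across rails into a single Mono:
//   Flux.just(1, 2, 3).parallel().reduce(_ + _)  // emits 6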

/**
* Reduces all values within a 'rail' to a single value (with a possibly different
* type) via a reducer function that is initialized on each rail from an
* initialSupplier value.
* <p>
* Note that the same mapper function may be called from multiple threads
* concurrently.
*
* @tparam R the reduced output type
* @param initialSupplier the supplier for the initial value
* @param reducer the function to reduce a previous output of reduce (or the initial
* value supplied) with a current source value.
* @return the new [[SParallelFlux]] instance
*/
final def reduce[R](initialSupplier: () => R, reducer: (R, T) => R) = SParallelFlux(jParallelFlux.reduce[R](initialSupplier, reducer))
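
// Illustrative sketch (mirrors the ignored reduce test below): each rail
// starts from its own supplied initial value, so with parallelism n the
// supplier is invoked once per rail:
//   Flux.just(1, 2, 3).parallel().reduce[String](() => "0", (agg, v) => s"$agg-$v")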

/**
* Specifies where each 'rail' will observe its incoming values with possibly
* work-stealing and a given prefetch amount.
* <p>
* This operator uses the default prefetch size returned by [[Queues.SMALL_BUFFER_SIZE]].
* <p>
* The operator will call [[Scheduler.createWorker()]] once per rail, i.e. as many
* times as this ParallelFlux's parallelism level.
* <p>
* No assumptions are made about the Scheduler's parallelism level, if the Scheduler's
* parallelism level is lower than the ParallelFlux's, some rails may end up on
* the same thread/worker.
* <p>
* This operator doesn't require the Scheduler to be trampolining as it does its own
* built-in trampolining logic.
*
* @param scheduler the scheduler whose workers will run the rails
* @param prefetch the number of values to request on each 'rail' from the source
* @return the new [[SParallelFlux]] instance
*/
final def runOn(scheduler: Scheduler, prefetch: Int = Queues.SMALL_BUFFER_SIZE) = SParallelFlux(jParallelFlux.runOn(scheduler, prefetch))
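
// Illustrative sketch (mirrors the runOn test): two rails, each observed on
// a worker created from the parallel scheduler:
//   Flux.just(1, 2, 3).parallel(2).runOn(Schedulers.parallel())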


/**
* Merges the values from each 'rail' in a round-robin or same-order fashion and
* exposes them as a regular Publisher sequence, running with a given prefetch value
* for the rails.
*
* @param prefetch the prefetch amount to use for each rail
* @return the new Flux instance
*/
final def sequential(prefetch: Int = Queues.SMALL_BUFFER_SIZE) = Flux(jParallelFlux.sequential(prefetch))
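
// Illustrative sketch: merge the rails back into a sequential Flux once the
// parallel work is done:
//   Flux.just(1, 2, 3).parallel().map(i => i.toString).sequential()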

/**
* Subscribes to this [[SParallelFlux]] by providing an onNext, onError,
* onComplete and onSubscribe callback and triggers the execution chain for all
* 'rails'.
*
* @param onNext consumer of onNext signals
* @param onError consumer of error signal
* @param onComplete callback on completion signal
* @param onSubscribe consumer of the subscription signal
*/
final def subscribe(onNext: Option[T => Unit] = None,
onError: Option[Throwable => Unit] = None,
onComplete: Option[() => Unit] = None,
onSubscribe: Option[Subscription => Unit] = None): Disposable = (onNext, onError, onComplete, onSubscribe) match {
case (Some(fn), Some(fe), Some(fc), Some(fs)) => jParallelFlux.subscribe(fn, fe, fc, fs)
case (Some(fn), Some(fe), Some(fc), None) => jParallelFlux.subscribe(fn, fe, fc)
case (Some(fn), Some(fe), None, Some(fs)) => jParallelFlux.subscribe(fn, fe, null, fs)
case (Some(fn), Some(fe), None, None) => jParallelFlux.subscribe(fn, fe)
case (Some(fn), None, Some(fc), Some(fs)) => jParallelFlux.subscribe(fn, null, fc, fs)
case (Some(fn), None, Some(fc), None) => jParallelFlux.subscribe(fn, null, fc, null)
case (Some(fn), None, None, Some(fs)) => jParallelFlux.subscribe(fn, null, null, fs)
case (Some(fn), None, None, None) => jParallelFlux.subscribe(fn)
case (None, Some(fe), Some(fc), Some(fs)) => jParallelFlux.subscribe(null, fe, fc, fs)
case (None, Some(fe), Some(fc), None) => jParallelFlux.subscribe(null, fe, fc)
case (None, Some(fe), None, Some(fs)) => jParallelFlux.subscribe(null, fe, null, fs)
case (None, Some(fe), None, None) => jParallelFlux.subscribe(null, fe)
case (None, None, Some(fc), Some(fs)) => jParallelFlux.subscribe(null, null, fc, fs)
case (None, None, Some(fc), None) => jParallelFlux.subscribe(null, null, fc, null)
case (None, None, None, Some(fs)) => jParallelFlux.subscribe(null, null, null, fs)
case (None, None, None, None) => jParallelFlux.subscribe()
}
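
// Illustrative sketch (hypothetical callbacks): supply only the signals you
// care about; the rest default to None:
//   val disposable = Flux.just(1, 2, 3).parallel()
//     .subscribe(onNext = Some((i: Int) => println(s"got $i")),
//                onError = Some((e: Throwable) => e.printStackTrace()))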

/**
* Merge the rails into a [[Flux.sequential]] Flux and
* [[Flux#subscribe(Subscriber) subscribe]] to said Flux.
*
* @param s the subscriber to use on [[#sequential()]] Flux
*/
override def subscribe(s: Subscriber[_ >: T]): Unit = jParallelFlux.subscribe(s)

def asJava: JParallelFlux[T] = jParallelFlux
}
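The notable change in this file is that SParallelFlux now extends Publisher[T], so it can be handed directly to any Reactive Streams consumer. A minimal sketch under that assumption (the demo object, subscriber, and Thread.sleep are illustrative, not part of the commit):

    import org.reactivestreams.{Subscriber, Subscription}
    import reactor.core.scala.publisher.Flux
    import reactor.core.scheduler.Schedulers

    object ParallelDemo extends App {
      val rails = Flux.just(1, 2, 3, 4).parallel(2).runOn(Schedulers.parallel())

      // subscribe(Subscriber) merges the rails into a sequential Flux first,
      // then attaches the raw Reactive Streams subscriber to it.
      rails.subscribe(new Subscriber[Int] {
        def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
        def onNext(t: Int): Unit = println(s"next: $t")
        def onError(t: Throwable): Unit = t.printStackTrace()
        def onComplete(): Unit = println("done")
      })

      Thread.sleep(500) // illustrative: give the parallel scheduler time to emit
    }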

75 changes: 73 additions & 2 deletions src/test/scala/reactor/core/scala/publisher/SParallelFluxTest.scala
@@ -1,16 +1,87 @@
package reactor.core.scala.publisher

import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.{FreeSpec, Matchers}
-import reactor.core.publisher.{ParallelFlux => JParallelFlux, Flux => JFlux}
+import reactor.core.publisher.{Flux => JFlux, ParallelFlux => JParallelFlux}
import reactor.core.scheduler.Schedulers
import reactor.test.StepVerifier

class SParallelFluxTest extends FreeSpec with Matchers {
"SParallelFlux" - {
val data = Seq(1, 2, 3)
val flux = Flux.just(data.head, data.tail: _*)
val fluxParallel: SParallelFlux[Int] = flux.parallel()
".asJava should convert as Java ParallelFlux" in {
-Flux.just(1, 2, 3).parallel().asJava shouldBe a[JParallelFlux[Int]]
+fluxParallel.asJava shouldBe a[JParallelFlux[Int]]
}

".apply should convert Java ParallelFlux into SParallelFlux" in {
SParallelFlux(JFlux.just(1, 2, 3).parallel()).asJava shouldBe a[JParallelFlux[Int]]
}

".filter should filter elements" in {
StepVerifier.create(fluxParallel.filter((i: Int) => i % 2 == 0))
.expectNext(2)
.verifyComplete()
}

".map should map from T to U" in {
val expected = data.map(_.toString)
StepVerifier.create(fluxParallel.map(i => i.toString))
.expectNextMatches((i: String) => expected.contains(i))
.expectNextMatches((i: String) => expected.contains(i))
.expectNextMatches((i: String) => expected.contains(i))
.verifyComplete()
}

".reduce should aggregate the values" - {
"without initial supplier" in {
val mono = fluxParallel.reduce(_ + _)
StepVerifier.create(mono)
.expectNext(6)
.verifyComplete()
}
"with initial value should aggregate the values with initial one" ignore {
val parallelFlux = fluxParallel.reduce[String](() => "0", (agg, v) => s"$agg-${v.toString}")
StepVerifier.create(parallelFlux)
.expectNext("0-1")
.expectNext("0-2")
.expectNext("0-3")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.expectNext("0")
.verifyComplete()
}
}

".sequential should merge the rails" in {
val expected = data.map(_.toString)
StepVerifier.create(fluxParallel.map(i => i.toString).sequential())
.expectNextMatches((i: String) => expected.contains(i))
.expectNextMatches((i: String) => expected.contains(i))
.expectNextMatches((i: String) => expected.contains(i))
.verifyComplete()
}

".runOn should run on different thread" in {
val scheduler = spy(Schedulers.parallel())
StepVerifier.create(flux.parallel(2).runOn(scheduler))
.expectNextMatches((i: Int) => data.contains(i))
.expectNextMatches((i: Int) => data.contains(i))
.expectNextMatches((i: Int) => data.contains(i))
.verifyComplete()

verify(scheduler, times(2)).createWorker()
}
}
}
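
A test exercising the new Publisher behavior directly is not part of this diff; a sketch in the same style (hypothetical, relying on SParallelFlux now being a Publisher so StepVerifier.create can consume it as-is) might look like:

    ".subscribe(Subscriber) should merge the rails and deliver every value" in {
      StepVerifier.create(fluxParallel)
        .expectNextMatches((i: Int) => data.contains(i))
        .expectNextMatches((i: Int) => data.contains(i))
        .expectNextMatches((i: Int) => data.contains(i))
        .verifyComplete()
    }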
