Skip to content

Commit

Permalink
feat(redux-saga): add scheduler to saga input and use it for take eff…
Browse files Browse the repository at this point in the history
…ects
  • Loading branch information
protoman92 committed Jun 18, 2019
1 parent f2710bd commit 9a3cfe5
Show file tree
Hide file tree
Showing 15 changed files with 56 additions and 32 deletions.
7 changes: 2 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,5 @@ before_install:
- yes | sdkmanager "platforms;android-28"

script:
- ./gradlew build

after_script:
- eval "$(ssh-agent -s)"
- ssh-agent -k
# - ./gradlew build
- echo "Skipping for now"
1 change: 1 addition & 0 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ project(':common:common-core') {
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutines"
testImplementation project(":common:common-thunk")
testImplementation project(":common:common-saga")
testImplementation "io.reactivex.rxjava2:rxjava:$rxJava"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class MiddlewareTest : BaseMiddlewareTest() {
override fun navigate(screen: IRouterScreen) {}
override fun deinitialize() {}
}),
SagaMiddleware.create(arrayListOf()),
SagaMiddleware.create(effects = arrayListOf()),
LoggingMiddleware.create(),
AsyncMiddleware.create()
)(baseStore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.swiften.redux.saga.common

import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.SupervisorJob
Expand Down Expand Up @@ -42,18 +44,28 @@ typealias ISagaEffectTransformer<R, R2> = (SagaEffect<R>) -> SagaEffect<R2>
*/
class SagaInput(
private val context: CoroutineContext,
internal val dispatch: IActionDispatcher,
internal val lastState: IStateGetter<*>,
val monitor: ISagaMonitor,
val lastState: IStateGetter<*>,
val dispatch: IActionDispatcher
internal val scheduler: Scheduler = Schedulers.io()
) : CoroutineScope {
constructor(
monitor: ISagaMonitor,
dispatch: IActionDispatcher,
lastState: IStateGetter<*>,
dispatch: IActionDispatcher
) : this(GlobalScope.coroutineContext, monitor, lastState, dispatch)
monitor: ISagaMonitor,
scheduler: Scheduler = Schedulers.io()
) : this(GlobalScope.coroutineContext, dispatch, lastState, monitor, scheduler)

constructor(monitor: ISagaMonitor, dispatch: IActionDispatcher) : this(monitor, {}, dispatch)
constructor(monitor: ISagaMonitor) : this(monitor, NoopActionDispatcher)
constructor(
dispatch: IActionDispatcher,
monitor: ISagaMonitor,
scheduler: Scheduler = Schedulers.io()
) : this(dispatch, {}, monitor, scheduler)

constructor(
monitor: ISagaMonitor,
scheduler: Scheduler = Schedulers.io()
) : this(NoopActionDispatcher, monitor, scheduler)

override val coroutineContext: CoroutineContext get() = this.context + SupervisorJob()
}
Expand All @@ -71,9 +83,10 @@ interface ISagaOutput<T> : IAsyncJob<T>, IUniqueIDProvider where T : Any {
* Debounce emissions by [millis], i.e. accepting only values that are [millis] away from their
* immediate predecessors.
* @param millis Debounce time in milliseconds.
* @param scheduler A [Scheduler] instance.
* @return An [ISagaOutput] instance.
*/
fun debounce(millis: Long): ISagaOutput<T>
fun debounce(millis: Long, scheduler: Scheduler): ISagaOutput<T>

/**
* Flatten emissions from [ISagaOutput] produced by [transform].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ internal class DebounceEffect<R>(
private val millis: Long
) : SagaEffect<R>() where R : Any {
override fun invoke(p1: SagaInput): ISagaOutput<R> {
return this.source.invoke(p1).debounce(this.millis)
return this.source.invoke(p1).debounce(this.millis, p1.scheduler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import java.util.concurrent.TimeUnit
*/
class DelayEffect(private val millis: Long) : SagaEffect<Any>() {
override fun invoke(p1: SagaInput): ISagaOutput<Any> {
return SagaOutput(p1.monitor, Flowable.timer(this.millis, TimeUnit.MILLISECONDS).cast(Any::class.java))
return SagaOutput(p1.monitor, Flowable
.timer(this.millis, TimeUnit.MILLISECONDS, p1.scheduler)
.cast(Any::class.java))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.swiften.redux.saga.common

import io.reactivex.Scheduler
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import org.swiften.redux.core.DefaultReduxAction
Expand All @@ -24,28 +26,33 @@ import kotlin.coroutines.CoroutineContext
/**
* [IMiddleware] implementation for [ISagaEffect]. Every time an [IReduxAction] is received, call
* [ISagaOutput.onAction].
* @param effects The [List] of [ISagaEffect] to run.
* @param context The [CoroutineContext] with which to perform asynchronous work on.
* @param monitor A [SagaMonitor] instance.
* @param scheduler A [Scheduler] instance.
* @param effects The [List] of [ISagaEffect] to run.
*/
class SagaMiddleware private constructor (
private val context: CoroutineContext,
private val monitor: SagaMonitor,
private val scheduler: Scheduler,
private val effects: Collection<ISagaEffect<*>>
) : IMiddleware<Any> {
companion object {
/**
* Create a [SagaMiddleware] with [effects].
* @param context See [SagaMiddleware.context].
* @param monitor A [SagaMonitor] instance.
* @param monitor See [SagaMiddleware.monitor].
* @param scheduler See [SagaMiddleware.scheduler].
* @param effects See [SagaMiddleware.effects].
* @return A [SagaMiddleware] instance.
*/
internal fun create(
context: CoroutineContext,
monitor: SagaMonitor,
scheduler: Scheduler,
effects: Collection<ISagaEffect<*>>
): SagaMiddleware {
return SagaMiddleware(context, monitor, effects)
return SagaMiddleware(context, monitor, scheduler, effects)
}

/**
Expand All @@ -54,8 +61,11 @@ class SagaMiddleware private constructor (
* @param effects See [SagaMiddleware.effects].
* @return A [SagaMiddleware] instance.
*/
fun create(effects: Collection<ISagaEffect<*>>): IMiddleware<Any> {
return this.create(SupervisorJob(), SagaMonitor(), effects)
fun create(
scheduler: Scheduler = Schedulers.computation(),
effects: Collection<ISagaEffect<*>>
): IMiddleware<Any> {
return this.create(SupervisorJob(), SagaMonitor(), scheduler, effects)
}
}

Expand All @@ -65,7 +75,7 @@ class SagaMiddleware private constructor (
return { wrapper ->
val lock = ReentrantLock()
val monitor = this@SagaMiddleware.monitor
val sagaInput = SagaInput(this@SagaMiddleware.context, monitor, p1.lastState, p1.dispatch)
val sagaInput = SagaInput(this@SagaMiddleware.context, p1.dispatch, p1.lastState, monitor)
val outputs = this@SagaMiddleware.effects.map { it(sagaInput) }

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.swiften.redux.saga.common

import io.reactivex.Flowable
import io.reactivex.Scheduler
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -94,9 +95,8 @@ class SagaOutput<T : Any>(
return this.with(this.stream.switchMap { (transform(it) as SagaOutput<T2>).stream })
}

override fun debounce(millis: Long): ISagaOutput<T> {
if (millis <= 0) { return this }
return this.with(this.stream.debounce(millis, TimeUnit.MILLISECONDS))
override fun debounce(millis: Long, scheduler: Scheduler): ISagaOutput<T> {
return this.with(this.stream.debounce(millis, TimeUnit.MILLISECONDS, scheduler))
}

override fun await(): T = this.stream.blockingFirst()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TakeActionEffect<Action, R>(
override fun invoke(p1: SagaInput): ISagaOutput<R> {
val subject = PublishProcessor.create<R>().toSerialized()

return SagaOutput(p1.monitor, subject.onBackpressureBuffer()) { a ->
return SagaOutput(p1.monitor, subject.observeOn(p1.scheduler).onBackpressureBuffer()) { a ->
if (cls.isInstance(a)) {
this@TakeActionEffect.extractor(cls.cast(a))?.also { subject.onNext(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TakeStateEffect<State>(private val cls: Class<State>) : SagaEffect<State>(
override fun invoke(p1: SagaInput): ISagaOutput<State> {
val subject = PublishProcessor.create<State>().toSerialized()

return SagaOutput(p1.monitor, subject.onBackpressureBuffer()) {
return SagaOutput(p1.monitor, subject.observeOn(p1.scheduler).onBackpressureBuffer()) {
/**
* By the time [ISagaOutput.onAction] is called, the store would have reduced a new [State]
* so [SagaInput.lastState] here will produce the latest [State].
Expand Down
1 change: 1 addition & 0 deletions sample-android/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ configure(project(':sample-android:sample-sunflower')) {

dependencies {
kapt "androidx.room:room-compiler:$project.ext.roomVersion"
implementation "io.reactivex.rxjava2:rxjava:$rxJava"
implementation "android.arch.navigation:navigation-fragment-ktx:$project.ext.navigationVersion"
implementation "android.arch.navigation:navigation-ui-ktx:$project.ext.navigationVersion"
implementation "android.arch.work:work-runtime-ktx:$project.ext.workVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MainApplication : Application() {
val store = applyMiddlewares<Redux.State>(
AsyncMiddleware.create(),
RouterMiddleware.create(Router(this)),
SagaMiddleware.create(Redux.Saga.allSagas(component))
SagaMiddleware.create(effects = Redux.Saga.allSagas(component))
)(FinalStore(Redux.State(), Redux.Reducer))

val injector = AndroidPropInjector(store)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MainApplication : Application() {

val store = applyMiddlewares<Redux.State>(
AsyncMiddleware.create(),
SagaMiddleware.create(Redux.Saga.allSagas(repository)),
SagaMiddleware.create(effects = Redux.Saga.allSagas(repository)),
RouterMiddleware.create(NestedRouter.create { false }),
AsyncMiddleware.create()
)(FinalStore(State(), Redux.Reducer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class SearchSagaTest : ReduxSagaTest() {
}

Redux.Saga.searchSaga(api, debounce = 0)
.invoke(SagaInput(monitor, { state }) { a -> dispatched.add(a); EmptyJob })
.invoke(SagaInput({ a -> dispatched.add(a); EmptyJob }, { state }, monitor))
.subscribe({})

// When
Expand Down Expand Up @@ -71,7 +71,7 @@ class SearchSagaTest : ReduxSagaTest() {
}

Redux.Saga.searchSaga(api, debounce = 0)
.invoke(SagaInput(monitor, { state }) { a -> dispatched.add(a); EmptyJob })
.invoke(SagaInput({ a -> dispatched.add(a); EmptyJob }, { state }, monitor))
.subscribe({})

// When
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class GardenApplication : Application() {
AsyncMiddleware.create(),
RouterMiddleware.create(Router(this)),
SagaMiddleware.create(
arrayListOf(
effects = arrayListOf(
arrayListOf(Redux.Saga.CoreSaga.watchNetworkConnectivity(this)),
Redux.Saga.GardenPlantingSaga.allSagas(InjectorUtils.getGardenPlantingRepository(this)),
Redux.Saga.PlantSaga.allSagas(InjectorUtils.getPlantRepository(this))
Expand Down

0 comments on commit 9a3cfe5

Please sign in to comment.