Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: transaction batcher module #1348

Merged
merged 25 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions executions/graphql-kotlin-transaction-batcher/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
description = "Transaction Batcher"

val reactiveStreamsVersion: String by project
val junitVersion: String by project
val slf4jVersion: String by project
val mockkVersion: String by project
val reactorVersion: String by project
val reactorExtensionsVersion: String by project

dependencies {
implementation("org.reactivestreams:reactive-streams:$reactiveStreamsVersion")
implementation("org.slf4j:slf4j-api:$slf4jVersion")
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
samuelAndalon marked this conversation as resolved.
Show resolved Hide resolved
testImplementation("org.junit.jupiter:junit-jupiter-engine:$junitVersion")
testImplementation("io.mockk:mockk:$mockkVersion")
testImplementation("io.projectreactor:reactor-core:$reactorVersion")
testImplementation("io.projectreactor.kotlin:reactor-kotlin-extensions:$reactorExtensionsVersion")
testImplementation("io.projectreactor:reactor-test:$reactorVersion")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.publisher

import com.expediagroup.graphql.transactionbatcher.transaction.BatcheableTransaction
import com.expediagroup.graphql.transactionbatcher.transaction.TransactionBatcherCache
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

/**
* Interface representing a publisher with input [TInput] type and output [TOutput] type
*/
@Suppress(
"ReactiveStreamsSubscriberImplementation",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we suppressing this warning? is this valid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is valid, given that Subscriber is an interface, so it's valid to implement it, However a warning appears in the IDE when trying to do it.

i can see a lot of Subscriber implementations that need to add that suppress as well
https://github.com/search?p=2&q=%22ReactiveStreamsSubscriberImplementation%22&type=Code

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsure, I am not getting any warning in my local

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

"UNCHECKED_CAST"
)
interface TriggeredPublisher<TInput, TOutput> {
/**
* Given an input of type [TInput] create a cold [Publisher] that will produce a [Publisher] of type [TOutput] of n elements
* that maps to the size of the input [List] of [TInput]
* order is important so make sure to produce elements in the same order of [input]
*/
fun produce(input: List<TInput>): Publisher<TOutput>

/**
* Attempts to collect values from [cache] first and then [produce]
*
* Example:
* if [TriggeredPublisher] is of type <Int, Int> and [cache] resolves [1, null, 3, null, 5, 6]
* we will attempt to produce elements for index 1 and 3
* when [produce] stream completes we will complete futures from either values resolved from [cache] or from [produce]
*/
fun trigger(
batcheableTransactions: List<BatcheableTransaction<TInput, TOutput>>,
cache: TransactionBatcherCache
) {

val values = batcheableTransactions.map { batcheableTransaction ->
samuelAndalon marked this conversation as resolved.
Show resolved Hide resolved
cache.get(batcheableTransaction.key)
}

val transactionsNotInCache = values.mapIndexedNotNull { index, value ->
when (value) {
null -> batcheableTransactions.getOrNull(index)
else -> null
}
}

produce(
transactionsNotInCache.map(BatcheableTransaction<TInput, TOutput>::input)
).subscribe(
object : Subscriber<TOutput> {
private lateinit var subscription: Subscription
private val results = mutableListOf<TOutput>()

override fun onSubscribe(subscription: Subscription) {
this.subscription = subscription
this.subscription.request(1)
}

override fun onNext(result: TOutput) {
results += result
this.subscription.request(1)
}

override fun onError(throwable: Throwable) {
throwable.printStackTrace()
samuelAndalon marked this conversation as resolved.
Show resolved Hide resolved
}

override fun onComplete() {
var resultsCounter = 0
values.forEachIndexed { index, value ->
value?.let {
batcheableTransactions[index].future.complete(value as TOutput)
} ?: run {
val result = results[resultsCounter++]
cache.set(batcheableTransactions[index].key, result as Any)
batcheableTransactions[index].future.complete(result)
}
}
}
}
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.transaction

import java.util.concurrent.CompletableFuture

/**
* convenient class to store the reference of a [future] of type [TOutput]
* that will be resolved asynchronously at later point in time by using [input] as source
* it supports deduplication by using the [key] field
*/
data class BatcheableTransaction<TInput, TOutput>(
val input: TInput,
val future: CompletableFuture<TOutput>,
val key: String
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.transaction

import java.util.concurrent.ConcurrentHashMap

/**
* Default implementation of [TransactionBatcherCache] using an in memory [cache]
* without eviction
*/
class DefaultTransactionBatcherCache : TransactionBatcherCache {
samuelAndalon marked this conversation as resolved.
Show resolved Hide resolved
private val cache = ConcurrentHashMap<String, Any>()

override fun set(key: String, value: Any) {
cache[key] = value
}

override fun get(key: String): Any? = cache[key]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.transaction

import com.expediagroup.graphql.transactionbatcher.publisher.TriggeredPublisher
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

/**
* Type for [TransactionBatcher.queue] value, storing the [triggeredPublisher] instance
* and list of [transactions] that need to be executed by it
*/
data class TransactionBatcherQueueValue(
val triggeredPublisher: TriggeredPublisher<Any, Any>,
val transactions: MutableList<BatcheableTransaction<Any, Any>>
)

/**
* Holds logic to apply batching, deduplication and caching of [BatcheableTransaction]
* if no [TransactionBatcherCache] implementation is provided it will use [DefaultTransactionBatcherCache]
*/
class TransactionBatcher(
private val cache: TransactionBatcherCache = DefaultTransactionBatcherCache()
) {

private val queue = ConcurrentHashMap<
samuelAndalon marked this conversation as resolved.
Show resolved Hide resolved
Class<out TriggeredPublisher<Any, Any>>,
TransactionBatcherQueueValue
>()

/**
* enqueue a transaction [input] along with the [triggeredPublisher] instance that will receive the [BatcheableTransaction]
* deduplication will be based on [key] which by default is the toString() representation of [input]
* batching will be based on the implementation of [TriggeredPublisher]
* this method returns a reference to a [CompletableFuture] which is a field of the [BatcheableTransaction] that was just
* added into the queue
*/
@Suppress("UNCHECKED_CAST")
fun <TInput : Any, TOutput : Any> enqueue(
input: TInput,
triggeredPublisher: TriggeredPublisher<TInput, TOutput>,
key: String = input.toString()
): CompletableFuture<TOutput> {
val queueKey = (triggeredPublisher as TriggeredPublisher<Any, Any>)::class.java
return queue[queueKey]?.let { (_, batcheableTransactions) ->
batcheableTransactions
.find { transaction -> transaction.key == key }
?.let { match -> match.future as CompletableFuture<TOutput> }
?: run {
val future = CompletableFuture<TOutput>()
batcheableTransactions.add(
BatcheableTransaction(input, future as CompletableFuture<Any>, key)
)
future
}
} ?: run {
val future = CompletableFuture<TOutput>()
queue[queueKey] = TransactionBatcherQueueValue(
triggeredPublisher,
mutableListOf(
BatcheableTransaction(input, future as CompletableFuture<Any>, key)
)
)
future
}
}

/**
* Trigger concurrently and asynchronously the instances of [TriggeredPublisher] that the [queue] holds
* at the end clear the queue
*/
fun dispatch() {
queue.values.forEach { (triggeredPublisher, transactions) ->
triggeredPublisher.trigger(transactions, cache)
}
queue.clear()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.transaction

/**
* Interface that allows any cache implementation that can be used by the [TransactionBatcher]
* by default will use [DefaultTransactionBatcherCache]
*/
interface TransactionBatcherCache {
fun set(key: String, value: Any)
fun get(key: String): Any?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.publisher

import com.expediagroup.graphql.transactionbatcher.transaction.TransactionBatcher
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toFlux
import reactor.kotlin.core.publisher.toMono
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

data class AstronautServiceRequest(val id: Int)
data class Astronaut(val id: Int, val name: String)

class AstronautService(
private val transactionBatcher: TransactionBatcher
) {

val produceArguments: MutableList<List<AstronautServiceRequest>> = mutableListOf()
val getAstronautCallCount: AtomicInteger = AtomicInteger(0)
private val astronautsPublisher = object : TriggeredPublisher<AstronautServiceRequest, Astronaut> {
override fun produce(input: List<AstronautServiceRequest>): Publisher<Astronaut> {
produceArguments.add(input)
return this@AstronautService.getAstronauts(input)
}
}

companion object {
samuelAndalon marked this conversation as resolved.
Show resolved Hide resolved
private val astronauts = mapOf(
1 to Pair(Astronaut(1, "Buzz Aldrin"), Duration.ofMillis(300)),
2 to Pair(Astronaut(2, "William Anders"), Duration.ofMillis(600)),
3 to Pair(Astronaut(3, "Neil Armstrong"), Duration.ofMillis(200))
)
}

fun getAstronaut(request: AstronautServiceRequest): Mono<Astronaut> {
getAstronautCallCount.incrementAndGet()
val future = this.transactionBatcher.enqueue(request, astronautsPublisher)
return future.toMono()
}

fun getAstronauts(input: List<AstronautServiceRequest>): Publisher<Astronaut> =
input.toFlux()
.flatMapSequential { request ->
{ astronauts[request.id] }
.toMono()
.flatMap { (astronaut, delay) ->
astronaut.toMono().delayElement(delay)
}
}
}
Loading