Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.7.3 #52

Merged
merged 1 commit into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
buildscript {
ext {
// App version
versionName = '1.7.2'
versionName = '1.7.3'
versionCode = 1

// SDK and tools
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,22 @@ class SocketService(
fun executeBatchRequest(
rpcRequests: List<RpcRequest>,
deliveryType: DeliveryType = DeliveryType.AT_LEAST_ONCE,
callback: ResponseListener<List<RpcResponse>>
callback: ResponseListener<RpcResponse>
): Cancellable {
val sendable = BatchSendable(rpcRequests, deliveryType, callback)

updateState(Event.Send(sendable))

return RequestCancellable(sendable)
}

@Synchronized
fun executeAccumulatingBatchRequest(
rpcRequests: List<RpcRequest>,
deliveryType: DeliveryType = DeliveryType.AT_LEAST_ONCE,
allBatchDoneCallback: ResponseListener<List<RpcResponse>>
): Cancellable {
val callback = AccumulatingBatchRequest(allBatchDoneCallback, rpcRequests.size)
val sendable = BatchSendable(rpcRequests, deliveryType, callback)

updateState(Event.Send(sendable))
Expand Down Expand Up @@ -200,14 +214,10 @@ class SocketService(
logger.log("[STATE MACHINE][SIDE EFFECT] $sideEffect")

when (sideEffect) {
is SideEffect.ResponseToSendable -> respondToSingleRequest(
is SideEffect.ResponseToSendable -> respondToRequest(
sideEffect.sendable,
sideEffect.response
)
is SideEffect.ResponseToBatchSendable -> respondToBatchRequest(
sideEffect.sendable,
sideEffect.responses
)
is SideEffect.RespondSendablesError -> respondError(
sideEffect.sendables,
sideEffect.error
Expand All @@ -224,31 +234,15 @@ class SocketService(
}
}

private fun respondToSingleRequest(
private fun respondToRequest(
sendable: SocketStateMachine.Sendable,
response: RpcResponse
) {
require(sendable is SingleSendable)

sendable.callback.onNext(response)
}

private fun respondToBatchRequest(
sendable: SocketStateMachine.Sendable,
responses: List<RpcResponse>
) {
require(sendable is BatchSendable)

sendable.callback.onNext(responses)
}

private fun respondError(sendables: Set<SocketStateMachine.Sendable>, throwable: Throwable) {
sendables.forEach { sendable ->
when (sendable) {
is SingleSendable -> sendable.callback.onError(throwable)
is BatchSendable -> sendable.callback.onError(throwable)
}
}
sendables.forEach { sendable -> sendable.callback.onError(throwable) }
}

private fun respondToSubscription(
Expand All @@ -260,14 +254,9 @@ class SocketService(
subscription.callback.onNext(response)
}

private fun sendToSocket(sendables: Set<SocketStateMachine.Sendable>) {
private fun sendToSocket(sendables: Collection<SocketStateMachine.Sendable>) {
requestExecutor.execute {
sendables.forEach { sendable ->
when (sendable) {
is SingleSendable -> socket!!.sendRpcRequest(sendable.request)
is BatchSendable -> socket!!.sendBatchRpcRequests(sendable.requests)
}
}
sendables.forEach { sendable -> sendable.sendTo(socket!!) }
}
}

Expand Down Expand Up @@ -366,4 +355,24 @@ class SocketService(
// do nothing
}
}

private class AccumulatingBatchRequest(
private val allDoneCallBack: ResponseListener<List<RpcResponse>>,
private val batchSize: Int
) : ResponseListener<RpcResponse> {

private var arrivedResponses: MutableList<RpcResponse> = ArrayList(batchSize)

override fun onNext(response: RpcResponse) {
arrivedResponses.add(response)

if (arrivedResponses.size == batchSize) {
allDoneCallBack.onNext(arrivedResponses)
}
}

override fun onError(throwable: Throwable) {
allDoneCallBack.onError(throwable)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,43 @@ package jp.co.soramitsu.fearless_utils.wsrpc.request
import jp.co.soramitsu.fearless_utils.wsrpc.SocketService
import jp.co.soramitsu.fearless_utils.wsrpc.request.base.RpcRequest
import jp.co.soramitsu.fearless_utils.wsrpc.response.RpcResponse
import jp.co.soramitsu.fearless_utils.wsrpc.socket.RpcSocket
import jp.co.soramitsu.fearless_utils.wsrpc.state.SocketStateMachine

internal class SingleSendable(
val request: RpcRequest,
override val deliveryType: DeliveryType,
val callback: SocketService.ResponseListener<RpcResponse>
override val callback: SocketService.ResponseListener<RpcResponse>
) : SocketStateMachine.Sendable {
override val id: Int = request.id

override val numberOfNeededResponses: Int = 1

override fun relatesTo(id: Int): Boolean = request.id == id

override fun sendTo(rpcSocket: RpcSocket) {
rpcSocket.sendRpcRequest(request)
}

override fun toString(): String {
return "Sendable($id)"
return "Sendable(${request.id})"
}
}

internal class BatchSendable(
val requests: List<RpcRequest>,
override val deliveryType: DeliveryType,
val callback: SocketService.ResponseListener<List<RpcResponse>>
override val callback: SocketService.ResponseListener<RpcResponse>
) : SocketStateMachine.Sendable {

override val id: Int = requests.first().id
override val numberOfNeededResponses: Int = requests.size

private val ids = requests.mapTo(mutableSetOf(), RpcRequest::id)

override fun relatesTo(id: Int): Boolean = id in ids

override fun sendTo(rpcSocket: RpcSocket) {
rpcSocket.sendBatchRpcRequests(requests)
}

override fun toString(): String {
val jointIds = requests.joinToString { it.id.toString() }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
package jp.co.soramitsu.fearless_utils.wsrpc.state

import jp.co.soramitsu.fearless_utils.wsrpc.SocketService
import jp.co.soramitsu.fearless_utils.wsrpc.request.DeliveryType
import jp.co.soramitsu.fearless_utils.wsrpc.response.RpcResponse
import jp.co.soramitsu.fearless_utils.wsrpc.socket.RpcSocket
import jp.co.soramitsu.fearless_utils.wsrpc.subscription.response.SubscriptionChange

typealias Transition = Pair<SocketStateMachine.State, List<SocketStateMachine.SideEffect>>

private typealias ResponseCounter = Int

object SocketStateMachine {

interface Sendable {
val id: Int

val numberOfNeededResponses: Int

fun relatesTo(id: Int): Boolean

val deliveryType: DeliveryType

val callback: SocketService.ResponseListener<RpcResponse>

fun sendTo(rpcSocket: RpcSocket)
}

interface Subscription {

val id: String

val initiatorId: Int
Expand All @@ -40,7 +52,7 @@ object SocketStateMachine {
val url: String,
internal val toResendOnReconnect: Set<Sendable>,
internal val unknownSubscriptionResponses: Map<String, SubscriptionChange>,
internal val waitingForResponse: Set<Sendable>,
internal val waitingForResponse: Map<Sendable, ResponseCounter>,
internal val subscriptions: Set<Subscription>
) : State()

Expand All @@ -61,10 +73,7 @@ object SocketStateMachine {

data class SendableResponse(val response: RpcResponse) : Event()

data class SendableBatchResponse(val responses: List<RpcResponse>) : Event() {

val responseId = responses.first().id
}
data class SendableBatchResponse(val responses: List<RpcResponse>) : Event()

data class Subscribed(val subscription: Subscription) : Event()

Expand Down Expand Up @@ -94,11 +103,6 @@ object SocketStateMachine {
data class ResponseToSendable(val sendable: Sendable, val response: RpcResponse) :
SideEffect()

data class ResponseToBatchSendable(
val sendable: Sendable,
val responses: List<RpcResponse>
) : SideEffect()

/**
* For [DeliveryType.AT_MOST_ONCE] errors
*/
Expand Down Expand Up @@ -190,7 +194,7 @@ object SocketStateMachine {
toResendOnReconnect = state.pendingSendables.filterByDeliveryType(
DeliveryType.ON_RECONNECT
),
waitingForResponse = state.pendingSendables,
waitingForResponse = state.pendingSendables.withCounter(),
subscriptions = emptySet(),
unknownSubscriptionResponses = emptyMap()
)
Expand Down Expand Up @@ -226,7 +230,7 @@ object SocketStateMachine {

state.copy(
toResendOnReconnect = toResendOnReconnect,
waitingForResponse = state.waitingForResponse + sendable
waitingForResponse = state.waitingForResponse.add(sendable)
)
}

Expand All @@ -250,30 +254,50 @@ object SocketStateMachine {
}

is Event.SendableResponse -> {
val sendable = findSendableById(state.waitingForResponse, event.response.id)
val entry = findSendableById(state.waitingForResponse, event.response.id)

if (sendable != null) {
if (entry != null) {
val (sendable, counter) = entry
sideEffects += SideEffect.ResponseToSendable(sendable, event.response)

state.copy(waitingForResponse = state.waitingForResponse - sendable)
val newCounter = counter + 1

state.copy(
waitingForResponse = state.waitingForResponse.keepUntilThreshold(
key = sendable,
newValue = newCounter,
threshold = sendable.numberOfNeededResponses
)
)
} else {
state
}
}

is Event.SendableBatchResponse -> {
val sendable = findSendableById(state.waitingForResponse, event.responseId)
val newWaitingForResponse = state.waitingForResponse.toMutableMap()

if (sendable != null) {
sideEffects += SideEffect.ResponseToBatchSendable(
sendable = sendable,
responses = event.responses
)
event.responses.forEach { response ->
val entry = findSendableById(state.waitingForResponse, response.id)

state.copy(waitingForResponse = state.waitingForResponse - sendable)
} else {
state
if (entry != null) {
val (sendable, counter) = entry
val newCounter = counter + 1

sideEffects += SideEffect.ResponseToSendable(
sendable = sendable,
response = response
)

newWaitingForResponse.keepUntilThreshold(
key = sendable,
newValue = newCounter,
threshold = sendable.numberOfNeededResponses
)
}
}

state.copy(waitingForResponse = newWaitingForResponse)
}

is Event.Subscribed -> {
Expand Down Expand Up @@ -392,12 +416,41 @@ object SocketStateMachine {
return Transition(newState, sideEffects)
}

private fun MutableMap<Sendable, ResponseCounter>.keepUntilThreshold(
key: Sendable,
newValue: ResponseCounter,
threshold: ResponseCounter
) {
if (newValue >= threshold) {
minusAssign(key)
} else {
plusAssign(key to newValue)
}
}

// we keep separate method for read-only `minus` since it optimizes result internally
private fun Map<Sendable, ResponseCounter>.keepUntilThreshold(
key: Sendable,
newValue: ResponseCounter,
threshold: ResponseCounter
): Map<Sendable, ResponseCounter> {
return if (newValue >= threshold) {
minus(key)
} else {
plus(key to newValue)
}
}

private fun Map<Sendable, ResponseCounter>.add(sendable: Sendable) = plus(sendable to 0)

private fun getRequestsToResendAndReportErrorToOthers(
state: State.Connected,
mutableSideEffects: MutableList<SideEffect>,
error: Throwable
): Set<Sendable> {
val toReportError = state.waitingForResponse.filterByDeliveryType(DeliveryType.AT_MOST_ONCE)
val waitingSendables = state.waitingForResponse.keys

val toReportError = waitingSendables.filterByDeliveryType(DeliveryType.AT_MOST_ONCE)

if (toReportError.isNotEmpty()) {
mutableSideEffects += SideEffect.RespondSendablesError(
Expand All @@ -406,16 +459,17 @@ object SocketStateMachine {
)
}

return state.waitingForResponse - toReportError + state.toResendOnReconnect
return waitingSendables - toReportError + state.toResendOnReconnect
}

private fun findSubscriptionById(subscriptions: Set<Subscription>, id: String) =
subscriptions.find { it.id == id }

private fun findSendableById(sendables: Set<Sendable>, id: Int) = sendables.find { it.id == id }
private fun findSendableById(sendables: Map<Sendable, ResponseCounter>, id: Int) =
sendables.entries.find { (sendable, _) -> sendable.relatesTo(id) }

private fun findSubscriptionByInitiator(subscriptions: Set<Subscription>, initiator: Sendable) =
subscriptions.find { it.initiatorId == initiator.id }
subscriptions.find { initiator.relatesTo(it.initiatorId) }

private fun handleStop(sideEffects: MutableList<SideEffect>): State {
sideEffects += SideEffect.Disconnect
Expand All @@ -436,3 +490,5 @@ object SocketStateMachine {
it.deliveryType == deliveryType
}
}

internal fun Set<SocketStateMachine.Sendable>.withCounter() = associateWith { 0 }
Loading