Skip to content

Commit

Permalink
Merge pull request #52 from nova-wallet/develop
Browse files Browse the repository at this point in the history
v1.7.3
  • Loading branch information
valentunn authored Mar 16, 2023
2 parents 4dc5207 + ba146b2 commit 0de27ac
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 74 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
buildscript {
ext {
// App version
versionName = '1.7.2'
versionName = '1.7.3'
versionCode = 1

// SDK and tools
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,22 @@ class SocketService(
fun executeBatchRequest(
rpcRequests: List<RpcRequest>,
deliveryType: DeliveryType = DeliveryType.AT_LEAST_ONCE,
callback: ResponseListener<List<RpcResponse>>
callback: ResponseListener<RpcResponse>
): Cancellable {
val sendable = BatchSendable(rpcRequests, deliveryType, callback)

updateState(Event.Send(sendable))

return RequestCancellable(sendable)
}

@Synchronized
fun executeAccumulatingBatchRequest(
rpcRequests: List<RpcRequest>,
deliveryType: DeliveryType = DeliveryType.AT_LEAST_ONCE,
allBatchDoneCallback: ResponseListener<List<RpcResponse>>
): Cancellable {
val callback = AccumulatingBatchRequest(allBatchDoneCallback, rpcRequests.size)
val sendable = BatchSendable(rpcRequests, deliveryType, callback)

updateState(Event.Send(sendable))
Expand Down Expand Up @@ -200,14 +214,10 @@ class SocketService(
logger.log("[STATE MACHINE][SIDE EFFECT] $sideEffect")

when (sideEffect) {
is SideEffect.ResponseToSendable -> respondToSingleRequest(
is SideEffect.ResponseToSendable -> respondToRequest(
sideEffect.sendable,
sideEffect.response
)
is SideEffect.ResponseToBatchSendable -> respondToBatchRequest(
sideEffect.sendable,
sideEffect.responses
)
is SideEffect.RespondSendablesError -> respondError(
sideEffect.sendables,
sideEffect.error
Expand All @@ -224,31 +234,15 @@ class SocketService(
}
}

private fun respondToSingleRequest(
private fun respondToRequest(
sendable: SocketStateMachine.Sendable,
response: RpcResponse
) {
require(sendable is SingleSendable)

sendable.callback.onNext(response)
}

private fun respondToBatchRequest(
sendable: SocketStateMachine.Sendable,
responses: List<RpcResponse>
) {
require(sendable is BatchSendable)

sendable.callback.onNext(responses)
}

private fun respondError(sendables: Set<SocketStateMachine.Sendable>, throwable: Throwable) {
sendables.forEach { sendable ->
when (sendable) {
is SingleSendable -> sendable.callback.onError(throwable)
is BatchSendable -> sendable.callback.onError(throwable)
}
}
sendables.forEach { sendable -> sendable.callback.onError(throwable) }
}

private fun respondToSubscription(
Expand All @@ -260,14 +254,9 @@ class SocketService(
subscription.callback.onNext(response)
}

private fun sendToSocket(sendables: Set<SocketStateMachine.Sendable>) {
private fun sendToSocket(sendables: Collection<SocketStateMachine.Sendable>) {
requestExecutor.execute {
sendables.forEach { sendable ->
when (sendable) {
is SingleSendable -> socket!!.sendRpcRequest(sendable.request)
is BatchSendable -> socket!!.sendBatchRpcRequests(sendable.requests)
}
}
sendables.forEach { sendable -> sendable.sendTo(socket!!) }
}
}

Expand Down Expand Up @@ -366,4 +355,24 @@ class SocketService(
// do nothing
}
}

private class AccumulatingBatchRequest(
private val allDoneCallBack: ResponseListener<List<RpcResponse>>,
private val batchSize: Int
) : ResponseListener<RpcResponse> {

private var arrivedResponses: MutableList<RpcResponse> = ArrayList(batchSize)

override fun onNext(response: RpcResponse) {
arrivedResponses.add(response)

if (arrivedResponses.size == batchSize) {
allDoneCallBack.onNext(arrivedResponses)
}
}

override fun onError(throwable: Throwable) {
allDoneCallBack.onError(throwable)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,43 @@ package jp.co.soramitsu.fearless_utils.wsrpc.request
import jp.co.soramitsu.fearless_utils.wsrpc.SocketService
import jp.co.soramitsu.fearless_utils.wsrpc.request.base.RpcRequest
import jp.co.soramitsu.fearless_utils.wsrpc.response.RpcResponse
import jp.co.soramitsu.fearless_utils.wsrpc.socket.RpcSocket
import jp.co.soramitsu.fearless_utils.wsrpc.state.SocketStateMachine

internal class SingleSendable(
val request: RpcRequest,
override val deliveryType: DeliveryType,
val callback: SocketService.ResponseListener<RpcResponse>
override val callback: SocketService.ResponseListener<RpcResponse>
) : SocketStateMachine.Sendable {
override val id: Int = request.id

override val numberOfNeededResponses: Int = 1

override fun relatesTo(id: Int): Boolean = request.id == id

override fun sendTo(rpcSocket: RpcSocket) {
rpcSocket.sendRpcRequest(request)
}

override fun toString(): String {
return "Sendable($id)"
return "Sendable(${request.id})"
}
}

internal class BatchSendable(
val requests: List<RpcRequest>,
override val deliveryType: DeliveryType,
val callback: SocketService.ResponseListener<List<RpcResponse>>
override val callback: SocketService.ResponseListener<RpcResponse>
) : SocketStateMachine.Sendable {

override val id: Int = requests.first().id
override val numberOfNeededResponses: Int = requests.size

private val ids = requests.mapTo(mutableSetOf(), RpcRequest::id)

override fun relatesTo(id: Int): Boolean = id in ids

override fun sendTo(rpcSocket: RpcSocket) {
rpcSocket.sendBatchRpcRequests(requests)
}

override fun toString(): String {
val jointIds = requests.joinToString { it.id.toString() }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
package jp.co.soramitsu.fearless_utils.wsrpc.state

import jp.co.soramitsu.fearless_utils.wsrpc.SocketService
import jp.co.soramitsu.fearless_utils.wsrpc.request.DeliveryType
import jp.co.soramitsu.fearless_utils.wsrpc.response.RpcResponse
import jp.co.soramitsu.fearless_utils.wsrpc.socket.RpcSocket
import jp.co.soramitsu.fearless_utils.wsrpc.subscription.response.SubscriptionChange

typealias Transition = Pair<SocketStateMachine.State, List<SocketStateMachine.SideEffect>>

private typealias ResponseCounter = Int

object SocketStateMachine {

interface Sendable {
val id: Int

val numberOfNeededResponses: Int

fun relatesTo(id: Int): Boolean

val deliveryType: DeliveryType

val callback: SocketService.ResponseListener<RpcResponse>

fun sendTo(rpcSocket: RpcSocket)
}

interface Subscription {

val id: String

val initiatorId: Int
Expand All @@ -40,7 +52,7 @@ object SocketStateMachine {
val url: String,
internal val toResendOnReconnect: Set<Sendable>,
internal val unknownSubscriptionResponses: Map<String, SubscriptionChange>,
internal val waitingForResponse: Set<Sendable>,
internal val waitingForResponse: Map<Sendable, ResponseCounter>,
internal val subscriptions: Set<Subscription>
) : State()

Expand All @@ -61,10 +73,7 @@ object SocketStateMachine {

data class SendableResponse(val response: RpcResponse) : Event()

data class SendableBatchResponse(val responses: List<RpcResponse>) : Event() {

val responseId = responses.first().id
}
data class SendableBatchResponse(val responses: List<RpcResponse>) : Event()

data class Subscribed(val subscription: Subscription) : Event()

Expand Down Expand Up @@ -94,11 +103,6 @@ object SocketStateMachine {
data class ResponseToSendable(val sendable: Sendable, val response: RpcResponse) :
SideEffect()

data class ResponseToBatchSendable(
val sendable: Sendable,
val responses: List<RpcResponse>
) : SideEffect()

/**
* For [DeliveryType.AT_MOST_ONCE] errors
*/
Expand Down Expand Up @@ -190,7 +194,7 @@ object SocketStateMachine {
toResendOnReconnect = state.pendingSendables.filterByDeliveryType(
DeliveryType.ON_RECONNECT
),
waitingForResponse = state.pendingSendables,
waitingForResponse = state.pendingSendables.withCounter(),
subscriptions = emptySet(),
unknownSubscriptionResponses = emptyMap()
)
Expand Down Expand Up @@ -226,7 +230,7 @@ object SocketStateMachine {

state.copy(
toResendOnReconnect = toResendOnReconnect,
waitingForResponse = state.waitingForResponse + sendable
waitingForResponse = state.waitingForResponse.add(sendable)
)
}

Expand All @@ -250,30 +254,50 @@ object SocketStateMachine {
}

is Event.SendableResponse -> {
val sendable = findSendableById(state.waitingForResponse, event.response.id)
val entry = findSendableById(state.waitingForResponse, event.response.id)

if (sendable != null) {
if (entry != null) {
val (sendable, counter) = entry
sideEffects += SideEffect.ResponseToSendable(sendable, event.response)

state.copy(waitingForResponse = state.waitingForResponse - sendable)
val newCounter = counter + 1

state.copy(
waitingForResponse = state.waitingForResponse.keepUntilThreshold(
key = sendable,
newValue = newCounter,
threshold = sendable.numberOfNeededResponses
)
)
} else {
state
}
}

is Event.SendableBatchResponse -> {
val sendable = findSendableById(state.waitingForResponse, event.responseId)
val newWaitingForResponse = state.waitingForResponse.toMutableMap()

if (sendable != null) {
sideEffects += SideEffect.ResponseToBatchSendable(
sendable = sendable,
responses = event.responses
)
event.responses.forEach { response ->
val entry = findSendableById(state.waitingForResponse, response.id)

state.copy(waitingForResponse = state.waitingForResponse - sendable)
} else {
state
if (entry != null) {
val (sendable, counter) = entry
val newCounter = counter + 1

sideEffects += SideEffect.ResponseToSendable(
sendable = sendable,
response = response
)

newWaitingForResponse.keepUntilThreshold(
key = sendable,
newValue = newCounter,
threshold = sendable.numberOfNeededResponses
)
}
}

state.copy(waitingForResponse = newWaitingForResponse)
}

is Event.Subscribed -> {
Expand Down Expand Up @@ -392,12 +416,41 @@ object SocketStateMachine {
return Transition(newState, sideEffects)
}

private fun MutableMap<Sendable, ResponseCounter>.keepUntilThreshold(
key: Sendable,
newValue: ResponseCounter,
threshold: ResponseCounter
) {
if (newValue >= threshold) {
minusAssign(key)
} else {
plusAssign(key to newValue)
}
}

// we keep separate method for read-only `minus` since it optimizes result internally
private fun Map<Sendable, ResponseCounter>.keepUntilThreshold(
key: Sendable,
newValue: ResponseCounter,
threshold: ResponseCounter
): Map<Sendable, ResponseCounter> {
return if (newValue >= threshold) {
minus(key)
} else {
plus(key to newValue)
}
}

private fun Map<Sendable, ResponseCounter>.add(sendable: Sendable) = plus(sendable to 0)

private fun getRequestsToResendAndReportErrorToOthers(
state: State.Connected,
mutableSideEffects: MutableList<SideEffect>,
error: Throwable
): Set<Sendable> {
val toReportError = state.waitingForResponse.filterByDeliveryType(DeliveryType.AT_MOST_ONCE)
val waitingSendables = state.waitingForResponse.keys

val toReportError = waitingSendables.filterByDeliveryType(DeliveryType.AT_MOST_ONCE)

if (toReportError.isNotEmpty()) {
mutableSideEffects += SideEffect.RespondSendablesError(
Expand All @@ -406,16 +459,17 @@ object SocketStateMachine {
)
}

return state.waitingForResponse - toReportError + state.toResendOnReconnect
return waitingSendables - toReportError + state.toResendOnReconnect
}

private fun findSubscriptionById(subscriptions: Set<Subscription>, id: String) =
subscriptions.find { it.id == id }

private fun findSendableById(sendables: Set<Sendable>, id: Int) = sendables.find { it.id == id }
private fun findSendableById(sendables: Map<Sendable, ResponseCounter>, id: Int) =
sendables.entries.find { (sendable, _) -> sendable.relatesTo(id) }

private fun findSubscriptionByInitiator(subscriptions: Set<Subscription>, initiator: Sendable) =
subscriptions.find { it.initiatorId == initiator.id }
subscriptions.find { initiator.relatesTo(it.initiatorId) }

private fun handleStop(sideEffects: MutableList<SideEffect>): State {
sideEffects += SideEffect.Disconnect
Expand All @@ -436,3 +490,5 @@ object SocketStateMachine {
it.deliveryType == deliveryType
}
}

internal fun Set<SocketStateMachine.Sendable>.withCounter() = associateWith { 0 }
Loading

0 comments on commit 0de27ac

Please sign in to comment.