diff --git a/build.gradle b/build.gradle index e2cf50a9..6fb2f3ff 100644 --- a/build.gradle +++ b/build.gradle @@ -1,7 +1,7 @@ buildscript { ext { // App version - versionName = '1.7.3' + versionName = '1.8.0' versionCode = 1 // SDK and tools diff --git a/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/SocketService.kt b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/SocketService.kt index 4e8a4a9b..5a8fdf3e 100644 --- a/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/SocketService.kt +++ b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/SocketService.kt @@ -4,6 +4,8 @@ import com.google.gson.Gson import com.neovisionaries.ws.client.WebSocketFactory import com.neovisionaries.ws.client.WebSocketState import jp.co.soramitsu.fearless_utils.wsrpc.exception.ConnectionClosedException +import jp.co.soramitsu.fearless_utils.wsrpc.interceptor.WebSocketResponseInterceptor +import jp.co.soramitsu.fearless_utils.wsrpc.interceptor.shouldDeliver import jp.co.soramitsu.fearless_utils.wsrpc.logging.Logger import jp.co.soramitsu.fearless_utils.wsrpc.mappers.nonNull import jp.co.soramitsu.fearless_utils.wsrpc.mappers.stringIdMapper @@ -31,13 +33,15 @@ class SocketService( private val logger: Logger, private val webSocketFactory: WebSocketFactory, private val reconnector: Reconnector, - private val requestExecutor: RequestExecutor + private val requestExecutor: RequestExecutor, ) : RpcSocketListener { private var socket: RpcSocket? = null private val stateContainer = ObservableState(initialState = State.Disconnected) + private var responseInterceptor: WebSocketResponseInterceptor? = null + fun started() = stateContainer.getState() !is State.Disconnected /** @@ -81,6 +85,11 @@ class SocketService( updateState(Event.Pause) } + @Synchronized + fun setInterceptor(interceptor: WebSocketResponseInterceptor) { + this.responseInterceptor = interceptor + } + /** * Resumes connection from [SocketStateMachine.State.Paused] state, * recovering all subscriptions and [DeliveryType.AT_LEAST_ONCE] requests @@ -176,15 +185,33 @@ class SocketService( @Synchronized override fun onSingleResponse(rpcResponse: RpcResponse) { - updateState(Event.SendableResponse(rpcResponse)) + val interceptor = responseInterceptor + + if ( + interceptor != null && + interceptor.onRpcResponseReceived(rpcResponse).shouldDeliver() + ) { + updateState(Event.SendableResponse(rpcResponse)) + } } override fun onSubscriptionResponse(subscriptionChange: SubscriptionChange) { updateState(Event.SubscriptionResponse(subscriptionChange)) } + @Synchronized override fun onBatchResponse(batchResponse: List) { - updateState(Event.SendableBatchResponse(batchResponse)) + val interceptor = responseInterceptor + + val toDeliver = if (interceptor != null) { + batchResponse.filter { interceptor.onRpcResponseReceived(it).shouldDeliver() } + } else { + batchResponse + } + + if (toDeliver.isNotEmpty()) { + updateState(Event.SendableBatchResponse(batchResponse)) + } } @Synchronized @@ -289,7 +316,8 @@ class SocketService( private fun unsubscribe(subscription: SocketStateMachine.Subscription) { require(subscription is RespondableSubscription) - val unsubscribeRequest = RuntimeRequest(subscription.unsubscribeMethod, listOf(subscription.id)) + val unsubscribeRequest = + RuntimeRequest(subscription.unsubscribeMethod, listOf(subscription.id)) executeRequest( unsubscribeRequest, diff --git a/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/interceptor/WebSocketResponseInterceptor.kt b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/interceptor/WebSocketResponseInterceptor.kt new file mode 100644 index 00000000..de767a94 --- /dev/null +++ b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/interceptor/WebSocketResponseInterceptor.kt @@ -0,0 +1,16 @@ +package jp.co.soramitsu.fearless_utils.wsrpc.interceptor + +import jp.co.soramitsu.fearless_utils.wsrpc.response.RpcResponse + +interface WebSocketResponseInterceptor { + + enum class ResponseDelivery { + DROP, DELIVER_TO_SENDER + } + + fun onRpcResponseReceived(rpcResponse: RpcResponse): ResponseDelivery +} + +fun WebSocketResponseInterceptor.ResponseDelivery.shouldDeliver(): Boolean { + return this == WebSocketResponseInterceptor.ResponseDelivery.DELIVER_TO_SENDER +}