From 3775a8dd1228b28df2cf2e6231005b86333a3d7e Mon Sep 17 00:00:00 2001 From: Sean Gilligan Date: Thu, 5 Oct 2023 23:46:19 -0700 Subject: [PATCH] Upgrade to ConsensusJ 0.7.0-alpha3, OmniJ 0.7.0-alpha3 * `ChainTipPublisher` instead of `Publisher` * Use `ChainTipPublisher` as a DI bean. * `Publisher` instead of `Flowable` * Override `ChainTipPublisher` in `ApplicationSpec.groovy` --- gradle.properties | 4 ++-- .../omni/proxy/analysis/OmniPropertyListService.java | 7 ++++--- .../bitcoin/proxy/jsonrpc/CachedRpcService.java | 11 +++++++++-- .../org/consensusj/bitcoin/proxyd/Application.java | 6 ++++++ .../consensusj/bitcoin/proxyd/ApplicationSpec.groovy | 11 +++++++++++ 5 files changed, 32 insertions(+), 7 deletions(-) diff --git a/gradle.properties b/gradle.properties index 4e01a0c..501b15f 100755 --- a/gradle.properties +++ b/gradle.properties @@ -2,8 +2,8 @@ btcProxyVersion = 0.5.2 useMavenLocal = false -consensusjVersion = 0.7.0-alpha2 -omnijVersion = 0.7.0-alpha2 +consensusjVersion = 0.7.0-alpha3 +omnijVersion = 0.7.0-alpha3 micronautVersion= 4.1.0 micronautAppGradlePluginVersion = 4.0.3 diff --git a/src/main/java/foundation/omni/proxy/analysis/OmniPropertyListService.java b/src/main/java/foundation/omni/proxy/analysis/OmniPropertyListService.java index d3e32ff..49dcc11 100644 --- a/src/main/java/foundation/omni/proxy/analysis/OmniPropertyListService.java +++ b/src/main/java/foundation/omni/proxy/analysis/OmniPropertyListService.java @@ -11,6 +11,7 @@ import io.reactivex.rxjava3.disposables.Disposable; import org.bitcoinj.base.BitcoinNetwork; import org.consensusj.bitcoin.json.pojo.ChainTip; +import org.consensusj.bitcoin.rx.ChainTipPublisher; import org.consensusj.bitcoin.rx.jsonrpc.service.TxOutSetService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,11 +45,11 @@ public class OmniPropertyListService implements Closeable { private Disposable intervalSubscription; private Disposable outSetSubscription; - OmniPropertyListService(OmniClient omniClient) { + OmniPropertyListService(OmniClient omniClient, ChainTipPublisher chainTipPublisher) { rxJsonClient = omniClient; cache = new OmniPropertyListCache((BitcoinNetwork) rxJsonClient.getNetwork()); activeProperties = rxJsonClient.getNetwork().equals(BitcoinNetwork.MAINNET) ? List.of(OMNI, TOMNI, USDT) : List.of(OMNI, TOMNI); - txOutSetService = new TxOutSetService(omniClient); + txOutSetService = new TxOutSetService(omniClient, chainTipPublisher); timerInterval = Flowable.interval(3,1, TimeUnit.SECONDS); } @@ -63,7 +64,7 @@ public synchronized void start() { } // Subscribe to a TxOutSetInfo stream (happens once per block, but delayed because the calculation takes some time) if (outSetSubscription == null) { - outSetSubscription = txOutSetService.getTxOutSetPublisher() + outSetSubscription = Flowable.fromPublisher(txOutSetService.getTxOutSetPublisher()) .subscribe(cache::cachePutBitcoin, t -> log.error("TxOutSetService", t), () -> log.error("TxOutSetService completed")); diff --git a/src/main/java/org/consensusj/bitcoin/proxy/jsonrpc/CachedRpcService.java b/src/main/java/org/consensusj/bitcoin/proxy/jsonrpc/CachedRpcService.java index af44535..50c4d62 100644 --- a/src/main/java/org/consensusj/bitcoin/proxy/jsonrpc/CachedRpcService.java +++ b/src/main/java/org/consensusj/bitcoin/proxy/jsonrpc/CachedRpcService.java @@ -4,6 +4,7 @@ import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.disposables.Disposable; import org.consensusj.bitcoin.json.pojo.ChainTip; +import org.consensusj.bitcoin.rx.ChainTipPublisher; import org.consensusj.bitcoin.rx.jsonrpc.RxBitcoinClient; import org.consensusj.jsonrpc.JsonRpcRequest; import org.consensusj.jsonrpc.JsonRpcResponse; @@ -12,9 +13,11 @@ import jakarta.annotation.PostConstruct; import jakarta.inject.Singleton; + import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +// TODO: Refactor to remover usage of RxJava, except ChainTipPublisher which will migrate to Flow.Publisher /** * This prototype caching service implementation works best for RPC methods that: *