Skip to content

Commit

Permalink
Upgrade to ConsensusJ 0.7.0-alpha3, OmniJ 0.7.0-alpha3
Browse files Browse the repository at this point in the history
* `ChainTipPublisher` instead of `Publisher<ChainTip>`
* Use `ChainTipPublisher` as a DI bean.
* `Publisher` instead of `Flowable`
* Override `ChainTipPublisher` in `ApplicationSpec.groovy`
  • Loading branch information
msgilligan committed Oct 6, 2023
1 parent 954dfbb commit 3775a8d
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 7 deletions.
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ btcProxyVersion = 0.5.2

useMavenLocal = false

consensusjVersion = 0.7.0-alpha2
omnijVersion = 0.7.0-alpha2
consensusjVersion = 0.7.0-alpha3
omnijVersion = 0.7.0-alpha3

micronautVersion= 4.1.0
micronautAppGradlePluginVersion = 4.0.3
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.reactivex.rxjava3.disposables.Disposable;
import org.bitcoinj.base.BitcoinNetwork;
import org.consensusj.bitcoin.json.pojo.ChainTip;
import org.consensusj.bitcoin.rx.ChainTipPublisher;
import org.consensusj.bitcoin.rx.jsonrpc.service.TxOutSetService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -44,11 +45,11 @@ public class OmniPropertyListService implements Closeable {
private Disposable intervalSubscription;
private Disposable outSetSubscription;

OmniPropertyListService(OmniClient omniClient) {
OmniPropertyListService(OmniClient omniClient, ChainTipPublisher chainTipPublisher) {
rxJsonClient = omniClient;
cache = new OmniPropertyListCache((BitcoinNetwork) rxJsonClient.getNetwork());
activeProperties = rxJsonClient.getNetwork().equals(BitcoinNetwork.MAINNET) ? List.of(OMNI, TOMNI, USDT) : List.of(OMNI, TOMNI);
txOutSetService = new TxOutSetService(omniClient);
txOutSetService = new TxOutSetService(omniClient, chainTipPublisher);
timerInterval = Flowable.interval(3,1, TimeUnit.SECONDS);
}

Expand All @@ -63,7 +64,7 @@ public synchronized void start() {
}
// Subscribe to a TxOutSetInfo stream (happens once per block, but delayed because the calculation takes some time)
if (outSetSubscription == null) {
outSetSubscription = txOutSetService.getTxOutSetPublisher()
outSetSubscription = Flowable.fromPublisher(txOutSetService.getTxOutSetPublisher())
.subscribe(cache::cachePutBitcoin,
t -> log.error("TxOutSetService", t),
() -> log.error("TxOutSetService completed"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import org.consensusj.bitcoin.json.pojo.ChainTip;
import org.consensusj.bitcoin.rx.ChainTipPublisher;
import org.consensusj.bitcoin.rx.jsonrpc.RxBitcoinClient;
import org.consensusj.jsonrpc.JsonRpcRequest;
import org.consensusj.jsonrpc.JsonRpcResponse;
Expand All @@ -12,9 +13,11 @@

import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// TODO: Refactor to remover usage of RxJava, except ChainTipPublisher which will migrate to Flow.Publisher
/**
* This prototype caching service implementation works best for RPC methods that:
* <ul>
Expand All @@ -31,17 +34,20 @@ public class CachedRpcService {
private final Set<String> cached = Set.of("getchaintips", "getblockcount", "getblockchaininfo", "getbestblockhash", "gettxoutsetinfo");
private final ConcurrentHashMap<String, Single<Object>> cache = new ConcurrentHashMap<>();
private final RxBitcoinClient rxBitcoinClient;
private final ChainTipPublisher chainTipPublisher;
private Disposable chainTipSubscription;

public CachedRpcService( RxBitcoinClient rxBitcoinClient) {
// TODO: Change constructor to take a regular BitcoinClient
public CachedRpcService(RxBitcoinClient rxBitcoinClient, ChainTipPublisher chainTipPublisher) {
this.rxBitcoinClient = rxBitcoinClient;
this.chainTipPublisher = chainTipPublisher;
}

@PostConstruct
public synchronized void start() {
if (chainTipSubscription == null) {
log.info("starting");
chainTipSubscription = Flowable.fromPublisher(rxBitcoinClient.chainTipPublisher())
chainTipSubscription = Flowable.fromPublisher(chainTipPublisher)
.subscribe(this::onNewBlock, this::onError);
}
}
Expand All @@ -50,6 +56,7 @@ public boolean isCached(JsonRpcRequest request) {
return cached.contains(request.getMethod());
}

// TODO: Change to return CompletableFuture
public Single<JsonRpcResponse<?>> callCached(JsonRpcRequest request) {
return fetch(request.getMethod())
.map(result -> responseFromResult(request, result));
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/consensusj/bitcoin/proxyd/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.micronaut.runtime.Micronaut;
import org.consensusj.bitcoin.json.conversion.RpcServerModule;
import org.consensusj.bitcoin.proxy.jsonrpc.JsonRpcProxyConfiguration;
import org.consensusj.bitcoin.rx.ChainTipPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -38,4 +39,9 @@ public OmniClient bitcoinClient(JsonRpcProxyConfiguration configuration) {
//client.start();
return client;
}

@Singleton
ChainTipPublisher chainTipPublisher(OmniClient omniClient) {
return omniClient.chainTipPublisher();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package org.consensusj.bitcoin.proxyd

import io.micronaut.context.annotation.Replaces
import io.micronaut.runtime.server.EmbeddedServer
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Singleton

import org.consensusj.bitcoin.rx.ChainTipPublisher
import org.consensusj.bitcoin.rx.jsonrpc.test.TestChainTipPublishers
import org.consensusj.jsonrpc.JsonRpcStatusException
import org.consensusj.jsonrpc.groovy.DynamicRpcClient
import spock.lang.Shared
Expand Down Expand Up @@ -57,4 +62,10 @@ class ApplicationSpec extends Specification {
e.jsonRpcCode == -32601
// TODO e.httpCode == ??
}

@Replaces(ChainTipPublisher)
@Singleton
ChainTipPublisher chainTipService() {
return TestChainTipPublishers.never();
}
}

0 comments on commit 3775a8d

Please sign in to comment.