diff --git a/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/configuration/MetricsConfiguration.kt b/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/configuration/MetricsConfiguration.kt index b1e2675c..d323023d 100644 --- a/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/configuration/MetricsConfiguration.kt +++ b/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/configuration/MetricsConfiguration.kt @@ -6,5 +6,8 @@ const val NINE_HUNDRED_NINGTHY_FIVE_PERCENT = 0.95 const val TRADE_COUNT_METRIC = "trade_count" const val TRADE_LATENCY_METRIC = "trade_latency" +const val ORDERBOOK_SIZE_METRIC = "orderbook_size" + +const val ORDER_TYPE_TAG = "order_type" const val EXCHANGE_TAG = "exchange" const val TOKENS_PAIR_TAG = "tokens_pair" \ No newline at end of file diff --git a/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/etherdelta/EtherdeltaTokenResolver.kt b/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/etherdelta/EtherdeltaTokenResolver.kt index 2001ac8b..9b4c9799 100644 --- a/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/etherdelta/EtherdeltaTokenResolver.kt +++ b/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/etherdelta/EtherdeltaTokenResolver.kt @@ -9,8 +9,9 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.support.GenericApplicationContext import org.springframework.stereotype.Component import org.web3j.protocol.Web3j -import org.web3j.tx.Contract import org.web3j.tx.ReadonlyTransactionManager +import org.web3j.tx.gas.ContractGasProvider +import org.web3j.tx.gas.DefaultGasProvider import java.math.BigInteger import java.net.URL import java.nio.charset.Charset @@ -32,6 +33,8 @@ class EtherdeltaTokenResolver { @Autowired private lateinit var resourceLoader: GenericApplicationContext + private val gasProvider: ContractGasProvider = DefaultGasProvider() + /** * Get ERC20 token definitions from different sources */ @@ -119,7 +122,7 @@ class EtherdeltaTokenResolver { try { val transactionManager = ReadonlyTransactionManager(web3j, PARITY_TOKEN_REGISTRY_CONTRACT_ADDRESS) val parityTokenRegistryContract = ParityTokenRegistryContract.load(PARITY_TOKEN_REGISTRY_CONTRACT_ADDRESS, - web3j, transactionManager, Contract.GAS_PRICE, Contract.GAS_LIMIT) + web3j, transactionManager, gasProvider.getGasPrice(null), gasProvider.getGasLimit(null)) val tokensCount = parityTokenRegistryContract.tokenCount().send().toLong() for (index in 0 until tokensCount) { @@ -152,7 +155,7 @@ class EtherdeltaTokenResolver { try { val transactionManager = ReadonlyTransactionManager(web3j, address) - val erc20Contract = Erc20Contract.load(address, web3j, transactionManager, Contract.GAS_PRICE, Contract.GAS_LIMIT) + val erc20Contract = Erc20Contract.load(address, web3j, transactionManager, gasProvider.getGasPrice(null), gasProvider.getGasLimit(null)) val tokenSymbol = erc20Contract.symbol().send().trim() val tokenDecimals = erc20Contract.decimals().send().intValueExact() diff --git a/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/orderbook/XchangeOrderbookConnector.kt b/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/orderbook/XchangeOrderbookConnector.kt index db5fc475..f5c8a193 100644 --- a/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/orderbook/XchangeOrderbookConnector.kt +++ b/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/orderbook/XchangeOrderbookConnector.kt @@ -6,14 +6,20 @@ import fund.cyber.markets.common.model.Order import fund.cyber.markets.common.model.OrderType import fund.cyber.markets.common.model.TokensPair import fund.cyber.markets.connector.AbstarctXchangeConnector +import fund.cyber.markets.connector.configuration.EXCHANGE_TAG +import fund.cyber.markets.connector.configuration.ORDERBOOK_SIZE_METRIC +import fund.cyber.markets.connector.configuration.ORDER_TYPE_TAG +import fund.cyber.markets.connector.configuration.TOKENS_PAIR_TAG import info.bitrich.xchangestream.core.ProductSubscription import info.bitrich.xchangestream.core.StreamingExchangeFactory import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.knowm.xchange.currency.CurrencyPair import org.knowm.xchange.dto.marketdata.OrderBook import org.knowm.xchange.dto.trade.LimitOrder import org.springframework.kafka.core.KafkaTemplate import java.util.* +import java.util.concurrent.atomic.AtomicLong class XchangeOrderbookConnector : AbstarctXchangeConnector, OrderbookConnector { override var orderbooks: MutableMap = mutableMapOf() @@ -39,12 +45,22 @@ class XchangeOrderbookConnector : AbstarctXchangeConnector, OrderbookConnector { override fun subscribe() { log.info("Subscribing for orderbooks from $exchangeName exchange") + val exchangeTag = Tags.of(EXCHANGE_TAG, exchangeName) + val askOrderTag = Tags.of(ORDER_TYPE_TAG, OrderType.ASK.name) + val bidOrderTag = Tags.of(ORDER_TYPE_TAG, OrderType.BID.name) + exchangeTokensPairs.forEach { pair -> + val exchangePairTag = exchangeTag.and(Tags.of(TOKENS_PAIR_TAG, pair.base.currencyCode + "_" + pair.counter.currencyCode)) + val askCountMonitor = monitoring.gauge(ORDERBOOK_SIZE_METRIC, exchangePairTag.and(askOrderTag), AtomicLong(0L)) + val bidCountMonitor = monitoring.gauge(ORDERBOOK_SIZE_METRIC, exchangePairTag.and(bidOrderTag), AtomicLong(0L)) val orderbookSubscription = exchange.streamingMarketDataService .getOrderBook(pair) .subscribe({ orderbook -> orderbooks[pair] = orderbook + + askCountMonitor!!.set(orderbook.asks.size.toLong()) + bidCountMonitor!!.set(orderbook.bids.size.toLong()) }) { throwable -> log.error("Error in subscribing orderbook for $exchangeName, pair $pair", throwable) } diff --git a/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/trade/EtherdeltaTradeConnector.kt b/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/trade/EtherdeltaTradeConnector.kt index 2d5fbb0c..a364bb88 100644 --- a/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/trade/EtherdeltaTradeConnector.kt +++ b/exchanges-connector/src/main/kotlin/fund/cyber/markets/connector/trade/EtherdeltaTradeConnector.kt @@ -24,10 +24,11 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.kafka.core.KafkaTemplate import org.springframework.stereotype.Component import org.web3j.protocol.Web3j -import org.web3j.protocol.core.DefaultBlockParameter +import org.web3j.protocol.core.DefaultBlockParameterName import org.web3j.protocol.core.methods.response.EthBlock -import org.web3j.tx.Contract import org.web3j.tx.ReadonlyTransactionManager +import org.web3j.tx.gas.ContractGasProvider +import org.web3j.tx.gas.DefaultGasProvider import org.web3j.utils.Numeric import java.math.BigDecimal import java.math.BigInteger @@ -58,15 +59,17 @@ class EtherdeltaTradeConnector : Connector { @Autowired private lateinit var monitoring: MeterRegistry + private val gasProvider: ContractGasProvider = DefaultGasProvider() + /** * Connect to etherdelta and parity token registry smart contracts using web3j */ override fun connect() { log.info("Connecting to $exchangeName exchange") - val gasPrice = web3j.ethGasPrice().send() val etherdeltaTransactionManager = ReadonlyTransactionManager(web3j, ETHERDELTA_CONTRACT_ADDRESS) - etherdeltaContract = EtherdeltaContract.load(ETHERDELTA_CONTRACT_ADDRESS, web3j, etherdeltaTransactionManager, gasPrice.gasPrice, Contract.GAS_LIMIT) + etherdeltaContract = EtherdeltaContract.load(ETHERDELTA_CONTRACT_ADDRESS, web3j, + etherdeltaTransactionManager, gasProvider.getGasPrice(null), gasProvider.getGasLimit(null)) log.info("Connected to $exchangeName exchange") updateTokensPairs() @@ -107,11 +110,10 @@ class EtherdeltaTradeConnector : Connector { .publishPercentiles(NINGTHY_FIVE_PERCENT, NINE_HUNDRED_NINGTHY_FIVE_PERCENT) .register(monitoring) - val latestBlockParameter = DefaultBlockParameter.valueOf("latest") - var block = web3j.ethGetBlockByNumber(latestBlockParameter, false).send() + var block = web3j.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false).send() etherdeltaContract - .tradeEventObservable(latestBlockParameter, latestBlockParameter) + .tradeEventObservable(DefaultBlockParameterName.LATEST, DefaultBlockParameterName.LATEST) .subscribe{ tradeEvent -> if (block.block.hash != tradeEvent.log!!.blockHash) {