Skip to content

Commit

Permalink
Merge pull request #20 from horizontalsystems/sync-all
Browse files Browse the repository at this point in the history
Improve syncing logic for ethereum and erc20 tokens
  • Loading branch information
omurovch authored Feb 14, 2019
2 parents 78474b8 + 7669806 commit f7c42ba
Showing 1 changed file with 105 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import io.horizontalsystems.ethereumkit.models.Balance
import io.horizontalsystems.ethereumkit.models.GasPrice
import io.horizontalsystems.ethereumkit.models.LastBlockHeight
import io.horizontalsystems.ethereumkit.models.Transaction
import io.horizontalsystems.ethereumkit.models.etherscan.EtherscanTransaction
import io.horizontalsystems.ethereumkit.network.EtherscanService
import io.horizontalsystems.hdwalletkit.HDWallet
import io.horizontalsystems.hdwalletkit.Mnemonic
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.functions.Function5
import io.reactivex.functions.Function3
import io.reactivex.schedulers.Schedulers
import io.realm.OrderedCollectionChangeSet
import io.realm.Realm
Expand All @@ -29,6 +30,7 @@ import org.web3j.abi.datatypes.Type
import org.web3j.abi.datatypes.generated.Uint256
import org.web3j.crypto.RawTransaction
import org.web3j.crypto.TransactionEncoder
import org.web3j.tuples.generated.Tuple3
import org.web3j.utils.Convert
import org.web3j.utils.Numeric
import java.math.BigDecimal
Expand Down Expand Up @@ -105,7 +107,6 @@ class EthereumKit(seed: ByteArray, networkType: NetworkType, walletId: String) {
timer = Timer(30, object : Timer.Listener {
override fun onTimeIsUp() {
refresh()
erc20List.forEach { refresh(it.key) }
}
})
}
Expand Down Expand Up @@ -138,18 +139,29 @@ class EthereumKit(seed: ByteArray, networkType: NetworkType, walletId: String) {

@Synchronized
fun refresh() {
val tokenList = erc20List.values
if (kitState is KitState.Syncing) {
return
}

tokenList.find { it.kitState is KitState.Syncing }?.let {
return
}

kitState = KitState.Syncing(0.0)
tokenList.forEach { it.kitState = KitState.Syncing(0.0) }

Flowable.zip(updateBalance(), updateLastBlockHeight(), updateTransactions(), updateTransactions(token = true), updateGasPrice(),
Function5<BigDecimal, Int, Int, Int, Double, Unit> { _, _, _, _, _ ->
Unit
})
.subscribeOn(io.reactivex.schedulers.Schedulers.io())
Flowable.zip(web3j.getBlockNumber(), web3j.getGasPrice(), web3j.getBalance(receiveAddress), Function3 { h: Int, g: Double, b: BigDecimal -> Tuple3(h, g, b) })
.subscribeOn(Schedulers.io())
.subscribe({
kitState = KitState.Synced
updateLastBlockHeight(it.value1)
updateGasPrice(it.value2)
updateBalance(it.value3, receiveAddress, ETH_DECIMAL)

refreshTransactions()
}, {
it?.printStackTrace()
kitState = KitState.NotSynced
tokenList.forEach { it.kitState = KitState.NotSynced }
}).let {
disposables.add(it)
}
Expand Down Expand Up @@ -203,31 +215,13 @@ class EthereumKit(seed: ByteArray, networkType: NetworkType, walletId: String) {
holder.balance = it.balance.toBigDecimal()
}

refresh(token.contractAddress)
refresh()
}

fun unregister(contractAddress: String) {
erc20List.remove(contractAddress)
}

@Synchronized
fun refresh(contractAddress: String) {
val erc20 = erc20List[contractAddress] ?: return

erc20.kitState = KitState.Syncing(0.0)

updateBalance(contractAddress, erc20.listener.decimal)
.subscribeOn(io.reactivex.schedulers.Schedulers.io())
.subscribe({
erc20.kitState = KitState.Synced
}, {
it?.printStackTrace()
erc20.kitState = KitState.NotSynced
}).let {
disposables.add(it)
}
}

fun feeERC20(): Double {
realmFactory.realm.use { realm ->
val gwei = realm.where(GasPrice::class.java).findFirst()?.gasPriceInGwei
Expand Down Expand Up @@ -396,92 +390,108 @@ class EthereumKit(seed: ByteArray, networkType: NetworkType, walletId: String) {
}
}

private fun updateGasPrice(): Flowable<Double> {
return web3j.getGasPrice()
.map { gasPrice ->
realmFactory.realm.use { realm ->
realm.executeTransaction {
it.insertOrUpdate(GasPrice(gasPrice))
}
}
gasPrice
}
.onErrorReturn {
DEFAULT_GAS_PRICE
}
}
private fun getBlockHeight(token: Boolean = false): Int {
realmFactory.realm.use {
val query = it.where(Transaction::class.java)
if (token) {
query.notEqualTo("contractAddress", "")
} else {
query.equalTo("contractAddress", "")
}

private fun updateBalance(contractAddress: String? = null, decimal: Int = ETH_DECIMAL): Flowable<BigDecimal> {
val flowable = if (contractAddress == null) {
web3j.getBalance(receiveAddress)
} else {
web3j.getTokenBalance(receiveAddress, contractAddress, decimal)
return query.sort("blockNumber", Sort.DESCENDING).findFirst()?.blockNumber?.toInt() ?: 0
}
}

return flowable.map { balance ->
realmFactory.realm.use { realm ->
realm.executeTransaction {
it.insertOrUpdate(Balance(contractAddress ?: receiveAddress, balance, decimal))
}
}
private fun refreshTransactions() {
var lastBlockHeight = getBlockHeight()

etherscanService.getTransactionList(receiveAddress, lastBlockHeight + 1)
.subscribeOn(io.reactivex.schedulers.Schedulers.io())
.subscribe({
saveTransactions(it.result)
kitState = KitState.Synced
}, {
kitState = KitState.NotSynced
})
.let { disposables.add(it) }

if (erc20List.isEmpty())
return

lastBlockHeight = getBlockHeight(token = true)
etherscanService.getTokenTransactions(receiveAddress, lastBlockHeight + 1)
.subscribeOn(io.reactivex.schedulers.Schedulers.io())
.subscribe({
saveTransactions(it.result)
refreshTokensBalances()
}, {
erc20List.values.forEach { it.kitState = KitState.NotSynced }
})
.let { disposables.add(it) }

balance
}
}

private fun updateLastBlockHeight(): Flowable<Int> {
return web3j.getBlockNumber()
.map { height ->
lastBlockHeight = height
realmFactory.realm.use { realm ->
realm.executeTransaction {
it.insertOrUpdate(LastBlockHeight(height))
}
}
private fun refreshTokensBalances() {
erc20List.values.forEach { holder ->
val erc20Address = holder.listener.contractAddress
val erc20Decimal = holder.listener.decimal

web3j.getTokenBalance(receiveAddress, erc20Address, erc20Decimal)
.subscribeOn(io.reactivex.schedulers.Schedulers.io())
.subscribe({
updateBalance(it, erc20Address, erc20Decimal)
holder.kitState = KitState.Synced
}, {
holder.kitState = KitState.NotSynced
})
.let { disposables.add(it) }
}
}

listener?.onLastBlockHeightUpdate(height)
//
// InsertOrUpdate records
//

erc20List.forEach {
it.value.listener.onLastBlockHeightUpdate(height)
}
private fun updateGasPrice(gasPrice: Double) {
realmFactory.realm.use { realm ->
realm.executeTransaction {
it.insertOrUpdate(GasPrice(gasPrice))
}
}
}

height
}
private fun updateBalance(balance: BigDecimal, address: String, decimal: Int) {
realmFactory.realm.use { realm ->
realm.executeTransaction {
it.insertOrUpdate(Balance(address, balance, decimal))
}
}
}

private fun updateTransactions(token: Boolean = false): Flowable<Int> {
private fun updateLastBlockHeight(height: Int) {
lastBlockHeight = height

val lastBlockHeight = realmFactory.realm.use {
var query = it.where(Transaction::class.java)
query = if (token) {
query.notEqualTo("contractAddress", "")
.notEqualTo("input", "0x")
} else {
query.equalTo("contractAddress", "")
.equalTo("input", "0x")
realmFactory.realm.use { realm ->
realm.executeTransaction {
it.insertOrUpdate(LastBlockHeight(height))
}
query.sort("blockNumber", Sort.DESCENDING)
.findFirst()?.blockNumber?.toInt() ?: 0
}

val flowable = if (token) {
etherscanService.getTokenTransactions(receiveAddress, lastBlockHeight + 1)
} else {
etherscanService.getTransactionList(receiveAddress, lastBlockHeight + 1)
listener?.onLastBlockHeightUpdate(height)

erc20List.forEach {
it.value.listener.onLastBlockHeightUpdate(height)
}
}

return flowable.map { response ->
realmFactory.realm.use { realm ->
realm.executeTransaction {
response.result
.map { tx -> Transaction(tx) }
.forEach { tx ->
realm.insertOrUpdate(tx)
}
private fun saveTransactions(list: List<EtherscanTransaction>) {
realmFactory.realm.use { realm ->
realm.executeTransaction {
list.map { tx -> Transaction(tx) }.forEach { tx ->
realm.insertOrUpdate(tx)
}
}

response.result.size
}
}

Expand Down

0 comments on commit f7c42ba

Please sign in to comment.