diff --git a/conf/brs-default.properties b/conf/brs-default.properties index 3c0341aeb..d3ad9a898 100644 --- a/conf/brs-default.properties +++ b/conf/brs-default.properties @@ -79,18 +79,11 @@ P2P.TimeoutIdle_ms = 30000 # Blacklist peers for 600000 milliseconds (i.e. 10 minutes by default). P2P.BlacklistingTime_ms = 600000 -# Enable re-broadcasting of new transactions until they are received back from at least one -# peer, or found in the blockchain. This feature can optionally be disabled, to avoid the -# risk of revealing that this node is the submitter of such re-broadcasted new transactions. +# Enable priority (re-)broadcasting of transactions. When enabled incoming transactions +# will be priority resent to the rebroadcast targets P2P.enableTxRebroadcast = yes -# Transactions that aren't confirmed for this many blocks start getting rebroadcast. -P2P.rebroadcastTxAfter = 5 - -# Transactions being rebroadcast get rebroadcast every this many blocks until they are confirmed. -P2P.rebroadcastTxEvery = 2 - -# Consider a new transaction or block sent after 10 peers have received it. +# Amount of extra peers to send a transaction to after sending to all rebroadcast targets P2P.sendToLimit=10 # Max number of unconfirmed transactions that will be kept in cache. @@ -99,13 +92,10 @@ P2P.maxUnconfirmedTransactions = 8192 # Max percentage of unconfirmed transactions that have a full hash reference to another transaction kept in cache P2P.maxUnconfirmedTransactionsFullHashReferencePercentage = 5 -# Max amount of unconfirmed transactions that will be asked to a peer at the same time -P2P.limitUnconfirmedTransactionsToRetrieve = 1000 - # JETTY pass-through options. See documentation at # https://www.eclipse.org/jetty/documentation/9.2.22.v20170531/dos-filter.html # P2P section: -JETTY.P2P.DoSFilter = off +JETTY.P2P.DoSFilter = on JETTY.P2P.DoSFilter.maxRequestsPerSec = 30 JETTY.P2P.DoSFilter.delayMs = 500 JETTY.P2P.DoSFilter.maxRequestMs = 300000 @@ -308,3 +298,6 @@ brs.debugTraceQuote = # Log changes to unconfirmed balances. brs.debugLogUnconfirmed = false + +# Timeout in Seconds to wait for a graceful shutdown +brs.ShutdownTimeout = 180 diff --git a/src/brs/BlockchainProcessor.java b/src/brs/BlockchainProcessor.java index 35d642dbc..afc2cd3f3 100644 --- a/src/brs/BlockchainProcessor.java +++ b/src/brs/BlockchainProcessor.java @@ -23,7 +23,7 @@ enum Event { int getMinRollbackHeight(); - void processPeerBlock(JSONObject request) throws BurstException; + void processPeerBlock(JSONObject request, Peer peer) throws BurstException; void fullReset(); diff --git a/src/brs/BlockchainProcessorImpl.java b/src/brs/BlockchainProcessorImpl.java index 33e07afce..47e72784f 100644 --- a/src/brs/BlockchainProcessorImpl.java +++ b/src/brs/BlockchainProcessorImpl.java @@ -20,7 +20,6 @@ import brs.statistics.StatisticsManagerImpl; import brs.services.AccountService; import brs.transactionduplicates.TransactionDuplicatesCheckerImpl; -import brs.transactionduplicates.TransactionDuplicationResult; import brs.unconfirmedtransactions.UnconfirmedTransactionStore; import brs.util.ThreadPool; @@ -32,11 +31,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.Semaphore; @@ -744,10 +740,10 @@ public int getMinRollbackHeight() { } @Override - public void processPeerBlock(JSONObject request) throws BurstException { + public void processPeerBlock(JSONObject request, Peer peer) throws BurstException { Block newBlock = Block.parseBlock(request, blockchain.getHeight()); if (newBlock == null) { - logger.debug("Peer has announced an unprocessable block."); + logger.debug("Peer {} has announced an unprocessable block.", peer.getPeerAddress()); return; } /* @@ -760,10 +756,9 @@ public void processPeerBlock(JSONObject request) throws BurstException { newBlock.setByteLength(newBlock.toString().length()); blockService.calculateBaseTarget(newBlock, chainblock); downloadCache.addBlock(newBlock); - logger.debug("Added from Anounce: Id: " +newBlock.getId()+" Height: "+newBlock.getHeight()); + logger.debug("Peer {} added block from Announce: Id: {} Height: {}", peer.getPeerAddress(), newBlock.getId(), newBlock.getHeight()); } else { - logger.debug("Peer sent us block: " + newBlock.getPreviousBlockId() - + " that does not match our chain."); + logger.debug("Peer {} sent us block: {} which is not the follow-up block for {}", peer.getPeerAddress(), newBlock.getPreviousBlockId(), chainblock.getId()); } } diff --git a/src/brs/TransactionProcessor.java b/src/brs/TransactionProcessor.java index 5e1da7bb7..b1e3aede9 100644 --- a/src/brs/TransactionProcessor.java +++ b/src/brs/TransactionProcessor.java @@ -25,7 +25,7 @@ enum Event { void clearUnconfirmedTransactions(); - void broadcast(Transaction transaction) throws BurstException.ValidationException; + Integer broadcast(Transaction transaction) throws BurstException.ValidationException; void processPeerTransactions(JSONObject request, Peer peer) throws BurstException.ValidationException; diff --git a/src/brs/TransactionProcessorImpl.java b/src/brs/TransactionProcessorImpl.java index 303309cf4..b5baa1c40 100644 --- a/src/brs/TransactionProcessorImpl.java +++ b/src/brs/TransactionProcessorImpl.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.stream.Collectors; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.slf4j.Logger; @@ -84,11 +85,12 @@ public TransactionProcessorImpl(PropertyService propertyService, JSONArray transactionsData = (JSONArray) response.get(UNCONFIRMED_TRANSACTIONS_RESPONSE); - if (transactionsData == null || transactionsData.isEmpty()) { + if (transactionsData == null) { return; } try { List addedTransactions = processPeerTransactions(transactionsData, peer); + Peers.feedingTime(peer, foodDispenser, doneFeedingLog); if(! addedTransactions.isEmpty()) { List activePrioPlusExtra = Peers.getAllActivePriorityPlusSomeExtraPeers(); @@ -180,40 +182,25 @@ public Transaction.Builder newTransactionBuilder(byte[] senderPublicKey, long am } @Override - public void broadcast(Transaction transaction) throws BurstException.ValidationException { + public Integer broadcast(Transaction transaction) throws BurstException.ValidationException { if (! transaction.verifySignature()) { throw new BurstException.NotValidException("Transaction signature verification failed"); } List processedTransactions; if (dbs.getTransactionDb().hasTransaction(transaction.getId())) { logger.info("Transaction " + transaction.getStringId() + " already in blockchain, will not broadcast again"); - return; + return null; } if (unconfirmedTransactionStore.exists(transaction.getId())) { - /* - if (enableTransactionRebroadcasting) { - nonBroadcastedTransactions.add(transaction); - logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will re-broadcast"); - } else {*/ - logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will not broadcast again"); - ///} - return; + logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will not broadcast again"); + return null; } processedTransactions = processTransactions(Collections.singleton(transaction), null); if(! processedTransactions.isEmpty()) { - broadcastToPeers(); - } - - if (processedTransactions.contains(transaction)) { - /* - if (enableTransactionRebroadcasting) { - nonBroadcastedTransactions.add(transaction); - } - */ - logger.debug("Accepted new transaction " + transaction.getStringId()); + return broadcastToPeers(true); } else { logger.debug("Could not accept new transaction " + transaction.getStringId()); throw new BurstException.NotValidException("Invalid transaction " + transaction.getStringId()); @@ -226,7 +213,7 @@ public void processPeerTransactions(JSONObject request, Peer peer) throws BurstE List processedTransactions = processPeerTransactions(transactionsData, peer); if(! processedTransactions.isEmpty()) { - broadcastToPeers(); + broadcastToPeers(false); } } @@ -373,10 +360,16 @@ private List processTransactions(Collection transactio } } - private void broadcastToPeers() { - for(Peer p: Peers.getAllActivePriorityPlusSomeExtraPeers()) { + private int broadcastToPeers(boolean toAll) { + List peersToSendTo = toAll ? Peers.getActivePeers().stream().limit(100).collect(Collectors.toList()) : Peers.getAllActivePriorityPlusSomeExtraPeers(); + + logger.info("Queueing up {} Peers for feeding", peersToSendTo.size()); + + for(Peer p: peersToSendTo) { Peers.feedingTime(p, foodDispenser, doneFeedingLog); } + + return peersToSendTo.size(); } public void revalidateUnconfirmedTransactions() { diff --git a/src/brs/http/APITransactionManagerImpl.java b/src/brs/http/APITransactionManagerImpl.java index c0806432d..40e66de28 100644 --- a/src/brs/http/APITransactionManagerImpl.java +++ b/src/brs/http/APITransactionManagerImpl.java @@ -23,6 +23,7 @@ import static brs.http.common.ResultFields.BROADCASTED_RESPONSE; import static brs.http.common.ResultFields.ERROR_RESPONSE; import static brs.http.common.ResultFields.FULL_HASH_RESPONSE; +import static brs.http.common.ResultFields.NUMBER_PEERS_SENT_TO_RESPONSE; import static brs.http.common.ResultFields.SIGNATURE_HASH_RESPONSE; import static brs.http.common.ResultFields.TRANSACTION_BYTES_RESPONSE; import static brs.http.common.ResultFields.TRANSACTION_JSON_RESPONSE; @@ -38,7 +39,6 @@ import brs.Blockchain; import brs.Burst; import brs.BurstException; -import brs.Constants; import brs.Transaction; import brs.Transaction.Builder; import brs.TransactionProcessor; @@ -187,7 +187,7 @@ public JSONStreamAware createTransaction(HttpServletRequest req, Account senderA response.put(TRANSACTION_BYTES_RESPONSE, Convert.toHexString(transaction.getBytes())); response.put(SIGNATURE_HASH_RESPONSE, Convert.toHexString(Crypto.sha256().digest(transaction.getSignature()))); if (broadcast) { - transactionProcessor.broadcast(transaction); + response.put(NUMBER_PEERS_SENT_TO_RESPONSE, transactionProcessor.broadcast(transaction)); response.put(BROADCASTED_RESPONSE, true); } else { response.put(BROADCASTED_RESPONSE, false); diff --git a/src/brs/http/BroadcastTransaction.java b/src/brs/http/BroadcastTransaction.java index 5f4256bbb..a5b3d0de9 100644 --- a/src/brs/http/BroadcastTransaction.java +++ b/src/brs/http/BroadcastTransaction.java @@ -6,6 +6,7 @@ import static brs.http.common.ResultFields.ERROR_DESCRIPTION_RESPONSE; import static brs.http.common.ResultFields.ERROR_RESPONSE; import static brs.http.common.ResultFields.FULL_HASH_RESPONSE; +import static brs.http.common.ResultFields.NUMBER_PEERS_SENT_TO_RESPONSE; import static brs.http.common.ResultFields.TRANSACTION_RESPONSE; import brs.BurstException; @@ -45,7 +46,7 @@ JSONStreamAware processRequest(HttpServletRequest req) throws BurstException { JSONObject response = new JSONObject(); try { transactionService.validate(transaction); - transactionProcessor.broadcast(transaction); + response.put(NUMBER_PEERS_SENT_TO_RESPONSE, transactionProcessor.broadcast(transaction)); response.put(TRANSACTION_RESPONSE, transaction.getStringId()); response.put(FULL_HASH_RESPONSE, transaction.getFullHash()); } catch (BurstException.ValidationException | RuntimeException e) { diff --git a/src/brs/http/common/ResultFields.java b/src/brs/http/common/ResultFields.java index ffd51153f..6d5813d38 100644 --- a/src/brs/http/common/ResultFields.java +++ b/src/brs/http/common/ResultFields.java @@ -21,6 +21,7 @@ public class ResultFields { public static final String DONE_RESPONSE = "done"; public static final String SCAN_TIME_RESPONSE = "scanTime"; public static final String BROADCASTED_RESPONSE = "broadcasted"; + public static final String NUMBER_PEERS_SENT_TO_RESPONSE = "numberPeersSentTo"; public static final String UNSIGNED_TRANSACTION_BYTES_RESPONSE = "unsignedTransactionBytes"; public static final String TRANSACTION_JSON_RESPONSE = "transactionJSON"; public static final String TRANSACTION_BYTES_RESPONSE = "transactionBytes"; diff --git a/src/brs/peer/Peers.java b/src/brs/peer/Peers.java index de703a13d..e7f9d3715 100644 --- a/src/brs/peer/Peers.java +++ b/src/brs/peer/Peers.java @@ -2,6 +2,7 @@ import static brs.props.Props.P2P_ENABLE_TX_REBROADCAST; import static brs.props.Props.P2P_SEND_TO_LIMIT; +import static brs.util.JSON.prepareRequest; import brs.*; import brs.props.Props; @@ -164,7 +165,7 @@ public static void init(TimeService timeService, AccountService accountService, logger.debug("My peer info:\n" + json.toJSONString()); myPeerInfoResponse = JSON.prepare(json); json.put("requestType", "getInfo"); - myPeerInfoRequest = JSON.prepareRequest(json); + myPeerInfoRequest = prepareRequest(json); if(propertyService.getBoolean(P2P_ENABLE_TX_REBROADCAST)) { rebroadcastPeers = Collections @@ -509,7 +510,7 @@ private void updateSavedPeers() { { JSONObject request = new JSONObject(); request.put("requestType", "getPeers"); - getPeersRequest = JSON.prepareRequest(request); + getPeersRequest = prepareRequest(request); } private volatile boolean addedNewPeer; @@ -575,7 +576,7 @@ public void run() { JSONObject request = new JSONObject(); request.put("requestType", "addPeers"); request.put("peers", myPeers); - peer.send(JSON.prepareRequest(request)); + peer.send(prepareRequest(request)); } } catch (Exception e) { @@ -639,7 +640,7 @@ public static Collection getAllPeers() { return allPeers; } - public static Collection getActivePeers() { + public static List getActivePeers() { List activePeers = new ArrayList<>(); for (PeerImpl peer : peers.values()) { if (peer.getState() != Peer.State.NON_CONNECTED) { @@ -752,7 +753,7 @@ public static void sendToSomePeers(Block block) { request.put("requestType", "processBlock"); blocksSendingService.submit(() -> { - final JSONStreamAware jsonRequest = JSON.prepareRequest(request); + final JSONStreamAware jsonRequest = prepareRequest(request); int successful = 0; List> expectedResponses = new ArrayList<>(); @@ -789,7 +790,7 @@ public static void sendToSomePeers(Block block) { static { JSONObject request = new JSONObject(); request.put("requestType", "getUnconfirmedTransactions"); - getUnconfirmedTransactionsRequest = JSON.prepareRequest(request); + getUnconfirmedTransactionsRequest = prepareRequest(request); } private static final ExecutorService utReceivingService = Executors.newCachedThreadPool(); @@ -814,17 +815,22 @@ public synchronized static void feedingTime(Peer peer, Function> foodDispenser, BiConsumer> doneFeedingLog) { List transactionsToSend = foodDispenser.apply(peer); + if(! transactionsToSend.isEmpty()) { - logger.debug("Feeding {} {} transactions", peer.getPeerAddress(), transactionsToSend.size()); - peer.send(sendUnconfirmedTransactionsRequest(transactionsToSend)); + logger.info("Feeding {} {} transactions", peer.getPeerAddress(), transactionsToSend.size()); + JSONObject response = peer.send(sendUnconfirmedTransactionsRequest(transactionsToSend)); + + if(response != null && response.get("error") == null) { + doneFeedingLog.accept(peer, transactionsToSend); + } else { + logger.error("Error feeding {} transactions: {} error: {}", peer.getPeerAddress(), transactionsToSend.stream().map(t -> t.getId()), response); + } } else { - logger.debug("No need to feed {}", peer.getPeerAddress()); + logger.info("No need to feed {}", peer.getPeerAddress()); } beingProcessed.remove(peer); - doneFeedingLog.accept(peer, transactionsToSend); - if(processingQueue.contains(peer)) { processingQueue.remove(peer); beingProcessed.add(peer); @@ -832,8 +838,7 @@ private static void feedPeer(Peer peer, Function> foodDi } } - - private static JSONObject sendUnconfirmedTransactionsRequest(List transactions) { + private static JSONStreamAware sendUnconfirmedTransactionsRequest(List transactions) { JSONObject request = new JSONObject(); JSONArray transactionsData = new JSONArray(); @@ -844,7 +849,7 @@ private static JSONObject sendUnconfirmedTransactionsRequest(List t request.put("requestType", "processTransactions"); request.put("transactions", transactionsData); - return request; + return prepareRequest(request); } private static boolean peerEligibleForSending(Peer peer, boolean sendSameBRSclass) { diff --git a/src/brs/peer/ProcessBlock.java b/src/brs/peer/ProcessBlock.java index bf79d7d33..2d8cdf5ac 100644 --- a/src/brs/peer/ProcessBlock.java +++ b/src/brs/peer/ProcessBlock.java @@ -41,7 +41,7 @@ public JSONStreamAware processRequest(JSONObject request, Peer peer) { // when loading blockchain from scratch return NOT_ACCEPTED; } - blockchainProcessor.processPeerBlock(request); + blockchainProcessor.processPeerBlock(request, peer); return ACCEPTED; } catch (BurstException|RuntimeException e) { diff --git a/src/brs/unconfirmedtransactions/ReservedBalanceCache.java b/src/brs/unconfirmedtransactions/ReservedBalanceCache.java index 67532182d..5ddd326d8 100644 --- a/src/brs/unconfirmedtransactions/ReservedBalanceCache.java +++ b/src/brs/unconfirmedtransactions/ReservedBalanceCache.java @@ -39,11 +39,11 @@ void reserveBalanceAndPut(Transaction transaction) throws BurstException.Validat ); if (senderAccount == null) { - LOGGER.debug(String.format("Transaction %d: Account %d does not exist and has no balance. Required funds: %d", transaction.getId(), transaction.getSenderId(), amountNQT)); + LOGGER.info(String.format("Transaction %d: Account %d does not exist and has no balance. Required funds: %d", transaction.getId(), transaction.getSenderId(), amountNQT)); throw new BurstException.NotCurrentlyValidException("Account unknown"); } else if ( amountNQT > senderAccount.getUnconfirmedBalanceNQT() ) { - LOGGER.debug(String.format("Transaction %d: Account %d balance too low. You have %d > %d Balance", + LOGGER.info(String.format("Transaction %d: Account %d balance too low. You have %d > %d Balance", transaction.getId(), transaction.getSenderId(), amountNQT, senderAccount.getUnconfirmedBalanceNQT() )); diff --git a/src/brs/unconfirmedtransactions/UnconfirmedTransactionStore.java b/src/brs/unconfirmedtransactions/UnconfirmedTransactionStore.java index d2f4f458d..40abe3d33 100644 --- a/src/brs/unconfirmedtransactions/UnconfirmedTransactionStore.java +++ b/src/brs/unconfirmedtransactions/UnconfirmedTransactionStore.java @@ -18,20 +18,15 @@ public interface UnconfirmedTransactionStore { List getAllFor(Peer peer); - ///TimedUnconfirmedTransactionOverview getAllSince(long timestampInMillis, long maxAmount); - - void forEach(Consumer consumer); - void remove(Transaction transaction); void clear(); - /** * Review which transactions are still eligible to stay * @return The list of removed transactions */ - List resetAccountBalances(); + void resetAccountBalances(); void markFingerPrintsOf(Peer peer, List transactions); diff --git a/src/brs/unconfirmedtransactions/UnconfirmedTransactionStoreImpl.java b/src/brs/unconfirmedtransactions/UnconfirmedTransactionStoreImpl.java index 81ef9a96b..d26209d5f 100644 --- a/src/brs/unconfirmedtransactions/UnconfirmedTransactionStoreImpl.java +++ b/src/brs/unconfirmedtransactions/UnconfirmedTransactionStoreImpl.java @@ -14,14 +14,13 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -77,70 +76,48 @@ public UnconfirmedTransactionStoreImpl(TimeService timeService, PropertyService @Override public boolean put(Transaction transaction, Peer peer) throws ValidationException { synchronized (internalStore) { - if(transactionIsCurrentlyInCache(transaction)) { - if(peer != null) { - logger.debug("Transaction {}: Added fingerprint of {}", transaction.getId(), peer.getPeerAddress()); + if (transactionIsCurrentlyInCache(transaction)) { + if (peer != null) { + logger.info("Transaction {}: Added fingerprint of {}", transaction.getId(), peer.getPeerAddress()); fingerPrintsOverview.get(transaction).add(peer); } - } else { - if (transactionCanBeAddedToCache(transaction)) { - final TransactionDuplicationResult duplicationInformation = transactionDuplicatesChecker.removeCheaperDuplicate(transaction); + } else if (transactionCanBeAddedToCache(transaction)) { + this.reservedBalanceCache.reserveBalanceAndPut(transaction); - if (duplicationInformation.isDuplicate()) { - final Transaction duplicatedTransaction = duplicationInformation.getTransaction(); + final TransactionDuplicationResult duplicationInformation = transactionDuplicatesChecker.removeCheaperDuplicate(transaction); - if (duplicatedTransaction != null && duplicatedTransaction != transaction) { - logger.debug("Transaction {}: Adding more expensive duplicate transaction", transaction.getId()); - removeTransaction(duplicationInformation.getTransaction()); + if (duplicationInformation.isDuplicate()) { + final Transaction duplicatedTransaction = duplicationInformation.getTransaction(); - addTransaction(transaction, peer); + if (duplicatedTransaction != null && duplicatedTransaction != transaction) { + logger.info("Transaction {}: Adding more expensive duplicate transaction", transaction.getId()); + removeTransaction(duplicationInformation.getTransaction()); + this.reservedBalanceCache.refundBalance(duplicationInformation.getTransaction()); - if (totalSize > maxSize) { - removeCheapestFirstToExpireTransaction(); - } - } else { - logger.debug("Transaction {}: Will not add a cheaper duplicate UT", transaction.getId()); - } - } else { addTransaction(transaction, peer); - logger.debug( - "Cache size: " + totalSize + "/" + maxSize + " Added UT " + transaction.getId() + " " + transaction.getSenderId() + " " + transaction.getAmountNQT() + " " + transaction.getFeeNQT()); + if (totalSize > maxSize) { removeCheapestFirstToExpireTransaction(); } + } else { + logger.info("Transaction {}: Will not add a cheaper duplicate UT", transaction.getId()); } - - return true; } else { - logger.info("Transaction {}: Will not add UT due to duplication, or too full", transaction.getId()); + addTransaction(transaction, peer); + logger.info("Cache size: {}/{} added {} from sender {}", totalSize, maxSize, transaction.getId(), transaction.getSenderId()); } + + if (totalSize > maxSize) { + removeCheapestFirstToExpireTransaction(); + } + + return true; } return false; } } - private boolean transactionCanBeAddedToCache(Transaction transaction) { - return transactionIsCurrentlyValid(transaction) - && ! cacheFullAndTransactionCheaperThanAllTheRest(transaction) - && ! tooManyTransactionsWithReferencedFullHash(transaction) - && ! tooManyTransactionsForSlotSize(transaction); - } - - private boolean tooManyTransactionsForSlotSize(Transaction transaction) { - final long slotHeight = this.amountSlotForTransaction(transaction); - - return this.internalStore.containsKey(slotHeight) && this.internalStore.get(slotHeight).size() == slotHeight * 360; - } - - private boolean tooManyTransactionsWithReferencedFullHash(Transaction transaction) { - return ! StringUtils.isEmpty(transaction.getReferencedTransactionFullHash()) && maxPercentageUnconfirmedTransactionsFullHash <= (((numberUnconfirmedTransactionsFullHash + 1) * 100) / maxSize); - } - - private boolean cacheFullAndTransactionCheaperThanAllTheRest(Transaction transaction) { - return totalSize == maxSize && internalStore.firstKey() > amountSlotForTransaction(transaction); - } - @Override public Transaction get(Long transactionId) { synchronized (internalStore) { @@ -180,7 +157,7 @@ public List getAll() { public List getAllFor(Peer peer) { synchronized (internalStore) { final List untouchedTransactions = fingerPrintsOverview.entrySet().stream() - .filter(e -> ! e.getValue().contains(peer)) + .filter(e -> !e.getValue().contains(peer)) .map(e -> e.getKey()).collect(Collectors.toList()); final ArrayList resultList = new ArrayList<>(); @@ -201,15 +178,6 @@ public List getAllFor(Peer peer) { } } - @Override - public void forEach(Consumer consumer) { - synchronized (internalStore) { - for (List amountSlot : internalStore.values()) { - amountSlot.stream().forEach(consumer); - } - } - } - @Override public void remove(Transaction transaction) { synchronized (internalStore) { @@ -223,7 +191,7 @@ public void remove(Transaction transaction) { @Override public void clear() { synchronized (internalStore) { - logger.debug("Clearing UTStore"); + logger.info("Clearing UTStore"); totalSize = 0; internalStore.clear(); reservedBalanceCache.clear(); @@ -232,9 +200,11 @@ public void clear() { } @Override - public List resetAccountBalances() { + public void resetAccountBalances() { synchronized (internalStore) { - return reservedBalanceCache.rebuild(getAll()); + for(Transaction insufficientFundsTransactions: reservedBalanceCache.rebuild(getAll())) { + this.removeTransaction(insufficientFundsTransactions); + } } } @@ -242,7 +212,7 @@ public List resetAccountBalances() { public void markFingerPrintsOf(Peer peer, List transactions) { synchronized (internalStore) { for (Transaction transaction : transactions) { - if(fingerPrintsOverview.containsKey(transaction)) { + if (fingerPrintsOverview.containsKey(transaction)) { fingerPrintsOverview.get(transaction).add(peer); } } @@ -252,8 +222,8 @@ public void markFingerPrintsOf(Peer peer, List transactions) { @Override public void removeForgedTransactions(List transactions) { synchronized (internalStore) { - for(Transaction t:transactions) { - if(exists(t.getId())) { + for (Transaction t : transactions) { + if (exists(t.getId())) { removeTransaction(t); } } @@ -265,22 +235,65 @@ private boolean transactionIsCurrentlyInCache(Transaction transaction) { return amountSlot != null && amountSlot.stream().anyMatch(t -> t.getId() == transaction.getId()); } - private void addTransaction(Transaction transaction, Peer peer) throws ValidationException { - this.reservedBalanceCache.reserveBalanceAndPut(transaction); + private boolean transactionCanBeAddedToCache(Transaction transaction) { + return transactionIsCurrentlyNotExpired(transaction) + && !cacheFullAndTransactionCheaperThanAllTheRest(transaction) + && !tooManyTransactionsWithReferencedFullHash(transaction) + && !tooManyTransactionsForSlotSize(transaction); + } + + private boolean tooManyTransactionsForSlotSize(Transaction transaction) { + final long slotHeight = this.amountSlotForTransaction(transaction); + + if (this.internalStore.containsKey(slotHeight) && this.internalStore.get(slotHeight).size() == slotHeight * 360) { + logger.info("Transaction {}: Not added because slot {} is full", transaction.getId(), slotHeight); + return true; + } + + return false; + } + + private boolean tooManyTransactionsWithReferencedFullHash(Transaction transaction) { + if (!StringUtils.isEmpty(transaction.getReferencedTransactionFullHash()) && maxPercentageUnconfirmedTransactionsFullHash <= (((numberUnconfirmedTransactionsFullHash + 1) * 100) / maxSize)) { + logger.info("Transaction {}: Not added because too many transactions with referenced full hash", transaction.getId()); + return true; + } + + return false; + } + + private boolean cacheFullAndTransactionCheaperThanAllTheRest(Transaction transaction) { + if (totalSize == maxSize && internalStore.firstKey() > amountSlotForTransaction(transaction)) { + logger.info("Transaction {}: Not added because cache is full and transaction is cheaper than all the rest", transaction.getId()); + return true; + } + return false; + } + + private boolean transactionIsCurrentlyNotExpired(Transaction transaction) { + if (timeService.getEpochTime() < transaction.getExpiration()) { + return true; + } else { + logger.info("Transaction {} past expiration: {}", transaction.getId(), transaction.getExpiration()); + return false; + } + } + + private void addTransaction(Transaction transaction, Peer peer) throws ValidationException { final List slot = getOrCreateAmountSlotForTransaction(transaction); slot.add(transaction); totalSize++; fingerPrintsOverview.put(transaction, new HashSet<>()); - if(peer != null) { + if (peer != null) { fingerPrintsOverview.get(transaction).add(peer); } logger.debug("Adding Transaction {} from Peer {}", transaction.getId(), (peer == null ? "Ourself" : peer.getPeerAddress())); - if(! StringUtils.isEmpty(transaction.getReferencedTransactionFullHash())) { + if (!StringUtils.isEmpty(transaction.getReferencedTransactionFullHash())) { numberUnconfirmedTransactionsFullHash++; } } @@ -301,18 +314,13 @@ private long amountSlotForTransaction(Transaction transaction) { } private void removeCheapestFirstToExpireTransaction() { - this.internalStore.get(this.internalStore.firstKey()).stream() - //.map(UnconfirmedTransactionTiming::getTransaction) + final Optional cheapestFirstToExpireTransaction = this.internalStore.get(this.internalStore.firstKey()).stream() .sorted(Comparator.comparingLong(Transaction::getFeeNQT).thenComparing(Transaction::getExpiration).thenComparing(Transaction::getId)) - .findFirst().ifPresent(t -> removeTransaction(t)); - } + .findFirst(); - private boolean transactionIsCurrentlyValid(Transaction transaction) { - if(timeService.getEpochTime() < transaction.getExpiration()) { - return true; - } else { - logger.debug("Transaction {} past expiration: {}", transaction.getId(), transaction.getExpiration()); - return false; + if (cheapestFirstToExpireTransaction.isPresent()) { + reservedBalanceCache.refundBalance(cheapestFirstToExpireTransaction.get()); + removeTransaction(cheapestFirstToExpireTransaction.get()); } } @@ -321,23 +329,13 @@ private void removeTransaction(Transaction transaction) { final List amountSlot = internalStore.get(amountSlotNumber); - final Iterator transactionSlotIterator = amountSlot.iterator(); - fingerPrintsOverview.remove(transaction); + amountSlot.remove(transaction); + totalSize--; + transactionDuplicatesChecker.removeTransaction(transaction); - while (transactionSlotIterator.hasNext()) { - final Transaction utt = transactionSlotIterator.next(); - if (utt.getId() == transaction.getId()) { - transactionSlotIterator.remove(); - transactionDuplicatesChecker.removeTransaction(transaction); - this.reservedBalanceCache.refundBalance(transaction); - totalSize--; - - if(! StringUtils.isEmpty(transaction.getReferencedTransactionFullHash())) { - numberUnconfirmedTransactionsFullHash--; - } - break; - } + if (!StringUtils.isEmpty(transaction.getReferencedTransactionFullHash())) { + numberUnconfirmedTransactionsFullHash--; } if (amountSlot.isEmpty()) {