diff --git a/WalletWasabi.Backend/Controllers/BlockchainController.cs b/WalletWasabi.Backend/Controllers/BlockchainController.cs index f0db8879cf2..d41acaaf9d7 100644 --- a/WalletWasabi.Backend/Controllers/BlockchainController.cs +++ b/WalletWasabi.Backend/Controllers/BlockchainController.cs @@ -2,11 +2,15 @@ using Microsoft.Extensions.Caching.Memory; using NBitcoin; using NBitcoin.RPC; +using Newtonsoft.Json.Linq; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.ComponentModel.DataAnnotations; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; +using System.Net.Cache; using System.Threading; using System.Threading.Tasks; using WalletWasabi.Backend.Models; @@ -32,6 +36,7 @@ public class BlockchainController : ControllerBase { public static readonly TimeSpan FilterTimeout = TimeSpan.FromMinutes(20); private static readonly MemoryCacheEntryOptions CacheEntryOptions = new() { AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(60) }; + private static MemoryCacheEntryOptions TransactionCacheOptions { get; } = new MemoryCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(20) }; public BlockchainController(IMemoryCache memoryCache, Global global) { @@ -42,8 +47,6 @@ public BlockchainController(IMemoryCache memoryCache, Global global) private IRPCClient RpcClient => Global.RpcClient; private Network Network => Global.Config.Network; - public static Dictionary TransactionHexCache { get; } = new(); - public static object TransactionHexCacheLock { get; } = new(); public IdempotencyRequestCache Cache { get; } public Global Global { get; } @@ -142,23 +145,20 @@ private async Task> GetRawMempoolStringsNoCacheAsync(Cancell [ProducesResponseType(400)] public async Task GetTransactionsAsync([FromQuery, Required] IEnumerable transactionIds, CancellationToken cancellationToken) { - var maxTxToRequest = 10; - if (transactionIds.Count() > maxTxToRequest) + const int MaxTxToRequest = 10; + int requestCount = transactionIds.Count(); + + if (requestCount > MaxTxToRequest) { - return BadRequest($"Maximum {maxTxToRequest} transactions can be requested."); + return BadRequest($"Maximum {MaxTxToRequest} transactions can be requested."); } - var parsedIds = new List(); + uint256[] parsedTxIds; + + // Make sure TXIDs are not malformed. try { - // Remove duplicates, do not use Distinct(), order is not guaranteed. - foreach (var txid in transactionIds.Select(x => new uint256(x))) - { - if (!parsedIds.Contains(txid)) - { - parsedIds.Add(txid); - } - } + parsedTxIds = transactionIds.Select(x => new uint256(x)).ToArray(); } catch { @@ -167,48 +167,83 @@ public async Task GetTransactionsAsync([FromQuery, Required] IEnu try { - var hexes = new Dictionary(); - List missingTxs = new(); - lock (TransactionHexCacheLock) + Transaction[] txs = await FetchTransactionsAsync(parsedTxIds, cancellationToken).ConfigureAwait(false); + string[] hexes = txs.Select(x => x.ToHex()).ToArray(); + + return Ok(hexes); + } + catch (Exception ex) + { + Logger.LogDebug(ex); + return BadRequest(ex.Message); + } + } + + /// + /// Fetches transactions from cache if possible and missing transactions are fetched using RPC. + /// + private async Task FetchTransactionsAsync(uint256[] txIds, CancellationToken cancellationToken) + { + int requestCount = txIds.Length; + Dictionary> txIdsRetrieve = []; + TaskCompletionSource[] txsCompletionSources = new TaskCompletionSource[requestCount]; + + try + { + // Get task completion sources for transactions. They are either new (no one else is getting that transaction right now) or existing + // and then some other caller needs the same transaction so we can use the existing task completion source. + for (int i = 0; i < requestCount; i++) { - foreach (var txid in parsedIds) + uint256 txId = txIds[i]; + string cacheKey = $"{nameof(GetTransactionsAsync)}#{txId}"; + + if (Cache.TryAddKey(cacheKey, TransactionCacheOptions, out TaskCompletionSource tcs)) { - if (TransactionHexCache.TryGetValue(txid, out string? hex)) - { - hexes.Add(txid, hex); - } - else - { - missingTxs.Add(txid); - } + txIdsRetrieve.Add(txId, tcs); } + + txsCompletionSources[i] = tcs; } - if (missingTxs.Count != 0) + if (txIdsRetrieve.Count > 0) { - foreach (var tx in await RpcClient.GetRawTransactionsAsync(missingTxs, cancellationToken)) - { - string hex = tx.ToHex(); - hexes.Add(tx.GetHash(), hex); + // Ask to get missing transactions over RPC. + IEnumerable txs = await RpcClient.GetRawTransactionsAsync(txIdsRetrieve.Keys, cancellationToken).ConfigureAwait(false); + Dictionary rpcBatch = txs.ToDictionary(x => x.GetHash(), x => x); - lock (TransactionHexCacheLock) - { - if (TransactionHexCache.TryAdd(tx.GetHash(), hex) && TransactionHexCache.Count >= 1000) - { - TransactionHexCache.Remove(TransactionHexCache.Keys.First()); - } - } + foreach (KeyValuePair kvp in rpcBatch) + { + txIdsRetrieve[kvp.Key].TrySetResult(kvp.Value); } } - // Order hexes according to the order of the query. - var orderedResult = parsedIds.Where(x => hexes.ContainsKey(x)).Select(x => hexes[x]); - return Ok(orderedResult); + Transaction[] result = new Transaction[requestCount]; + + // Add missing transactions to the result array. + for (int i = 0; i < requestCount; i++) + { + Transaction tx = await txsCompletionSources[i].Task.ConfigureAwait(false); + result[i] = tx; + } + + return result; } - catch (Exception ex) + finally { - Logger.LogDebug(ex); - return BadRequest(ex.Message); + if (txIdsRetrieve.Count > 0) + { + // It's necessary to always set a result to the task completion sources. Otherwise, cache can get corrupted. + Exception ex = new InvalidOperationException("Failed to get the transaction."); + foreach ((uint256 txid, TaskCompletionSource tcs) in txIdsRetrieve) + { + if (!tcs.Task.IsCompleted) + { + // Prefer new cache requests to try again rather than getting the exception. The window is small though. + Cache.Remove(txid); + tcs.SetException(ex); + } + } + } } } diff --git a/WalletWasabi.Tests/UnitTests/Cache/IdempotencyRequestCacheTests.cs b/WalletWasabi.Tests/UnitTests/Cache/IdempotencyRequestCacheTests.cs index c2cff83e079..05cb7109da0 100644 --- a/WalletWasabi.Tests/UnitTests/Cache/IdempotencyRequestCacheTests.cs +++ b/WalletWasabi.Tests/UnitTests/Cache/IdempotencyRequestCacheTests.cs @@ -1,5 +1,8 @@ using Microsoft.Extensions.Caching.Memory; using NBitcoin; +using NBitcoin.RPC; +using System.Diagnostics; +using System.Net.Http; using System.Threading; using System.Threading.Tasks; using WalletWasabi.Cache; @@ -13,6 +16,9 @@ namespace WalletWasabi.Tests.UnitTests.Cache; /// public class IdempotencyRequestCacheTests { + /// A bitcoin transaction in HEX format. + private const string TransactionHex = "0200000001268171371edff285e937adeea4b37b78000c0566cbb3ad64641713ca42171bf6000000006a473044022070b2245123e6bf474d60c5b50c043d4c691a5d2435f09a34a7662a9dc251790a022001329ca9dacf280bdf30740ec0390422422c81cb45839457aeb76fc12edd95b3012102657d118d3357b8e0f4c2cd46db7b39f6d9c38d9a70abcb9b2de5dc8dbfe4ce31feffffff02d3dff505000000001976a914d0c59903c5bac2868760e90fd521a4665aa7652088ac00e1f5050000000017a9143545e6e33b832c47050f24d3eeb93c9c03948bc787b32e1300"; + /// /// Very basic test that a correct response is returned. /// @@ -33,6 +39,46 @@ public async Task BasicCacheBehaviorAsync() Assert.Same(preparedResponse, response); // Compare by reference. } + /// + /// Tests . + /// + [Fact] + public async Task TryAddAsync() + { + using CancellationTokenSource testDeadlineCts = new(TimeSpan.FromMinutes(1)); + + MemoryCacheEntryOptions memoryCacheEntryOptions = new() { AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(30) }; + using MemoryCache memoryCache = new(new MemoryCacheOptions()); + IdempotencyRequestCache cache = new(memoryCache); + + string txid = "f461a0c37b41d0fdb64dd02969330fefd6f0a6c64428788be012c989c9795e56"; + Assert.True(cache.TryAddKey(txid, memoryCacheEntryOptions, out TaskCompletionSource responseTcs)); + + Task cacheTask = cache.GetCachedResponseAsync( + request: txid, + action: (string txid, CancellationToken cancellationToken) => throw new UnreachableException(), + testDeadlineCts.Token); + + Assert.False(cacheTask.IsCompleted); + + // The order of the following two lines is important. Failing to do the operations in this order can lead to errors. + cache.Remove(txid); + responseTcs.TrySetException(new RPCException(RPCErrorCode.RPC_CLIENT_NOT_CONNECTED, "Not connected", null!)); + + // RPCException is supposed to be thrown by the task and not UnreachableException because we registered using TryAddKey first. + await Assert.ThrowsAsync(async () => await cacheTask.ConfigureAwait(false)); + + // Repeat the cache request. Now without TryAddKey. + Transaction expectedTransaction = Transaction.Parse(TransactionHex, Network.Main); + + Transaction actualTransaction = await cache.GetCachedResponseAsync( + request: txid, + action: (string txid, CancellationToken cancellationToken) => Task.FromResult(expectedTransaction), + testDeadlineCts.Token); + + Assert.Same(expectedTransaction, actualTransaction); + } + /// /// Simulates two simultaneous requests and both have the same chance of succeeding. /// One of the requests should be served from cache. diff --git a/WalletWasabi/Cache/IdempotencyRequestCache.cs b/WalletWasabi/Cache/IdempotencyRequestCache.cs index 1d557d9924f..9a9dc2381dd 100644 --- a/WalletWasabi/Cache/IdempotencyRequestCache.cs +++ b/WalletWasabi/Cache/IdempotencyRequestCache.cs @@ -1,7 +1,6 @@ using Microsoft.Extensions.Caching.Memory; using System.Threading; using System.Threading.Tasks; -using WalletWasabi.Extensions; namespace WalletWasabi.Cache; @@ -28,6 +27,29 @@ public IdempotencyRequestCache(IMemoryCache cache) /// Guarded by . private IMemoryCache ResponseCache { get; } + /// + /// Tries to add the cache key to cache to avoid other callers to add such a key in parallel. + /// + /// true if the key was added to the cache, false otherwise. + /// Caller is responsible to ALWAYS set a result to even if an exception is thrown. + public bool TryAddKey(TRequest cacheKey, MemoryCacheEntryOptions options, out TaskCompletionSource responseTcs) + where TRequest : notnull + { + lock (ResponseCacheLock) + { + if (!ResponseCache.TryGetValue(cacheKey, out TaskCompletionSource? tcs)) + { + responseTcs = new(); + ResponseCache.Set(cacheKey, responseTcs, options); + + return true; + } + + responseTcs = tcs!; + return false; + } + } + /// /// s are preferred as /// and are generated for types automatically. @@ -47,18 +69,7 @@ public Task GetCachedResponseAsync(TRequest requ public async Task GetCachedResponseAsync(TRequest request, ProcessRequestDelegateAsync action, MemoryCacheEntryOptions options, CancellationToken cancellationToken) where TRequest : notnull { - bool callAction = false; - TaskCompletionSource? responseTcs; - - lock (ResponseCacheLock) - { - if (!ResponseCache.TryGetValue(request, out responseTcs)) - { - callAction = true; - responseTcs = new(); - ResponseCache.Set(request, responseTcs, options); - } - } + bool callAction = TryAddKey(request, options, out TaskCompletionSource? responseTcs); if (callAction) { @@ -85,10 +96,11 @@ public async Task GetCachedResponseAsync(TReques } /// - /// For testing purposes only. + /// Use after if that request + /// failed with an exception. /// Note that if there is a simultaneous request for the cache key, it is not stopped and its result is discarded. /// - internal void Remove(TRequest cacheKey) + public void Remove(TRequest cacheKey) where TRequest : notnull { lock (ResponseCacheLock)