Skip to content

Commit

Permalink
Merge pull request WalletWasabi#12275 from kiminuo/feature/2024-01-18…
Browse files Browse the repository at this point in the history
…-gettxcache

Use cache in `BlockchainController.GetTransactionsAsync` (take 2)
  • Loading branch information
molnard authored Mar 18, 2024
2 parents c6b7993 + 70a9b75 commit 8fa5f97
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 59 deletions.
123 changes: 79 additions & 44 deletions WalletWasabi.Backend/Controllers/BlockchainController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
using Microsoft.Extensions.Caching.Memory;
using NBitcoin;
using NBitcoin.RPC;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net.Cache;
using System.Threading;
using System.Threading.Tasks;
using WalletWasabi.Backend.Models;
Expand All @@ -32,6 +36,7 @@ public class BlockchainController : ControllerBase
{
public static readonly TimeSpan FilterTimeout = TimeSpan.FromMinutes(20);
private static readonly MemoryCacheEntryOptions CacheEntryOptions = new() { AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(60) };
private static MemoryCacheEntryOptions TransactionCacheOptions { get; } = new MemoryCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(20) };

public BlockchainController(IMemoryCache memoryCache, Global global)
{
Expand All @@ -42,8 +47,6 @@ public BlockchainController(IMemoryCache memoryCache, Global global)
private IRPCClient RpcClient => Global.RpcClient;
private Network Network => Global.Config.Network;

public static Dictionary<uint256, string> TransactionHexCache { get; } = new();
public static object TransactionHexCacheLock { get; } = new();
public IdempotencyRequestCache Cache { get; }

public Global Global { get; }
Expand Down Expand Up @@ -142,23 +145,20 @@ private async Task<IEnumerable<string>> GetRawMempoolStringsNoCacheAsync(Cancell
[ProducesResponseType(400)]
public async Task<IActionResult> GetTransactionsAsync([FromQuery, Required] IEnumerable<string> transactionIds, CancellationToken cancellationToken)
{
var maxTxToRequest = 10;
if (transactionIds.Count() > maxTxToRequest)
const int MaxTxToRequest = 10;
int requestCount = transactionIds.Count();

if (requestCount > MaxTxToRequest)
{
return BadRequest($"Maximum {maxTxToRequest} transactions can be requested.");
return BadRequest($"Maximum {MaxTxToRequest} transactions can be requested.");
}

var parsedIds = new List<uint256>();
uint256[] parsedTxIds;

// Make sure TXIDs are not malformed.
try
{
// Remove duplicates, do not use Distinct(), order is not guaranteed.
foreach (var txid in transactionIds.Select(x => new uint256(x)))
{
if (!parsedIds.Contains(txid))
{
parsedIds.Add(txid);
}
}
parsedTxIds = transactionIds.Select(x => new uint256(x)).ToArray();
}
catch
{
Expand All @@ -167,48 +167,83 @@ public async Task<IActionResult> GetTransactionsAsync([FromQuery, Required] IEnu

try
{
var hexes = new Dictionary<uint256, string>();
List<uint256> missingTxs = new();
lock (TransactionHexCacheLock)
Transaction[] txs = await FetchTransactionsAsync(parsedTxIds, cancellationToken).ConfigureAwait(false);
string[] hexes = txs.Select(x => x.ToHex()).ToArray();

return Ok(hexes);
}
catch (Exception ex)
{
Logger.LogDebug(ex);
return BadRequest(ex.Message);
}
}

/// <summary>
/// Fetches transactions from cache if possible and missing transactions are fetched using RPC.
/// </summary>
private async Task<Transaction[]> FetchTransactionsAsync(uint256[] txIds, CancellationToken cancellationToken)
{
int requestCount = txIds.Length;
Dictionary<uint256, TaskCompletionSource<Transaction>> txIdsRetrieve = [];
TaskCompletionSource<Transaction>[] txsCompletionSources = new TaskCompletionSource<Transaction>[requestCount];

try
{
// Get task completion sources for transactions. They are either new (no one else is getting that transaction right now) or existing
// and then some other caller needs the same transaction so we can use the existing task completion source.
for (int i = 0; i < requestCount; i++)
{
foreach (var txid in parsedIds)
uint256 txId = txIds[i];
string cacheKey = $"{nameof(GetTransactionsAsync)}#{txId}";

if (Cache.TryAddKey(cacheKey, TransactionCacheOptions, out TaskCompletionSource<Transaction> tcs))
{
if (TransactionHexCache.TryGetValue(txid, out string? hex))
{
hexes.Add(txid, hex);
}
else
{
missingTxs.Add(txid);
}
txIdsRetrieve.Add(txId, tcs);
}

txsCompletionSources[i] = tcs;
}

if (missingTxs.Count != 0)
if (txIdsRetrieve.Count > 0)
{
foreach (var tx in await RpcClient.GetRawTransactionsAsync(missingTxs, cancellationToken))
{
string hex = tx.ToHex();
hexes.Add(tx.GetHash(), hex);
// Ask to get missing transactions over RPC.
IEnumerable<Transaction> txs = await RpcClient.GetRawTransactionsAsync(txIdsRetrieve.Keys, cancellationToken).ConfigureAwait(false);
Dictionary<uint256, Transaction> rpcBatch = txs.ToDictionary(x => x.GetHash(), x => x);

lock (TransactionHexCacheLock)
{
if (TransactionHexCache.TryAdd(tx.GetHash(), hex) && TransactionHexCache.Count >= 1000)
{
TransactionHexCache.Remove(TransactionHexCache.Keys.First());
}
}
foreach (KeyValuePair<uint256, Transaction> kvp in rpcBatch)
{
txIdsRetrieve[kvp.Key].TrySetResult(kvp.Value);
}
}

// Order hexes according to the order of the query.
var orderedResult = parsedIds.Where(x => hexes.ContainsKey(x)).Select(x => hexes[x]);
return Ok(orderedResult);
Transaction[] result = new Transaction[requestCount];

// Add missing transactions to the result array.
for (int i = 0; i < requestCount; i++)
{
Transaction tx = await txsCompletionSources[i].Task.ConfigureAwait(false);
result[i] = tx;
}

return result;
}
catch (Exception ex)
finally
{
Logger.LogDebug(ex);
return BadRequest(ex.Message);
if (txIdsRetrieve.Count > 0)
{
// It's necessary to always set a result to the task completion sources. Otherwise, cache can get corrupted.
Exception ex = new InvalidOperationException("Failed to get the transaction.");
foreach ((uint256 txid, TaskCompletionSource<Transaction> tcs) in txIdsRetrieve)
{
if (!tcs.Task.IsCompleted)
{
// Prefer new cache requests to try again rather than getting the exception. The window is small though.
Cache.Remove(txid);
tcs.SetException(ex);
}
}
}
}
}

Expand Down
46 changes: 46 additions & 0 deletions WalletWasabi.Tests/UnitTests/Cache/IdempotencyRequestCacheTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using Microsoft.Extensions.Caching.Memory;
using NBitcoin;
using NBitcoin.RPC;
using System.Diagnostics;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using WalletWasabi.Cache;
Expand All @@ -13,6 +16,9 @@ namespace WalletWasabi.Tests.UnitTests.Cache;
/// </summary>
public class IdempotencyRequestCacheTests
{
/// <summary>A bitcoin transaction in HEX format.</summary>
private const string TransactionHex = "0200000001268171371edff285e937adeea4b37b78000c0566cbb3ad64641713ca42171bf6000000006a473044022070b2245123e6bf474d60c5b50c043d4c691a5d2435f09a34a7662a9dc251790a022001329ca9dacf280bdf30740ec0390422422c81cb45839457aeb76fc12edd95b3012102657d118d3357b8e0f4c2cd46db7b39f6d9c38d9a70abcb9b2de5dc8dbfe4ce31feffffff02d3dff505000000001976a914d0c59903c5bac2868760e90fd521a4665aa7652088ac00e1f5050000000017a9143545e6e33b832c47050f24d3eeb93c9c03948bc787b32e1300";

/// <summary>
/// Very basic test that a correct response is returned.
/// </summary>
Expand All @@ -33,6 +39,46 @@ public async Task BasicCacheBehaviorAsync()
Assert.Same(preparedResponse, response); // Compare by reference.
}

/// <summary>
/// Tests <see cref="IdempotencyRequestCache.TryAddKey{TRequest, TResponse}(TRequest, MemoryCacheEntryOptions, out TaskCompletionSource{TResponse})"/>.
/// </summary>
[Fact]
public async Task TryAddAsync()
{
using CancellationTokenSource testDeadlineCts = new(TimeSpan.FromMinutes(1));

MemoryCacheEntryOptions memoryCacheEntryOptions = new() { AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(30) };
using MemoryCache memoryCache = new(new MemoryCacheOptions());
IdempotencyRequestCache cache = new(memoryCache);

string txid = "f461a0c37b41d0fdb64dd02969330fefd6f0a6c64428788be012c989c9795e56";
Assert.True(cache.TryAddKey(txid, memoryCacheEntryOptions, out TaskCompletionSource<Transaction> responseTcs));

Task<Transaction> cacheTask = cache.GetCachedResponseAsync<string, Transaction>(
request: txid,
action: (string txid, CancellationToken cancellationToken) => throw new UnreachableException(),
testDeadlineCts.Token);

Assert.False(cacheTask.IsCompleted);

// The order of the following two lines is important. Failing to do the operations in this order can lead to errors.
cache.Remove(txid);
responseTcs.TrySetException(new RPCException(RPCErrorCode.RPC_CLIENT_NOT_CONNECTED, "Not connected", null!));

// RPCException is supposed to be thrown by the task and not UnreachableException because we registered using TryAddKey first.
await Assert.ThrowsAsync<RPCException>(async () => await cacheTask.ConfigureAwait(false));

// Repeat the cache request. Now without TryAddKey.
Transaction expectedTransaction = Transaction.Parse(TransactionHex, Network.Main);

Transaction actualTransaction = await cache.GetCachedResponseAsync(
request: txid,
action: (string txid, CancellationToken cancellationToken) => Task.FromResult(expectedTransaction),
testDeadlineCts.Token);

Assert.Same(expectedTransaction, actualTransaction);
}

/// <summary>
/// Simulates two simultaneous requests and both have the same chance of succeeding.
/// One of the requests should be served from cache.
Expand Down
42 changes: 27 additions & 15 deletions WalletWasabi/Cache/IdempotencyRequestCache.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Microsoft.Extensions.Caching.Memory;
using System.Threading;
using System.Threading.Tasks;
using WalletWasabi.Extensions;

namespace WalletWasabi.Cache;

Expand All @@ -28,6 +27,29 @@ public IdempotencyRequestCache(IMemoryCache cache)
/// <remarks>Guarded by <see cref="ResponseCacheLock"/>.</remarks>
private IMemoryCache ResponseCache { get; }

/// <summary>
/// Tries to add the cache key to cache to avoid other callers to add such a key in parallel.
/// </summary>
/// <returns><c>true</c> if the key was added to the cache, <c>false</c> otherwise.</returns>
/// <remarks>Caller is responsible to ALWAYS set a result to <paramref name="responseTcs"/> even if an exception is thrown.</remarks>
public bool TryAddKey<TRequest, TResponse>(TRequest cacheKey, MemoryCacheEntryOptions options, out TaskCompletionSource<TResponse> responseTcs)
where TRequest : notnull
{
lock (ResponseCacheLock)
{
if (!ResponseCache.TryGetValue(cacheKey, out TaskCompletionSource<TResponse>? tcs))
{
responseTcs = new();
ResponseCache.Set(cacheKey, responseTcs, options);

return true;
}

responseTcs = tcs!;
return false;
}
}

/// <typeparam name="TRequest">
/// <see langword="record"/>s are preferred as <see cref="object.GetHashCode"/>
/// and <see cref="object.Equals(object?)"/> are generated for <see langword="record"/> types automatically.
Expand All @@ -47,18 +69,7 @@ public Task<TResponse> GetCachedResponseAsync<TRequest, TResponse>(TRequest requ
public async Task<TResponse> GetCachedResponseAsync<TRequest, TResponse>(TRequest request, ProcessRequestDelegateAsync<TRequest, TResponse> action, MemoryCacheEntryOptions options, CancellationToken cancellationToken)
where TRequest : notnull
{
bool callAction = false;
TaskCompletionSource<TResponse>? responseTcs;

lock (ResponseCacheLock)
{
if (!ResponseCache.TryGetValue(request, out responseTcs))
{
callAction = true;
responseTcs = new();
ResponseCache.Set(request, responseTcs, options);
}
}
bool callAction = TryAddKey(request, options, out TaskCompletionSource<TResponse>? responseTcs);

if (callAction)
{
Expand All @@ -85,10 +96,11 @@ public async Task<TResponse> GetCachedResponseAsync<TRequest, TResponse>(TReques
}

/// <remarks>
/// For testing purposes only.
/// Use after <see cref="TryAddKey{TRequest, TResponse}(TRequest, MemoryCacheEntryOptions, out TaskCompletionSource{TResponse})"/> if that request
/// failed with an exception.
/// <para>Note that if there is a simultaneous request for the cache key, it is not stopped and its result is discarded.</para>
/// </remarks>
internal void Remove<TRequest>(TRequest cacheKey)
public void Remove<TRequest>(TRequest cacheKey)
where TRequest : notnull
{
lock (ResponseCacheLock)
Expand Down

0 comments on commit 8fa5f97

Please sign in to comment.