Skip to content

Commit

Permalink
Use sqlite for backend filters (WalletWasabi#13393)
Browse files Browse the repository at this point in the history
  • Loading branch information
turbolay authored Sep 14, 2024
1 parent 5e82434 commit 1579876
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 97 deletions.
2 changes: 1 addition & 1 deletion WalletWasabi.Backend/Global.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public Global(string dataDir, IRPCClient rpcClient, Config config)

// Initialize index building
var indexBuilderServiceDir = Path.Combine(DataDir, "IndexBuilderService");
var indexFilePath = Path.Combine(indexBuilderServiceDir, $"Index{RpcClient.Network}.dat");
var indexFilePath = Path.Combine(indexBuilderServiceDir, $"Index{RpcClient.Network}.sqlite");
IndexBuilderService = new(RpcClient, HostedServices.Get<BlockNotifier>(), indexFilePath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ public async Task SegwitTaprootUnsynchronizedBitcoinNodeAsync()
}),
};
using var blockNotifier = new BlockNotifier(rpc);
var indexer = new IndexBuilderService(rpc, blockNotifier, "filters.txt");
var indexer = new IndexBuilderService(rpc, blockNotifier, "filters.sqlite");

indexer.Synchronize();

await Task.Delay(TimeSpan.FromSeconds(1));
//// Assert.False(indexer.IsRunning); // <------------ ERROR: it should have stopped but there is a bug for RegTest
Assert.Throws<ArgumentOutOfRangeException>(() => indexer.GetLastFilter()); // There are no filters
// There is only starting filter
Assert.True(indexer.GetLastFilter()?.Header.BlockHash.Equals(StartingFilters.GetStartingFilter(rpc.Network).Header.BlockHash));
}

[Fact]
Expand All @@ -55,13 +56,14 @@ public async Task SegwitTaprootStalledBitcoinNodeAsync()
}
};
using var blockNotifier = new BlockNotifier(rpc);
var indexer = new IndexBuilderService(rpc, blockNotifier, "filters.txt");
var indexer = new IndexBuilderService(rpc, blockNotifier, "filters.sqlite");

indexer.Synchronize();

await Task.Delay(TimeSpan.FromSeconds(2));
Assert.True(indexer.IsRunning); // It is still working
Assert.Throws<ArgumentOutOfRangeException>(() => indexer.GetLastFilter()); // There are no filters
// There is only starting filter
Assert.True(indexer.GetLastFilter()?.Header.BlockHash.Equals(StartingFilters.GetStartingFilter(rpc.Network).Header.BlockHash));
Assert.True(called > 1);
}

Expand Down Expand Up @@ -94,7 +96,7 @@ public async Task SegwitTaprootSynchronizingBitcoinNodeAsync()
Assert.True(indexer.IsRunning); // It is still working

var lastFilter = indexer.GetLastFilter();
Assert.Equal(9, (int)lastFilter.Header.Height);
Assert.Equal(9, (int)lastFilter!.Header.Height);
Assert.True(called > 1);
}

Expand Down Expand Up @@ -182,13 +184,14 @@ public async Task TaprootUnsynchronizedBitcoinNodeAsync()
}),
};
using var blockNotifier = new BlockNotifier(rpc);
var indexer = new IndexBuilderService(rpc, blockNotifier, "filters.txt");
var indexer = new IndexBuilderService(rpc, blockNotifier, "filters.sqlite");

indexer.Synchronize();

await Task.Delay(TimeSpan.FromSeconds(1));
//// Assert.False(indexer.IsRunning); // <------------ ERROR: it should have stopped but there is a bug for RegTest
Assert.Throws<ArgumentOutOfRangeException>(() => indexer.GetLastFilter()); // There are no filters
// There is only starting filter
Assert.True(indexer.GetLastFilter()?.Header.BlockHash.Equals(StartingFilters.GetStartingFilter(rpc.Network).Header.BlockHash));
}

[Fact]
Expand All @@ -209,13 +212,14 @@ public async Task TaprootStalledBitcoinNodeAsync()
}
};
using var blockNotifier = new BlockNotifier(rpc);
var indexer = new IndexBuilderService(rpc, blockNotifier, "filters.txt");
var indexer = new IndexBuilderService(rpc, blockNotifier, "filters.sqlite");

indexer.Synchronize();

await Task.Delay(TimeSpan.FromSeconds(2));
Assert.True(indexer.IsRunning); // It is still working
Assert.Throws<ArgumentOutOfRangeException>(() => indexer.GetLastFilter()); // There are no filters
// There is only starting filter
Assert.True(indexer.GetLastFilter()?.Header.BlockHash.Equals(StartingFilters.GetStartingFilter(rpc.Network).Header.BlockHash));
Assert.True(called > 1);
}

Expand Down Expand Up @@ -248,7 +252,7 @@ public async Task TaprootSynchronizingBitcoinNodeAsync()
Assert.True(indexer.IsRunning); // It is still working

var lastFilter = indexer.GetLastFilter();
Assert.Equal(9, (int)lastFilter.Header.Height);
Assert.Equal(9, (int)lastFilter!.Header.Height);
Assert.True(called > 1);
}

Expand Down
128 changes: 50 additions & 78 deletions WalletWasabi/Blockchain/BlockFilters/IndexBuilderService.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
using NBitcoin;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.Sqlite;
using WalletWasabi.Backend.Models;
using WalletWasabi.BitcoinCore.Rpc;
using WalletWasabi.BitcoinCore.Rpc.Models;
using WalletWasabi.Blockchain.Blocks;
using WalletWasabi.Helpers;
using WalletWasabi.Logging;
using WalletWasabi.Models;
using WalletWasabi.Stores;

namespace WalletWasabi.Blockchain.BlockFilters;

Expand Down Expand Up @@ -41,31 +42,12 @@ public IndexBuilderService(IRPCClient rpc, BlockNotifier blockNotifier, string i

IoHelpers.EnsureContainingDirectoryExists(IndexFilePath);

// Testing permissions.
using (var _ = File.Open(IndexFilePath, FileMode.OpenOrCreate, FileAccess.ReadWrite))
if (RpcClient.Network == Network.RegTest && File.Exists(IndexFilePath))
{
File.Delete(IndexFilePath); // RegTest is not a global ledger, better to delete it.
}

if (File.Exists(IndexFilePath))
{
if (RpcClient.Network == Network.RegTest)
{
File.Delete(IndexFilePath); // RegTest is not a global ledger, better to delete it.
}
else
{
ImmutableList<FilterModel>.Builder builder = ImmutableList.CreateBuilder<FilterModel>();

foreach (var line in File.ReadAllLines(IndexFilePath))
{
var filter = FilterModel.FromLine(line);
builder.Add(filter);
}

Index = builder.ToImmutableList();
}
}

IndexStorage = CreateBlockFilterSqliteStorage();
BlockNotifier.OnBlock += BlockNotifier_OnBlock;
}

Expand All @@ -74,7 +56,7 @@ public IndexBuilderService(IRPCClient rpc, BlockNotifier blockNotifier, string i
private IRPCClient RpcClient { get; }
private BlockNotifier BlockNotifier { get; }
private string IndexFilePath { get; }
private ImmutableList<FilterModel> Index { get; set; } = ImmutableList<FilterModel>.Empty;
private BlockFilterSqliteStorage IndexStorage { get; set; }

/// <remarks>Guards <see cref="Index"/>.</remarks>
private object IndexLock { get; } = new();
Expand All @@ -85,6 +67,21 @@ public IndexBuilderService(IRPCClient rpc, BlockNotifier blockNotifier, string i

private RpcPubkeyType[] PubKeyTypes { get; } = [RpcPubkeyType.TxWitnessV0Keyhash, RpcPubkeyType.TxWitnessV1Taproot];

private BlockFilterSqliteStorage CreateBlockFilterSqliteStorage()
{
try
{
return BlockFilterSqliteStorage.FromFile(dataSource: IndexFilePath, startingFilter: StartingFilters.GetStartingFilter(RpcClient.Network));
}
catch (SqliteException ex) when (ex.SqliteExtendedErrorCode == 11) // 11 ~ SQLITE_CORRUPT error code
{
Logger.LogError($"Failed to open SQLite storage file because it's corrupted. Deleting the storage file '{IndexFilePath}'.");

File.Delete(IndexFilePath);
throw;
}
}

public static GolombRiceFilter CreateDummyEmptyFilter(uint256 blockHash)
{
return new GolombRiceFilterBuilder()
Expand Down Expand Up @@ -127,14 +124,11 @@ public void Synchronize()
{
SyncInfo syncInfo = await GetSyncInfoAsync().ConfigureAwait(false);

FilterModel? lastIndexFilter = null;
FilterModel? lastIndexFilter;

lock (IndexLock)
{
if (Index.Count != 0)
{
lastIndexFilter = Index[^1];
}
lastIndexFilter = GetLastFilter();
}

uint currentHeight;
Expand Down Expand Up @@ -190,7 +184,7 @@ public void Synchronize()
{
Logger.LogWarning("Reorg observed on the network.");

await ReorgOneAsync().ConfigureAwait(false);
ReorgOne();

// Skip the current block.
continue;
Expand All @@ -201,11 +195,9 @@ public void Synchronize()
var smartHeader = new SmartHeader(block.Hash, block.PrevBlockHash, nextHeight, block.BlockTime);
var filterModel = new FilterModel(smartHeader, filter);

await File.AppendAllLinesAsync(IndexFilePath, new[] { filterModel.ToLine() }).ConfigureAwait(false);

lock (IndexLock)
{
Index = Index.Add(filterModel);
IndexStorage.TryAppend(filterModel);
}

// If not close to the tip, just log debug.
Expand Down Expand Up @@ -289,22 +281,15 @@ private static List<Script> FetchScripts(VerboseBlockInfo block, RpcPubkeyType[]
return scripts;
}

private async Task ReorgOneAsync()
private void ReorgOne()
{
// 1. Rollback index.
uint256 blockHash;

lock (IndexLock)
{
blockHash = Index[^1].Header.BlockHash;
Index = Index.RemoveAt(Index.Count - 1);
if(IndexStorage.TryRemoveLast(out var removedFilter))
{
Logger.LogInfo($"REORG invalid block: {removedFilter.Header.BlockHash}");
}
}

Logger.LogInfo($"REORG invalid block: {blockHash}");

// 2. Serialize Index. (Remove last line.)
var lines = await File.ReadAllLinesAsync(IndexFilePath).ConfigureAwait(false);
await File.WriteAllLinesAsync(IndexFilePath, lines.Take(lines.Length - 1).ToArray()).ConfigureAwait(false);
}

private async Task<SyncInfo> GetSyncInfoAsync()
Expand All @@ -329,50 +314,37 @@ private void BlockNotifier_OnBlock(object? sender, Block e)

public (Height bestHeight, IEnumerable<FilterModel> filters) GetFilterLinesExcluding(uint256 bestKnownBlockHash, int count, out bool found)
{
found = false; // Only build the filter list from when the known hash is found.
var filters = new List<FilterModel>();

ImmutableList<FilterModel> currentIndex;

lock (IndexLock)
{
currentIndex = Index;
}

if (currentIndex.Count == 0)
{
return (Height.Unknown, []);
}

// Search for bestKnownBlockHash from last to first
var i = currentIndex.Count - 1;
for (; i >= 0; i--)
{
if (!currentIndex[i].Header.BlockHash.Equals(bestKnownBlockHash))
var filterModels = IndexStorage.FetchNewerThanBlockHash(bestKnownBlockHash, count).ToList();
uint bestHeight;
if (filterModels.Count > 0)
{
continue;
bestHeight = (uint)IndexStorage.GetBestHeight();
found = true;
}
else
{
var lastFilter = GetLastFilter();
if (lastFilter is null)
{
found = false;
return (new Height(HeightType.Unknown), []);
}

found = true;
break;
}

if (found && i < currentIndex.Count - 1)
{
// Populate filters starting from the found index + 1
var startIndex = i + 1;
var filtersCount = Math.Min(count, currentIndex.Count - startIndex);
filters.AddRange(currentIndex.GetRange(startIndex, filtersCount));
found = lastFilter.Header.BlockHash == bestKnownBlockHash;
bestHeight = lastFilter.Header.Height;
}
return (new Height(bestHeight), filterModels);
}

return (new Height(currentIndex[^1].Header.Height), filters);
}

public FilterModel GetLastFilter()
public FilterModel? GetLastFilter()
{
lock (IndexLock)
{
return Index[^1];
var lastFilterList = IndexStorage.FetchLast(1).ToList();
return lastFilterList.Count == 0 ? null : lastFilterList[0];
}
}

Expand Down
Loading

0 comments on commit 1579876

Please sign in to comment.