Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new methods to expose opIndexes for async replication. #97

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Playground/Test1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static void TestReverseIterator(
{
x += " ooops!";
return true;
}, out _);
});
}
maintainer.WaitForBackgroundThreads();
}
Expand Down
8 changes: 5 additions & 3 deletions src/ZoneTree.UnitTests/AtomicUpdateTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public void IntIntAtomicIncrement(WriteAheadLogMode walMode)
++y;
return true;
},
out _
(in int _, long _, OperationResult result) =>
{
}
);
Interlocked.Increment(ref off);
}
Expand Down Expand Up @@ -117,7 +119,7 @@ public void IntIntAtomicIncrementForBTree(WriteAheadLogMode walMode)
{
++y;
return true;
}, out _);
});
Interlocked.Increment(ref off);
}

Expand Down Expand Up @@ -183,7 +185,7 @@ public void IntIntMutableSegmentOnlyAtomicIncrement(WriteAheadLogMode walMode)
{
++y;
return true;
}, out _);
});
Interlocked.Increment(ref off);
}

Expand Down
78 changes: 78 additions & 0 deletions src/ZoneTree.UnitTests/ReplicatorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using Tenray.ZoneTree.Core;

namespace Tenray.ZoneTree.UnitTests;

public sealed class ReplicatorTests
{
[Test]
public void TestReplicator()
{
var dataPath = "data/TestReplicator";
if (Directory.Exists(dataPath))
Directory.Delete(dataPath, true);
var recordCount = 50_000;
var keyCount = 15_000;
var maxMemory = 10_000;
void CreateData()
{
using var zoneTree = new ZoneTreeFactory<int, int>()
.SetDataDirectory(dataPath + "/source")
.SetMutableSegmentMaxItemCount(maxMemory)
.OpenOrCreate();

using var replica = new ZoneTreeFactory<int, int>()
.SetDataDirectory(dataPath + "/replica")
.SetMutableSegmentMaxItemCount(maxMemory)
.OpenOrCreate();

using var replicator = new Replicator<int, int>(replica, dataPath + "/replica-op-index");
using var maintainer1 = zoneTree.CreateMaintainer();
using var maintainer2 = replica.CreateMaintainer();
var random = new Random();
int replicated = 0;
Parallel.For(0, recordCount, (i) =>
{
var key = i % keyCount;
var value = random.Next();
var opIndex = zoneTree.Upsert(key, value);
Task.Run(() =>
{
replicator.OnUpsert(key, value, opIndex);
Interlocked.Increment(ref replicated);
});
});
while (replicated < recordCount) Task.Delay(500).Wait();
maintainer1.EvictToDisk();
maintainer2.EvictToDisk();
maintainer1.WaitForBackgroundThreads();
maintainer2.WaitForBackgroundThreads();
}

void TestEqual()
{
using var zoneTree = new ZoneTreeFactory<int, int>()
.SetDataDirectory(dataPath + "/source")
.Open();

using var replica = new ZoneTreeFactory<int, int>()
.SetDataDirectory(dataPath + "/replica")
.Open();

using var iterator1 = zoneTree.CreateIterator();
using var iterator2 = replica.CreateIterator();
while (true)
{
var n1 = iterator1.Next();
var n2 = iterator2.Next();
Assert.That(n2, Is.EqualTo(n1));
if (!n1) break;
Assert.That(iterator2.Current, Is.EqualTo(iterator1.Current));
}
zoneTree.Maintenance.Drop();
replica.Maintenance.Drop();
}

CreateData();
TestEqual();
}
}
2 changes: 1 addition & 1 deletion src/ZoneTree.UnitTests/StringTreeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void HelloWorldTest2()
{
x += "b";
return true;
}, out _);
});
zoneTree.TryGet(39, out value);
Assert.That(value, Is.EqualTo("Hello Zone Tree!b"));
}
Expand Down
6 changes: 1 addition & 5 deletions src/ZoneTree/Collections/BTree/BTree.NodeIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ public sealed class NodeIterator

public TValue[] Values { get; }

public long[] OpIndexes { get; }

public TKey CurrentKey => Keys[CurrentIndex];

public TValue CurrentValue => Values[CurrentIndex];
Expand All @@ -28,14 +26,12 @@ public NodeIterator(
BTree<TKey, TValue> tree,
LeafNode leafNode,
TKey[] keys,
TValue[] values,
long[] opIndexes = null)
TValue[] values)
{
Tree = tree;
Node = leafNode;
Keys = keys;
Values = values;
OpIndexes = opIndexes;
}

public NodeIterator GetPreviousNodeIterator()
Expand Down
101 changes: 101 additions & 0 deletions src/ZoneTree/Collections/BTree/BTree.Write.OpIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
using Tenray.ZoneTree.Exceptions;

namespace Tenray.ZoneTree.Collections.BTree;

public delegate TValue GetValueDelegate<TKey, TValue>(long opIndex);

/// <summary>
/// In memory B+Tree.
/// This class is thread-safe.
/// </summary>
/// <typeparam name="TKey">Key Type</typeparam>
/// <typeparam name="TValue">Value Type</typeparam>
public sealed partial class BTree<TKey, TValue>
{
public bool Upsert(in TKey key, GetValueDelegate<TKey, TValue> valueGetter, out TValue value, out long opIndex)
{
if (IsReadOnly)
throw new BTreeIsReadOnlyException();
try
{
WriteLock();
while (true)
{
var root = Root;
root.WriteLock();
if (root != Root)
{
root.WriteUnlock();
continue;
}

if (!root.IsFull)
{
return UpsertNonFull(root, in key, valueGetter, out value, out opIndex);
}
var newRoot = new Node(GetNodeLocker(), NodeSize);
newRoot.Children[0] = root;
newRoot.WriteLock();
TrySplitChild(newRoot, 0, root);
var result = UpsertNonFull(newRoot, in key, valueGetter, out value, out opIndex);
Root = newRoot;
root.WriteUnlock();
return result;
}
}
catch (Exception)
{
Root.WriteUnlock();
throw;
}
finally
{
WriteUnlock();
}
}

bool UpsertNonFull(Node node, in TKey key, GetValueDelegate<TKey, TValue> valueGetter, out TValue value, out long opIndex)
{
while (true)
{
var found = node.TryGetPosition(Comparer, in key, out var position);
if (node is LeafNode leaf)
{
opIndex = OpIndexProvider.NextId();
if (found)
{
value = valueGetter(opIndex);
leaf.Update(position, in key, value);
node.WriteUnlock();
return false;
}
value = valueGetter(opIndex);
leaf.Insert(position, in key, value);
Interlocked.Increment(ref _length);
node.WriteUnlock();
return true;
}
if (found)
++position;
var child = node.Children[position];
child.WriteLock();
if (child.IsFull)
{
var splitted = TrySplitChild(node, position, child);
child.WriteUnlock();
if (!splitted)
{
continue;
}

if (Comparer.Compare(in key, in node.Keys[position]) >= 0)
++position;

child = node.Children[position];
child.WriteLock();
}
node.WriteUnlock();
node = child;
}
}
}
63 changes: 63 additions & 0 deletions src/ZoneTree/Core/Replicator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
namespace Tenray.ZoneTree.Core;

using System.Collections.Concurrent;
using System.Collections.Generic;
using Tenray.ZoneTree.Collections;

public sealed class Replicator<TKey, TValue> : IDisposable
{
readonly IZoneTree<TKey, TValue> Replica;

readonly IZoneTree<TKey, long> LatestOpIndexes;

readonly IMaintainer Maintainer;

bool isDisposed;

public Replicator(
IZoneTree<TKey, TValue> replica,
string dataPath,
Action<ZoneTreeFactory<TKey, long>> configure = null)
{
this.Replica = replica;
var factory = new ZoneTreeFactory<TKey, long>()
.SetDataDirectory(dataPath);
if (configure != null) configure(factory);
LatestOpIndexes = factory.OpenOrCreate();
Maintainer = LatestOpIndexes.CreateMaintainer();
Maintainer.EnableJobForCleaningInactiveCaches = true;
}

public void OnUpsert(TKey key, TValue value, long opIndex)
{
LatestOpIndexes.TryAtomicAddOrUpdate(
key,
(ref long newOpIndex) =>
{
newOpIndex = opIndex;
return true;
},
(ref long existingOpIndex) =>
{
if (opIndex < existingOpIndex)
return false;
existingOpIndex = opIndex;
return true;
},
(in long _, long _, OperationResult result) =>
{
if (result == OperationResult.Cancelled) return;
Replica.Upsert(key, value);
});
}

public void Dispose()
{
if (isDisposed) return;
isDisposed = true;
Maintainer.EvictToDisk();
Maintainer.WaitForBackgroundThreads();
Maintainer.Dispose();
LatestOpIndexes.Dispose();
}
}
Loading
Loading