Skip to content

Commit

Permalink
limit to one producer for megabundles
Browse files Browse the repository at this point in the history
  • Loading branch information
avalonche committed Sep 22, 2021
1 parent 373e49f commit 3388ba0
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 146 deletions.
32 changes: 0 additions & 32 deletions src/Nethermind/Nethermind.Mev/Data/MegabundleEventArgs.cs

This file was deleted.

15 changes: 9 additions & 6 deletions src/Nethermind/Nethermind.Mev/Data/MevMegabundle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,30 @@
//

using System;
using System.Collections.Generic;
using Nethermind.Core.Crypto;
using Nethermind.Int256;

namespace Nethermind.Mev.Data
{
public class MevMegabundle: IEquatable<MevMegabundle>
public class MevMegabundle: MevBundle, IEquatable<MevMegabundle>
{
public MevMegabundle(MevBundle bundle, Signature relaySignature)
public MevMegabundle(Signature relaySignature, long blockNumber, IReadOnlyList<BundleTransaction> transactions, UInt256? minTimestamp = null, UInt256? maxTimestamp = null)
: base(blockNumber, transactions, minTimestamp, maxTimestamp)
{
Bundle = bundle;
RelaySignature = relaySignature;
}

public MevBundle Bundle { get; }
public Signature RelaySignature { get; }

public bool Equals(MevMegabundle? other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Equals(Bundle, other.Bundle)
return Equals(Hash, other.Hash)
&& Equals(RelaySignature, other.RelaySignature);
}

public override string ToString() => $"Hash:{Hash}; Block:{BlockNumber}; Min:{MinTimestamp}; Max:{MaxTimestamp}; TxCount:{Transactions.Count}; RelaySignature:{RelaySignature};";
}
}
35 changes: 6 additions & 29 deletions src/Nethermind/Nethermind.Mev/MevPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ public async Task<IBlockProducer> InitBlockProducer(IConsensusPlugin consensusPl
}

_nethermindApi.BlockProducerEnvFactory.TransactionsExecutorFactory = new MevBlockProducerTransactionsExecutorFactory(_nethermindApi.SpecProvider!, _nethermindApi.LogManager);


int megabundleProducerCount = _mevConfig.GetTrustedRelayAddresses().Any() ? 1 : 0;
List<MevBlockProducer.MevBlockProducerInfo> blockProducers =
new(_mevConfig.MaxMergedBundles + _mevConfig.GetTrustedRelayAddresses().Count() + 1);
new(_mevConfig.MaxMergedBundles + megabundleProducerCount + 1);

// Add non-mev block
MevBlockProducer.MevBlockProducerInfo standardProducer = await CreateProducer(consensusPlugin);
Expand All @@ -172,10 +173,10 @@ public async Task<IBlockProducer> InitBlockProducer(IConsensusPlugin consensusPl
blockProducers.Add(bundleProducer);
}

foreach (Address address in _mevConfig.GetTrustedRelayAddresses())
if (megabundleProducerCount > 0)
{
MegabundleSelector megabundleSelector = new(BundlePool, address);
MevBlockProducer.MevBlockProducerInfo bundleProducer = await CreateProducer(consensusPlugin, address, new BundleTxSource(megabundleSelector, _nethermindApi.Timestamper));
MegabundleSelector megabundleSelector = new(BundlePool);
MevBlockProducer.MevBlockProducerInfo bundleProducer = await CreateProducer(consensusPlugin, 0, new BundleTxSource(megabundleSelector, _nethermindApi.Timestamper));
blockProducers.Add(bundleProducer);
}

Expand Down Expand Up @@ -210,30 +211,6 @@ bool BundleLimitTriggerCondition(BlockProductionEventArgs e)
return new MevBlockProducer.MevBlockProducerInfo(producer, manualTrigger, new BeneficiaryTracer());
}

private async Task<MevBlockProducer.MevBlockProducerInfo> CreateProducer(
IConsensusPlugin consensusPlugin,
Address relayAddress,
ITxSource? additionalTxSource = null)
{
bool MegabundleTriggerCondition(BlockProductionEventArgs e)
{
BlockHeader? parent = _nethermindApi.BlockTree!.GetProducedBlockParent(e.ParentHeader);
if (parent is not null)
{
MevBundle? bundle = BundlePool.GetMegabundle(parent, _nethermindApi.Timestamper, relayAddress);
return bundle is not null;
}

return false;
}

IManualBlockProductionTrigger manualTrigger = new BuildBlocksWhenRequested();
IBlockProductionTrigger trigger = new TriggerWithCondition(manualTrigger, MegabundleTriggerCondition);

IBlockProducer producer = await consensusPlugin.InitBlockProducer(trigger, additionalTxSource);
return new MevBlockProducer.MevBlockProducerInfo(producer, manualTrigger, new BeneficiaryTracer());
}

public bool Enabled => _mevConfig.Enabled;

public ValueTask DisposeAsync() => ValueTask.CompletedTask;
Expand Down
3 changes: 1 addition & 2 deletions src/Nethermind/Nethermind.Mev/MevRpcModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public ResultWrapper<bool> eth_sendBundle(MevBundleRpc mevBundleRpc)
public ResultWrapper<bool> eth_sendMegabundle(MevMegabundleRpc mevMegabundleRpc)
{
BundleTransaction[] txs = Decode(mevMegabundleRpc.Txs, mevMegabundleRpc.RevertingTxHashes?.ToHashSet());
MevBundle bundle = new(mevMegabundleRpc.BlockNumber, txs, mevMegabundleRpc.MinTimestamp, mevMegabundleRpc.MaxTimestamp);
MevMegabundle megabundle = new(bundle, mevMegabundleRpc.RelaySignature!);
MevMegabundle megabundle = new(mevMegabundleRpc.RelaySignature!, mevMegabundleRpc.BlockNumber, txs, mevMegabundleRpc.MinTimestamp, mevMegabundleRpc.MaxTimestamp);
bool result = _bundlePool.AddMegabundle(megabundle);
return ResultWrapper<bool>.Success(result);
}
Expand Down
116 changes: 50 additions & 66 deletions src/Nethermind/Nethermind.Mev/Source/BundlePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public class BundlePool : IBundlePool, ISimulatedBundleSource, IDisposable

public event EventHandler<BundleEventArgs>? NewReceivedBundle;
public event EventHandler<BundleEventArgs>? NewPendingBundle;
public event EventHandler<MegabundleEventArgs>? NewReceivedMegabundle;
public event EventHandler<MegabundleEventArgs>? NewPendingMegabundle;

public BundlePool(
IBlockTree blockTree,
Expand Down Expand Up @@ -85,7 +83,6 @@ public BundlePool(

_bundles.Removed += OnBundleRemoved;
_blockTree.NewHeadBlock += OnNewBlock;
NewPendingMegabundle += OnNewPendingMegabundle;
}

public Task<IEnumerable<MevBundle>> GetBundles(BlockHeader parent, UInt256 timestamp, long gasLimit, CancellationToken token = default) =>
Expand All @@ -104,44 +101,37 @@ private IEnumerable<MevBundle> GetBundles(long blockNumber, UInt256 minTimestamp
{
break;
}

bool bundleIsInFuture = mevBundle.MinTimestamp != UInt256.Zero && minTimestamp < mevBundle.MinTimestamp;
bool bundleIsTooOld = mevBundle.MaxTimestamp != UInt256.Zero && maxTimestamp > mevBundle.MaxTimestamp;
if (!bundleIsInFuture && !bundleIsTooOld)

if (BundleInTimestampRange(mevBundle, minTimestamp, maxTimestamp))
{
yield return mevBundle;
}
}
}
}

public Task<MevBundle?> GetMegabundle(BlockHeader parent, UInt256 timestamp, long gasLimit,
Address relayAddress, CancellationToken token = default) =>
Task.FromResult(GetMegabundle(parent.Number + 1, timestamp, relayAddress, token));
public Task<IEnumerable<MevBundle>> GetMegabundles(BlockHeader parent, UInt256 timestamp, long gasLimit, CancellationToken token = default) =>
Task.FromResult(GetMegabundles(parent.Number + 1, timestamp, token));

public MevBundle? GetMegabundle(long blockNumber, UInt256 timestamp, Address relayAddress, CancellationToken token = default) =>
GetMegabundle(blockNumber, timestamp, timestamp, relayAddress, token);
public IEnumerable<MevBundle> GetMegabundles(long blockNumber, UInt256 timestamp, CancellationToken token = default) =>
GetMegabundles(blockNumber, timestamp, timestamp, token);

private MevBundle? GetMegabundle(long blockNumber, UInt256 minTimestamp, UInt256 maxTimestamp, Address relayAddress,
private IEnumerable<MevBundle> GetMegabundles(long blockNumber, UInt256 minTimestamp, UInt256 maxTimestamp,
CancellationToken token = default)
{
if (_megabundles.TryGetValue(relayAddress, out MevBundle? bundle))
foreach (var bundle in _megabundles.Values)
{
if (token.IsCancellationRequested)
{
return null;
break;
}

bool bundleIsInFuture = bundle.MinTimestamp != UInt256.Zero && minTimestamp < bundle.MinTimestamp;
bool bundleIsTooOld = bundle.MaxTimestamp != UInt256.Zero && maxTimestamp > bundle.MaxTimestamp;
bool bundleIsInCurrentBlock = bundle.BlockNumber == blockNumber;
if (!bundleIsInFuture && !bundleIsTooOld && bundleIsInCurrentBlock)
if (BundleInTimestampRange(bundle, minTimestamp, maxTimestamp) && bundleIsInCurrentBlock)
{
return bundle;
yield return bundle;
}
}

return null;
}

public bool AddBundle(MevBundle bundle)
Expand Down Expand Up @@ -173,28 +163,42 @@ public bool AddBundle(MevBundle bundle)
public bool AddMegabundle(MevMegabundle megabundle)
{
Metrics.MegabundlesReceived++;
Address relayAddress = _ecdsa.RecoverAddress(megabundle.RelaySignature, megabundle.Bundle.Hash)!;
MegabundleEventArgs megabundleEventArgs = new(megabundle.Bundle, relayAddress);
NewReceivedMegabundle?.Invoke(this, megabundleEventArgs);
BundleEventArgs bundleEventArgs = new(megabundle);
NewReceivedBundle?.Invoke(this, bundleEventArgs);

if (ValidateBundle(megabundle.Bundle) && IsTrustedRelay(relayAddress))
if (ValidateBundle(megabundle))
{
_megabundles.AddOrUpdate(relayAddress,
_ => megabundle.Bundle,
(_, _) => megabundle.Bundle);
Metrics.ValidMegabundlesReceived++;
NewPendingMegabundle?.Invoke(this, megabundleEventArgs);

if (megabundle.Bundle.BlockNumber == HeadNumber + 1)
{
TrySimulateBundle(megabundle.Bundle);
Address relayAddress = _ecdsa.RecoverAddress(megabundle.RelaySignature, megabundle.Hash)!;
if (IsTrustedRelay(relayAddress))
{
Metrics.ValidMegabundlesReceived++;
NewPendingBundle?.Invoke(this, bundleEventArgs);

_megabundles.AddOrUpdate(relayAddress,
_ => megabundle,
(_, _) =>
{
RemoveSimulation(megabundle);
return megabundle;
});

if (megabundle.BlockNumber == HeadNumber + 1)
{
TrySimulateBundle(megabundle);
}
}

return true;
}

return false;
}

private bool BundleInTimestampRange(MevBundle bundle, UInt256 minTimestamp, UInt256 maxTimestamp)
{
bool bundleIsInFuture = bundle.MinTimestamp != UInt256.Zero && minTimestamp < bundle.MinTimestamp;
bool bundleIsTooOld = bundle.MaxTimestamp != UInt256.Zero && maxTimestamp > bundle.MaxTimestamp;
return !bundleIsInFuture && !bundleIsTooOld;
}

private bool IsTrustedRelay(Address relayAddress)
{
Expand Down Expand Up @@ -391,16 +395,8 @@ private void StopSimulation(SimulatedMevBundleContext simulation)
}
}

private void OnNewPendingMegabundle(object? sender, MegabundleEventArgs e)
{
_megabundles.TryRemove(e.RelayAddress, out _);
RemoveSimulation(e.MevBundle);
}

async Task<IEnumerable<SimulatedMevBundle>> ISimulatedBundleSource.GetBundles(BlockHeader parent, UInt256 timestamp, long gasLimit, CancellationToken token)
private async Task<IEnumerable<SimulatedMevBundle>> GetSimulatedBundles(HashSet<MevBundle> bundles, BlockHeader parent, UInt256 timestamp, long gasLimit, CancellationToken token)
{
HashSet<MevBundle> bundles = (await GetBundles(parent, timestamp, gasLimit, token)).ToHashSet();

if (_simulatedBundles.TryGetValue(parent.Number, out ConcurrentDictionary<(MevBundle Bundle, Keccak BlockHash), SimulatedMevBundleContext>? simulatedBundlesForBlock))
{
IEnumerable<Task<SimulatedMevBundle>> resultTasks = simulatedBundlesForBlock
Expand All @@ -421,36 +417,24 @@ async Task<IEnumerable<SimulatedMevBundle>> ISimulatedBundleSource.GetBundles(Bl
}
return (Enumerable.Empty<SimulatedMevBundle>());
}

async Task<SimulatedMevBundle?> ISimulatedBundleSource.GetMegabundle(BlockHeader parent, UInt256 timestamp,
long gasLimit, Address relayAddress, CancellationToken token)

async Task<IEnumerable<SimulatedMevBundle>> ISimulatedBundleSource.GetBundles(BlockHeader parent, UInt256 timestamp, long gasLimit, CancellationToken token)
{
MevBundle? bundle = await GetMegabundle(parent, timestamp, gasLimit, relayAddress, token);

if (bundle is not null && _simulatedBundles.TryGetValue(parent.Number, out ConcurrentDictionary<(MevBundle Bundle, Keccak BlockHash), SimulatedMevBundleContext>? simulatedBundlesForBlock))
{
if (simulatedBundlesForBlock.TryGetValue((bundle, parent.Hash!),
out SimulatedMevBundleContext? simulatedBundle))
{
Task<SimulatedMevBundle> resultTask = simulatedBundle.Task;
await Task.WhenAny(resultTask, token.AsTask());
HashSet<MevBundle> bundles = (await GetBundles(parent, timestamp, gasLimit, token)).ToHashSet();
return await GetSimulatedBundles(bundles, parent, timestamp, gasLimit, token);
}

bool success = resultTask.IsCompletedSuccessfully && resultTask.Result.Success;
bool withinGasLimit = resultTask.Result.GasUsed <= gasLimit;
if (success && withinGasLimit)
{
return resultTask.Result;
}
}
}
return null;
async Task<IEnumerable<SimulatedMevBundle>> ISimulatedBundleSource.GetMegabundles(BlockHeader parent, UInt256 timestamp,
long gasLimit, CancellationToken token)
{
HashSet<MevBundle> bundles = (await GetMegabundles(parent, timestamp, gasLimit, token)).ToHashSet();
return await GetSimulatedBundles(bundles, parent, timestamp, gasLimit, token);
}

public void Dispose()
{
_blockTree.NewHeadBlock -= OnNewBlock;
_bundles.Removed -= OnBundleRemoved;
NewPendingMegabundle -= OnNewPendingMegabundle;
}

protected class SimulatedMevBundleContext : IDisposable
Expand Down
8 changes: 3 additions & 5 deletions src/Nethermind/Nethermind.Mev/Source/IBundlePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,18 @@ public interface IBundlePool : IBundleSource
{
event EventHandler<BundleEventArgs> NewReceivedBundle;
event EventHandler<BundleEventArgs> NewPendingBundle;
event EventHandler<MegabundleEventArgs> NewReceivedMegabundle;
event EventHandler<MegabundleEventArgs> NewPendingMegabundle;
bool AddBundle(MevBundle bundle);
bool AddMegabundle(MevMegabundle megabundle);
IEnumerable<MevBundle> GetBundles(long block, UInt256 timestamp, CancellationToken token = default);
MevBundle? GetMegabundle(long block, UInt256 timestamp, Address relayAddress, CancellationToken token = default);
IEnumerable<MevBundle> GetMegabundles(long block, UInt256 timestamp, CancellationToken token = default);
}

public static class BundlePoolExtensions
{
public static IEnumerable<MevBundle> GetBundles(this IBundlePool bundleSource, BlockHeader parent, ITimestamper timestamper, CancellationToken token = default) =>
bundleSource.GetBundles(parent.Number + 1, timestamper.UnixTime.Seconds, token);

public static MevBundle? GetMegabundle(this IBundlePool bundleSource, BlockHeader parent, ITimestamper timestamper, Address relayAddress, CancellationToken token = default) =>
bundleSource.GetMegabundle(parent.Number + 1, timestamper.UnixTime.Seconds, relayAddress, token);
public static IEnumerable<MevBundle> GetMegabundles(this IBundlePool bundleSource, BlockHeader parent, ITimestamper timestamper, CancellationToken token = default) =>
bundleSource.GetMegabundles(parent.Number + 1, timestamper.UnixTime.Seconds, token);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface ISimulatedBundleSource
Task<IEnumerable<SimulatedMevBundle>> GetBundles(BlockHeader parent, UInt256 timestamp, long gasLimit,
CancellationToken token = default);

Task<SimulatedMevBundle?> GetMegabundle(BlockHeader parent, UInt256 timestamp, long gasLimit, Address relayAddress,
Task<IEnumerable<SimulatedMevBundle>> GetMegabundles(BlockHeader parent, UInt256 timestamp, long gasLimit,
CancellationToken token = default);
}
}
12 changes: 7 additions & 5 deletions src/Nethermind/Nethermind.Mev/Source/MegabundleSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@ namespace Nethermind.Mev.Source
public class MegabundleSelector : IBundleSource
{
private readonly ISimulatedBundleSource _simulatedBundleSource;
private readonly Address _relayAddress;

public MegabundleSelector(ISimulatedBundleSource simulatedBundleSource, Address relayAddress)
public MegabundleSelector(ISimulatedBundleSource simulatedBundleSource)
{
_simulatedBundleSource = simulatedBundleSource;
_relayAddress = relayAddress;
}

public async Task<IEnumerable<MevBundle>> GetBundles(BlockHeader parent, UInt256 timestamp, long gasLimit,
CancellationToken token = default)
{
SimulatedMevBundle? simulatedBundle = await _simulatedBundleSource.GetMegabundle(parent, timestamp, gasLimit, _relayAddress, token);
return simulatedBundle is null ? Enumerable.Empty<MevBundle>() : new [] {simulatedBundle.Bundle};
IEnumerable<SimulatedMevBundle> simulatedBundles = await _simulatedBundleSource.GetMegabundles(parent, timestamp, gasLimit, token);
return simulatedBundles
.OrderByDescending(bundle => bundle.BundleAdjustedGasPrice)
.ThenBy(bundle => bundle.Bundle.SequenceNumber)
.Take(1)
.Select(s => s.Bundle);
}
}
}

0 comments on commit 3388ba0

Please sign in to comment.