Skip to content

Commit

Permalink
Use channels
Browse files Browse the repository at this point in the history
  • Loading branch information
deffrian committed Jan 2, 2025
1 parent 68e0aaa commit da337da
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public interface IP2PBlockValidator
{
ValidityStatus Validate(ExecutionPayloadV3 payload, P2PTopic topic);
ValidityStatus ValidateSignature(ReadOnlySpan<byte> payloadData, Span<byte> signature);
ValidityStatus IsBlockNumberPerHeightLimitReached(ExecutionPayloadV3 payload);
}

public enum ValidityStatus
Expand Down
54 changes: 43 additions & 11 deletions src/Nethermind/Nethermind.Optimism/CL/P2P/OptimismCLP2P.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Multiformats.Address;
Expand Down Expand Up @@ -40,6 +41,7 @@ public class OptimismCLP2P : IDisposable
private readonly Multiaddress[] _staticPeerList;
private readonly ICLConfig _config;
private ILocalPeer? _localPeer;
private readonly Task _mainLoopTask;

private readonly string _blocksV2TopicId;

Expand Down Expand Up @@ -67,38 +69,62 @@ public OptimismCLP2P(ulong chainId, string[] staticPeerList, ICLConfig config, A
})
.AddSingleton(new Settings())
.BuildServiceProvider();

_mainLoopTask = new(async () =>
{
await MainLoop();
});
}

private ulong _headPayloadNumber = 0;
private readonly SemaphoreSlim _semaphore = new(1);
private ulong _headPayloadNumber;
private readonly Channel<ExecutionPayloadV3> _blocksP2PMessageChannel = Channel.CreateBounded<ExecutionPayloadV3>(10); // for safety add capacity

private async void OnMessage(byte[] msg)
{
await _semaphore.WaitAsync();
try
{
if (TryValidateAndDecodePayload(msg, out var payload))
{
if (_logger.IsTrace) _logger.Trace($"Received payload prom p2p: {payload}");
await _blocksP2PMessageChannel.Writer.WriteAsync(payload, _cancellationTokenSource.Token);
}
}
catch (Exception e)
{
if (e is not OperationCanceledException && _logger.IsError) _logger.Error("Unhandled exception in Optimism CL P2P:", e);
}
}

private async Task MainLoop()
{
while (true)
{
try
{
ExecutionPayloadV3 payload =
await _blocksP2PMessageChannel.Reader.ReadAsync(_cancellationTokenSource.Token);

if (_headPayloadNumber >= (ulong)payload.BlockNumber)
{
// Old payload. skip
return;
}

if (_blockValidator.IsBlockNumberPerHeightLimitReached(payload) is not ValidityStatus.Valid)
{
return;
}

if (await SendNewPayloadToEL(payload) && await SendForkChoiceUpdatedToEL(payload.BlockHash))
{
_headPayloadNumber = (ulong)payload.BlockNumber;
}
}
}
catch (Exception e)
{
if (_logger.IsError) _logger.Error("Unhandled exception in Optimism CL P2P:", e);
}
finally
{
_semaphore.Release();
catch (Exception e)
{
if (_logger.IsError && e is not OperationCanceledException and not ChannelClosedException)
_logger.Error("Unhandled exception in Optimism CL P2P:", e);
}
}
}

Expand Down Expand Up @@ -150,6 +176,8 @@ private async Task<bool> SendNewPayloadToEL(ExecutionPayloadV3 executionPayload)
ResultWrapper<PayloadStatusV1> npResult = await _engineRpcModule.engine_newPayloadV3(executionPayload, Array.Empty<byte[]>(),
executionPayload.ParentBeaconBlockRoot);

_cancellationTokenSource.Token.ThrowIfCancellationRequested();

if (npResult.Result.ResultType == ResultType.Failure)
{
if (_logger.IsError)
Expand All @@ -174,6 +202,8 @@ private async Task<bool> SendForkChoiceUpdatedToEL(Hash256 headBlockHash)
new ForkchoiceStateV1(headBlockHash, headBlockHash, headBlockHash),
null);

_cancellationTokenSource.Token.ThrowIfCancellationRequested();

if (fcuResult.Result.ResultType == ResultType.Failure)
{
if (_logger.IsError)
Expand Down Expand Up @@ -213,6 +243,8 @@ public void Start()
PeerStore peerStore = _serviceProvider.GetService<PeerStore>()!;
peerStore.Discover(_staticPeerList);

_mainLoopTask.Start();

if (_logger.IsInfo) _logger.Info($"Started P2P: {_localPeer.Address}");
}

Expand Down
22 changes: 12 additions & 10 deletions src/Nethermind/Nethermind.Optimism/CL/P2P/P2PBlockValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Crypto;
Expand All @@ -16,7 +17,7 @@ public class P2PBlockValidator : IP2PBlockValidator
{
private readonly ILogger _logger;
private readonly ITimestamper _timestamper;
private readonly ConcurrentDictionary<long, long> _numberOfBlocksSeen = new();
private readonly Dictionary<long, long> _numberOfBlocksSeen = new();
private readonly Address _sequencerP2PAddress;
private readonly byte[] _chainId;

Expand All @@ -33,14 +34,23 @@ public ValidityStatus Validate(ExecutionPayloadV3 payload, P2PTopic topic)
{
if (!IsTopicValid(topic) || !IsTimestampValid(payload) || !IsBlockHashValid(payload) ||
!IsBlobGasUsedValid(payload, topic) || !IsExcessBlobGasValid(payload, topic) ||
!IsParentBeaconBlockRootValid(payload, topic) || IsBlockNumberPerHeightLimitReached(payload))
!IsParentBeaconBlockRootValid(payload, topic))
{
return ValidityStatus.Reject;
}

return ValidityStatus.Valid;
}

// This method is not thread safe
public ValidityStatus IsBlockNumberPerHeightLimitReached(ExecutionPayloadV3 payload)
{
// [REJECT] if more than 5 different blocks have been seen with the same block height
_numberOfBlocksSeen.TryGetValue(payload.BlockNumber, out var currentCount);
_numberOfBlocksSeen[payload.BlockNumber] = currentCount + 1;
return currentCount > 5 ? ValidityStatus.Reject : ValidityStatus.Valid;
}

public ValidityStatus ValidateSignature(ReadOnlySpan<byte> payloadData, Span<byte> signature)
{
return IsSignatureValid(payloadData, signature) ? ValidityStatus.Valid : ValidityStatus.Reject;
Expand Down Expand Up @@ -129,14 +139,6 @@ private bool IsParentBeaconBlockRootValid(ExecutionPayloadV3 payload, P2PTopic t
return true;
}

private bool IsBlockNumberPerHeightLimitReached(ExecutionPayloadV3 payload)
{
// [REJECT] if more than 5 different blocks have been seen with the same block height
long currentCount = _numberOfBlocksSeen.GetOrAdd(payload.BlockNumber, _ => 0);
_numberOfBlocksSeen[payload.BlockNumber] = currentCount + 1;
return currentCount > 5;
}

private bool IsSignatureValid(ReadOnlySpan<byte> payloadData, Span<byte> signature)
{
if (signature[64] > 3) return false;
Expand Down

0 comments on commit da337da

Please sign in to comment.