Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple block and blobs publishing\import #8728

Merged
merged 9 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,58 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin;
import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE;
import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.NOT_REQUIRED;

import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import tech.pegasys.teku.api.ChainDataProvider;
import tech.pegasys.teku.api.NetworkDataProvider;
import tech.pegasys.teku.api.NodeDataProvider;
import tech.pegasys.teku.beacon.sync.events.SyncState;
import tech.pegasys.teku.beacon.sync.events.SyncStateProvider;
import tech.pegasys.teku.beacon.sync.events.SyncStateTracker;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionAndPublishingPerformanceFactory;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.metrics.Validator.ValidatorDutyMetricUtils;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.time.SystemTimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.P2PConfig;
import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber;
import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.TestSpecContext;
import tech.pegasys.teku.spec.TestSpecInvocationContextProvider.SpecContext;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.Blob;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.spec.datastructures.type.SszKZGProof;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb;
import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool;
import tech.pegasys.teku.statetransition.attestation.AttestationManager;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
Expand All @@ -58,16 +76,19 @@
import tech.pegasys.teku.storage.server.StateStorageMode;
import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder;
import tech.pegasys.teku.storage.storageSystem.StorageSystem;
import tech.pegasys.teku.validator.api.SendSignedBlockResult;
import tech.pegasys.teku.validator.coordinator.performance.DefaultPerformanceTracker;
import tech.pegasys.teku.validator.coordinator.publisher.MilestoneBasedBlockPublisher;

@TestSpecContext(milestone = {SpecMilestone.PHASE0, SpecMilestone.DENEB})
public class ValidatorApiHandlerIntegrationTest {
private final AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create();

// Use full storage system
private final StorageSystem storageSystem =
InMemoryStorageSystemBuilder.buildDefault(StateStorageMode.ARCHIVE);
private final CombinedChainDataClient combinedChainDataClient =
storageSystem.combinedChainDataClient();
private final Spec spec = TestSpecFactory.createMinimalPhase0();

// Other dependencies are mocked, but these can be updated as needed
private final SyncStateProvider syncStateProvider = mock(SyncStateTracker.class);
Expand Down Expand Up @@ -100,45 +121,93 @@ public class ValidatorApiHandlerIntegrationTest {
mock(SyncCommitteeSubscriptionManager.class);

private final DutyMetrics dutyMetrics = mock(DutyMetrics.class);
private final ValidatorApiHandler handler =
new ValidatorApiHandler(
chainDataProvider,
nodeDataProvider,
networkDataProvider,
combinedChainDataClient,
syncStateProvider,
blockFactory,
blockImportChannel,
blockGossipChannel,
blockBlobSidecarsTrackersPool,
blobSidecarGossipChannel,
attestationPool,
attestationManager,
attestationTopicSubscriber,
activeValidatorTracker,
dutyMetrics,
performanceTracker,
spec,
forkChoiceTrigger,
proposersDataManager,
syncCommitteeMessagePool,
syncCommitteeContributionPool,
syncCommitteeSubscriptionManager,
new BlockProductionAndPublishingPerformanceFactory(
new SystemTimeProvider(), __ -> UInt64.ZERO, true, 0, 0, 0, 0));

private ValidatorApiHandler handler;

@BeforeEach
public void setup() {
public void setup(final SpecContext specContext) {
when(syncStateProvider.getCurrentSyncState()).thenReturn(SyncState.IN_SYNC);
when(forkChoiceTrigger.prepareForAttestationProduction(any())).thenReturn(SafeFuture.COMPLETE);
when(dutyMetrics.getValidatorDutyMetric())
.thenReturn(ValidatorDutyMetricUtils.createValidatorDutyMetric(new StubMetricsSystem()));

when(blockGossipChannel.publishBlock(any())).thenReturn(SafeFuture.COMPLETE);
when(blobSidecarGossipChannel.publishBlobSidecar(any())).thenReturn(SafeFuture.COMPLETE);
when(blobSidecarGossipChannel.publishBlobSidecars(any())).thenReturn(SafeFuture.COMPLETE);

doAnswer(invocation -> SafeFuture.completedFuture(invocation.getArgument(0)))
.when(blockFactory)
.unblindSignedBlockIfBlinded(any(), any());

// BlobSidecar builder
doAnswer(
invocation -> {
final SignedBlockContainer blockContainer = invocation.getArgument(0);
final SpecVersion asspecVersion =
specContext.getSpec().forMilestone(SpecMilestone.DENEB);
if (asspecVersion == null) {
return List.of();
}
final MiscHelpersDeneb miscHelpersDeneb =
MiscHelpersDeneb.required(asspecVersion.miscHelpers());
if (blockContainer.getBlobs().isEmpty()) {
return List.of();
}
final SszList<Blob> blobs = blockContainer.getBlobs().orElseThrow();
final SszList<SszKZGProof> proofs = blockContainer.getKzgProofs().orElseThrow();
return IntStream.range(0, blobs.size())
.mapToObj(
index ->
miscHelpersDeneb.constructBlobSidecar(
blockContainer.getSignedBlock(),
UInt64.valueOf(index),
blobs.get(index),
proofs.get(index)))
.toList();
})
.when(blockFactory)
.createBlobSidecars(any());

handler =
new ValidatorApiHandler(
chainDataProvider,
nodeDataProvider,
networkDataProvider,
combinedChainDataClient,
syncStateProvider,
blockFactory,
attestationPool,
attestationManager,
attestationTopicSubscriber,
activeValidatorTracker,
dutyMetrics,
performanceTracker,
specContext.getSpec(),
forkChoiceTrigger,
proposersDataManager,
syncCommitteeMessagePool,
syncCommitteeContributionPool,
syncCommitteeSubscriptionManager,
new BlockProductionAndPublishingPerformanceFactory(
new SystemTimeProvider(), __ -> UInt64.ZERO, true, 0, 0, 0, 0),
new MilestoneBasedBlockPublisher(
asyncRunner,
specContext.getSpec(),
blockFactory,
blockImportChannel,
blockGossipChannel,
blockBlobSidecarsTrackersPool,
blobSidecarGossipChannel,
performanceTracker,
dutyMetrics,
P2PConfig.DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED));
}

@Test
public void createAttestationData_withRecentBlockAvailable() {
@TestTemplate
public void createAttestationData_withRecentBlockAvailable(final SpecContext specContext) {
specContext.assumeIsNotOneOf(SpecMilestone.DENEB);
final UInt64 targetEpoch = UInt64.valueOf(3);
final UInt64 targetEpochStartSlot = spec.computeStartSlotAtEpoch(targetEpoch);
final UInt64 targetEpochStartSlot = specContext.getSpec().computeStartSlotAtEpoch(targetEpoch);
final UInt64 targetSlot = targetEpochStartSlot.plus(2);

final SignedBlockAndState genesis = chainUpdater.initializeGenesis();
Expand Down Expand Up @@ -167,12 +236,14 @@ public void createAttestationData_withRecentBlockAvailable() {
assertThat(attestation.getTarget()).isEqualTo(expectedTarget);
}

@Test
public void createUnsignedAttestation_withLatestBlockFromAnOldEpoch() {
@TestTemplate
public void createUnsignedAttestation_withLatestBlockFromAnOldEpoch(
final SpecContext specContext) {
specContext.assumeIsNotOneOf(SpecMilestone.DENEB);
final UInt64 latestEpoch = UInt64.valueOf(2);
final UInt64 latestSlot = spec.computeStartSlotAtEpoch(latestEpoch).plus(ONE);
final UInt64 latestSlot = specContext.getSpec().computeStartSlotAtEpoch(latestEpoch).plus(ONE);
final UInt64 targetEpoch = UInt64.valueOf(latestEpoch.longValue() + 3);
final UInt64 targetEpochStartSlot = spec.computeStartSlotAtEpoch(targetEpoch);
final UInt64 targetEpochStartSlot = specContext.getSpec().computeStartSlotAtEpoch(targetEpoch);
final UInt64 targetSlot = targetEpochStartSlot.plus(2);

final SignedBlockAndState genesis = chainUpdater.initializeGenesis();
Expand All @@ -196,4 +267,27 @@ public void createUnsignedAttestation_withLatestBlockFromAnOldEpoch() {
assertThat(attestation.getSource()).isEqualTo(genesisCheckpoint);
assertThat(attestation.getTarget()).isEqualTo(expectedTarget);
}

@TestTemplate
void sendSignedBlock_shouldImportAndPublishBlock(final SpecContext specContext) {
final SignedBeaconBlock block = specContext.getDataStructureUtil().randomSignedBeaconBlock(5);

when(blockImportChannel.importBlock(block, NOT_REQUIRED))
.thenReturn(prepareBlockImportResult(BlockImportResult.successful(block)));
final SafeFuture<SendSignedBlockResult> result = handler.sendSignedBlock(block, NOT_REQUIRED);
assertThat(result).isCompletedWithValue(SendSignedBlockResult.success(block.getRoot()));

if (specContext.getSpecMilestone() == SpecMilestone.DENEB) {
verify(blobSidecarGossipChannel).publishBlobSidecars(any());
}
verify(blockGossipChannel).publishBlock(block);
verify(blockImportChannel).importBlock(block, NOT_REQUIRED);
}

private SafeFuture<BlockImportAndBroadcastValidationResults> prepareBlockImportResult(
final BlockImportResult blockImportResult) {
return SafeFuture.completedFuture(
new BlockImportAndBroadcastValidationResults(
SafeFuture.completedFuture(blockImportResult)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,5 @@ SafeFuture<BlockContainerAndMetaData> createUnsignedBlock(
SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(
SignedBeaconBlock maybeBlindedBlock, BlockPublishingPerformance blockPublishingPerformance);

List<BlobSidecar> createBlobSidecars(
SignedBlockContainer blockContainer, BlockPublishingPerformance blockPublishingPerformance);
List<BlobSidecar> createBlobSidecars(SignedBlockContainer blockContainer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance;
import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand Down Expand Up @@ -69,12 +68,8 @@ public SafeFuture<BlockContainerAndMetaData> createUnsignedBlock(
}

@Override
public List<BlobSidecar> createBlobSidecars(
final SignedBlockContainer blockContainer,
final BlockPublishingPerformance blockPublishingPerformance) {
return operationSelector
.createBlobSidecarsSelector(blockPublishingPerformance)
.apply(blockContainer);
public List<BlobSidecar> createBlobSidecars(final SignedBlockContainer blockContainer) {
return operationSelector.createBlobSidecarsSelector().apply(blockContainer);
}

private BlockContents createBlockContents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ public SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(
}

@Override
public List<BlobSidecar> createBlobSidecars(
final SignedBlockContainer blockContainer,
final BlockPublishingPerformance blockPublishingPerformance) {
public List<BlobSidecar> createBlobSidecars(final SignedBlockContainer blockContainer) {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,7 @@ public Function<BeaconBlock, SafeFuture<BlobsBundle>> createBlobsBundleSelector(
};
}

public Function<SignedBlockContainer, List<BlobSidecar>> createBlobSidecarsSelector(
final BlockPublishingPerformance blockPublishingPerformance) {
public Function<SignedBlockContainer, List<BlobSidecar>> createBlobSidecarsSelector() {
return blockContainer -> {
final UInt64 slot = blockContainer.getSlot();
final SignedBeaconBlock block = blockContainer.getSignedBlock();
Expand Down Expand Up @@ -505,17 +504,12 @@ public Function<SignedBlockContainer, List<BlobSidecar>> createBlobSidecarsSelec
final MiscHelpersDeneb miscHelpersDeneb =
MiscHelpersDeneb.required(spec.atSlot(slot).miscHelpers());

final List<BlobSidecar> blobSidecars =
IntStream.range(0, blobs.size())
.mapToObj(
index ->
miscHelpersDeneb.constructBlobSidecar(
block, UInt64.valueOf(index), blobs.get(index), proofs.get(index)))
.toList();

blockPublishingPerformance.blobSidecarsPrepared();

return blobSidecars;
return IntStream.range(0, blobs.size())
.mapToObj(
index ->
miscHelpersDeneb.constructBlobSidecar(
block, UInt64.valueOf(index), blobs.get(index), proofs.get(index)))
.toList();
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,9 @@ public SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(
}

@Override
public List<BlobSidecar> createBlobSidecars(
final SignedBlockContainer blockContainer,
final BlockPublishingPerformance blockPublishingPerformance) {
public List<BlobSidecar> createBlobSidecars(final SignedBlockContainer blockContainer) {
final SpecMilestone milestone = getMilestone(blockContainer.getSlot());
return registeredFactories
.get(milestone)
.createBlobSidecars(blockContainer, blockPublishingPerformance);
return registeredFactories.get(milestone).createBlobSidecars(blockContainer);
}

private SpecMilestone getMilestone(final UInt64 slot) {
Expand Down
Loading