Skip to content

Commit

Permalink
Allow local operations to republish on the network periodically (#7059)
Browse files Browse the repository at this point in the history
* Allow local operations to republish on the network periodically

 - If a local operation has been in the pool for 2 hours, socialise the operation again, assuming it's been lost from pools. Any operations using the mapped pool will start using this functionality.
 - reduce the BLS operation pool size to default (10k)

 Re-submission of local operations would still error as in the ticket, but I think that's correct as we can't re-add them. This will at least allow that stale operation to be re-published, which is the intent of re-running the command.

 fixes #6803

Signed-off-by: Paul Harris <paul.harris@consensys.net>
  • Loading branch information
rolfyone authored Apr 19, 2023
1 parent 72d8467 commit 67df8e3
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ For information on changes in released versions of Teku, see the [releases page]
- When failovers are configured, the validator client will perform a readiness check on startup to avoid retrieving validator statuses from a node which is not ready.
- Enabled deposit tree snapshot bundles for major networks and persists it after finalization to decrease EL pressure and speed up node startup. Use `--deposit-snapshot-enabled=false` to disable.
- Optimized validator exit processing during state transition, to speed up block import containing multiple validator exits.
- Locally submitted exits and bls changes will now periodically broadcast if they are not actioned, to address operations being lost in remote pools.

### Bug Fixes
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@
import tech.pegasys.teku.bls.BLSKeyGenerator;
import tech.pegasys.teku.bls.BLSKeyPair;
import tech.pegasys.teku.ethereum.execution.types.Eth1Address;
import tech.pegasys.teku.infrastructure.async.SyncAsyncRunner;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.time.StubTimeProvider;
import tech.pegasys.teku.infrastructure.time.SystemTimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork;
import tech.pegasys.teku.provider.JsonProvider;
Expand Down Expand Up @@ -112,6 +113,8 @@ public abstract class AbstractDataBackedRestAPIIntegrationTest {

protected ActiveValidatorChannel activeValidatorChannel;

protected StubAsyncRunner asyncRunner = new StubAsyncRunner();

// Mocks
protected final Eth2P2PNetwork eth2P2PNetwork = mock(Eth2P2PNetwork.class);
protected final SyncService syncService = mock(SyncService.class);
Expand Down Expand Up @@ -204,7 +207,9 @@ private void setupStorage(
.andThen(BeaconBlockBodySchema::toVersionCapella)
.andThen(Optional::orElseThrow)
.andThen(BeaconBlockBodySchemaCapella::getBlsToExecutionChangesSchema),
validator);
validator,
asyncRunner,
new SystemTimeProvider());
}

private void setupAndStartRestAPI(BeaconRestApiConfig config) {
Expand Down Expand Up @@ -235,7 +240,7 @@ private void setupAndStartRestAPI(BeaconRestApiConfig config) {
eth1DataProvider,
config,
eventChannels,
SyncAsyncRunner.SYNC_RUNNER,
asyncRunner,
StubTimeProvider.withTimeInMillis(1000),
executionClientDataProvider,
spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static tech.pegasys.teku.statetransition.validation.ValidationResultCode.IGNORE;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -29,13 +30,15 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.collections.LimitedMap;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.ssz.SszCollection;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.ssz.schema.SszListSchema;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.operations.MessageWithValidatorId;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
Expand All @@ -54,16 +57,22 @@ public class MappedOperationPool<T extends MessageWithValidatorId> implements Op

private final String metricType;

private final TimeProvider timeProvider;

public MappedOperationPool(
final String metricType,
final MetricsSystem metricsSystem,
final Function<UInt64, SszListSchema<T, ?>> slotToSszListSchemaSupplier,
final OperationValidator<T> operationValidator) {
final OperationValidator<T> operationValidator,
final AsyncRunner asyncRunner,
final TimeProvider timeProvider) {
this(
metricType,
metricsSystem,
slotToSszListSchemaSupplier,
operationValidator,
asyncRunner,
timeProvider,
DEFAULT_OPERATION_POOL_SIZE);
}

Expand All @@ -72,6 +81,8 @@ public MappedOperationPool(
final MetricsSystem metricsSystem,
final Function<UInt64, SszListSchema<T, ?>> slotToSszListSchemaSupplier,
final OperationValidator<T> operationValidator,
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
final int operationPoolSize) {

this.slotToSszListSchemaSupplier = slotToSszListSchemaSupplier;
Expand All @@ -89,6 +100,54 @@ public MappedOperationPool(
OPERATION_POOL_SIZE_VALIDATION_REASON + metricType,
"Total number of attempts to add an operation to the pool, broken down by validation result",
"result");

this.timeProvider = timeProvider;
asyncRunner.runWithFixedDelay(
this::updateLocalSubmissions,
Duration.ofMinutes(10),
this::updateLocalSubmissionsErrorHandler);
}

private void updateLocalSubmissionsErrorHandler(Throwable throwable) {
LOG.debug("Failed to update " + metricType, throwable);
}

private void updateLocalSubmissions() {
final UInt64 staleTime =
timeProvider.getTimeInSeconds().minus(Duration.ofHours(2).getSeconds());
final List<OperationPoolEntry<T>> staleLocalOperations =
operations.values().stream()
.filter(OperationPoolEntry::isLocal)
.filter(entry -> entry.getTimeSubmitted().isLessThanOrEqualTo(staleTime))
.collect(Collectors.toList());
if (!staleLocalOperations.isEmpty()) {
LOG.info(
"Re-publishing {} operations that are still in the local {} operation pool",
staleLocalOperations.size(),
metricType);
}
int i = 0;
for (OperationPoolEntry<T> entry : staleLocalOperations) {
if (++i % 200 == 0) {
try {
// if we're processing large numbers, don't flood the network...
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.debug(e);
}
}
operationValidator
.validateForGossip(entry.getMessage())
.thenAcceptChecked(
result -> {
validationReasonCounter.labels(result.code().toString()).inc();
if (result.code().equals(ValidationResultCode.ACCEPT)) {
entry.setTimeSubmitted(timeProvider.getTimeInSeconds());
subscribers.forEach(s -> s.onOperationAdded(entry.getMessage(), result, false));
}
})
.finish(err -> LOG.debug("Failed to resubmit local operation", err));
}
}

private static InternalValidationResult rejectForDuplicatedMessage(
Expand Down Expand Up @@ -171,7 +230,9 @@ public void addAll(final SszCollection<T> items) {
items.forEach(
item -> {
final int validatorIndex = item.getValidatorId();
operations.putIfAbsent(validatorIndex, new OperationPoolEntry<>(item, false));
operations.putIfAbsent(
validatorIndex,
new OperationPoolEntry<>(item, false, timeProvider.getTimeInSeconds()));
});
}

Expand Down Expand Up @@ -212,7 +273,9 @@ private SafeFuture<InternalValidationResult> add(T item, boolean fromNetwork) {
result -> {
validationReasonCounter.labels(result.code().toString()).inc();
if (result.code().equals(ValidationResultCode.ACCEPT)) {
operations.put(validatorIndex, new OperationPoolEntry<>(item, !fromNetwork));
operations.put(
validatorIndex,
new OperationPoolEntry<>(item, !fromNetwork, timeProvider.getTimeInSeconds()));
subscribers.forEach(s -> s.onOperationAdded(item, result, fromNetwork));
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.statetransition;

import org.jetbrains.annotations.NotNull;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.operations.MessageWithValidatorId;

public class OperationPoolEntry<T extends MessageWithValidatorId>
Expand All @@ -22,9 +23,12 @@ public class OperationPoolEntry<T extends MessageWithValidatorId>
private final T message;
private final boolean isLocal;

public OperationPoolEntry(T message, boolean isLocal) {
private UInt64 timeSubmitted;

public OperationPoolEntry(T message, boolean isLocal, final UInt64 timeSubmitted) {
this.message = message;
this.isLocal = isLocal;
this.timeSubmitted = timeSubmitted;
}

public T getMessage() {
Expand All @@ -35,6 +39,14 @@ public boolean isLocal() {
return isLocal;
}

public UInt64 getTimeSubmitted() {
return timeSubmitted;
}

public void setTimeSubmitted(final UInt64 timeSubmitted) {
this.timeSubmitted = timeSubmitted;
}

@Override
public int compareTo(@NotNull OperationPoolEntry<T> o) {
if (isLocal && !o.isLocal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.async.SafeFuture.completedFuture;
import static tech.pegasys.teku.statetransition.validation.InternalValidationResult.ACCEPT;
Expand All @@ -30,9 +33,11 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.ssz.schema.SszListSchema;
import tech.pegasys.teku.infrastructure.time.StubTimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
Expand All @@ -55,6 +60,8 @@ public class MappedOperationPoolTest {
private final StubMetricsSystem metricsSystem = new StubMetricsSystem();
private final SignedBlsToExecutionChangeValidator validator =
mock(SignedBlsToExecutionChangeValidator.class);
private final StubTimeProvider stubTimeProvider = StubTimeProvider.withTimeInSeconds(1_000_000);
private final StubAsyncRunner asyncRunner = new StubAsyncRunner(stubTimeProvider);
private final Function<UInt64, SszListSchema<SignedBlsToExecutionChange, ?>>
blsToExecutionSchemaSupplier =
beaconBlockSchemaSupplier
Expand All @@ -63,7 +70,12 @@ public class MappedOperationPoolTest {
.andThen(BeaconBlockBodySchemaCapella::getBlsToExecutionChangesSchema);
private final OperationPool<SignedBlsToExecutionChange> pool =
new MappedOperationPool<>(
"BlsToExecutionOperationPool", metricsSystem, blsToExecutionSchemaSupplier, validator);
"BlsToExecutionOperationPool",
metricsSystem,
blsToExecutionSchemaSupplier,
validator,
asyncRunner,
stubTimeProvider);

@BeforeEach
void init() {
Expand Down Expand Up @@ -185,17 +197,63 @@ void shouldPruneFromPoolIfNoLongerValid() {
assertThat(pool.size()).isEqualTo(0);
}

@Test
void shouldNotUpdateLocalOperationsIfLessThanMinimumTime() {
when(validator.validateForGossip(any())).thenReturn(completedFuture(ACCEPT));
when(validator.validateForBlockInclusion(any(), any()))
.thenReturn(Optional.of(mock(OperationInvalidReason.class)));

initPoolWithSingleItem();
assertThat(pool.size()).isEqualTo(1);
// the object only gets verified once, because the async running task doesn't pick it up to
// reprocess
verify(validator, times(1)).validateForGossip(any());
final Subscription subscription = initialiseSubscriptions();
stubTimeProvider.advanceTimeBySeconds(7199);
assertThat(asyncRunner.countDelayedActions()).isEqualTo(1);
asyncRunner.executeDueActions();

verifyNoMoreInteractions(validator);
assertThat(subscription.getBlsToExecutionChange()).isEmpty();
}

@Test
void shouldUpdateLocalOperationsIfStale() {
when(validator.validateForGossip(any())).thenReturn(completedFuture(ACCEPT));
when(validator.validateForBlockInclusion(any(), any()))
.thenReturn(Optional.of(mock(OperationInvalidReason.class)));

initPoolWithSingleItem();
assertThat(pool.size()).isEqualTo(1);

final Subscription subscription = initialiseSubscriptions();
stubTimeProvider.advanceTimeBySeconds(7201);
assertThat(asyncRunner.countDelayedActions()).isEqualTo(1);
asyncRunner.executeDueActions();
// the stale object should get verified if it's being reprocessed
verify(validator, times(2)).validateForGossip(any());
assertThat(subscription.getBlsToExecutionChange()).isNotEmpty();
}

@Test
void shouldNotReprocessRemoteOperations() {
when(validator.validateForGossip(any())).thenReturn(completedFuture(ACCEPT));
final SignedBlsToExecutionChange remoteEntry =
dataStructureUtil.randomSignedBlsToExecutionChange();

assertThat(pool.addRemote(remoteEntry)).isCompleted();
verify(validator, times(1)).validateForGossip(any());
stubTimeProvider.advanceTimeBySeconds(1_000_000);
asyncRunner.executeDueActions();
// the object only gets verified once, because the async running task doesn't pick it up to
// reprocess
verifyNoMoreInteractions(validator);
}

@ParameterizedTest(name = "fromNetwork={0}")
@ValueSource(booleans = {true, false})
void subscribeOperationAddedSuccessfully(final boolean isFromNetwork) {
final Subscription subscription = new Subscription();
final OperationAddedSubscriber<SignedBlsToExecutionChange> subscriber =
(key, value, fromNetwork) -> {
subscription.setFromNetwork(fromNetwork);
subscription.setBlsToExecutionChange(key);
subscription.setInternalValidationResult(value);
};
pool.subscribeOperationAdded(subscriber);
final Subscription subscription = initialiseSubscriptions();
final SignedBlsToExecutionChange item = dataStructureUtil.randomSignedBlsToExecutionChange();
when(validator.validateForGossip(item)).thenReturn(completedFuture(ACCEPT));

Expand All @@ -212,14 +270,7 @@ void subscribeOperationAddedSuccessfully(final boolean isFromNetwork) {
@ParameterizedTest(name = "fromNetwork={0}")
@ValueSource(booleans = {true, false})
void subscribeOperationIgnored(final boolean isFromNetwork) {
final Subscription subscription = new Subscription();
final OperationAddedSubscriber<SignedBlsToExecutionChange> subscriber =
(key, value, fromNetwork) -> {
subscription.setFromNetwork(fromNetwork);
subscription.setBlsToExecutionChange(key);
subscription.setInternalValidationResult(value);
};
pool.subscribeOperationAdded(subscriber);
final Subscription subscription = initialiseSubscriptions();
final SignedBlsToExecutionChange item = dataStructureUtil.randomSignedBlsToExecutionChange();
when(validator.validateForGossip(item)).thenReturn(completedFuture(IGNORE));

Expand All @@ -242,14 +293,7 @@ void subscribeOperationIgnoredDuplicate(final boolean isFromNetwork)
// pre-populate cache
assertThat(pool.addRemote(item)).isCompleted();

final Subscription subscription = new Subscription();
final OperationAddedSubscriber<SignedBlsToExecutionChange> subscriber =
(key, value, fromNetwork) -> {
subscription.setFromNetwork(fromNetwork);
subscription.setBlsToExecutionChange(key);
subscription.setInternalValidationResult(value);
};
pool.subscribeOperationAdded(subscriber);
final Subscription subscription = initialiseSubscriptions();

final SafeFuture<InternalValidationResult> future;
if (isFromNetwork) {
Expand All @@ -265,6 +309,18 @@ void subscribeOperationIgnoredDuplicate(final boolean isFromNetwork)
assertThat(subscription.getFromNetwork()).isEmpty();
}

private Subscription initialiseSubscriptions() {
final Subscription subscription = new Subscription();
final OperationAddedSubscriber<SignedBlsToExecutionChange> subscriber =
(key, value, fromNetwork) -> {
subscription.setFromNetwork(fromNetwork);
subscription.setBlsToExecutionChange(key);
subscription.setInternalValidationResult(value);
};
pool.subscribeOperationAdded(subscriber);
return subscription;
}

private static class Subscription {
private Optional<Boolean> fromNetwork = Optional.empty();
private Optional<SignedBlsToExecutionChange> blsToExecutionChange = Optional.empty();
Expand Down
Loading

0 comments on commit 67df8e3

Please sign in to comment.