Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8525 beacon states cleanup #8725

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
### Breaking Changes

### Additions and Improvements
- Clean up old beacon states when switching from ARCHIVE to PRUNE or MINIMAL data storage mode

### Bug Fixes
3 changes: 3 additions & 0 deletions services/chainstorage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ dependencies {
implementation project(':infrastructure:events')

implementation 'org.hyperledger.besu:plugin-api'

testImplementation testFixtures(project(':infrastructure:async'))
testImplementation testFixtures(project(':ethereum:execution-types'))
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static tech.pegasys.teku.infrastructure.async.AsyncRunnerFactory.DEFAULT_MAX_QUEUE_SIZE;
import static tech.pegasys.teku.spec.config.Constants.STORAGE_QUERY_CHANNEL_PARALLELISM;

import com.google.common.annotations.VisibleForTesting;
import java.nio.file.Path;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -130,28 +131,17 @@ protected SafeFuture<?> doStart() {
}
if (config.getDataStorageMode().storesFinalizedStates()
&& config.getRetainedSlots() > 0) {
if (config.getDataStorageCreateDbVersion() == DatabaseVersion.LEVELDB_TREE) {
throw new InvalidConfigurationException(
"State pruning is not supported with leveldb_tree database.");
} else {
LOG.info(
"State pruner will run every: {} minute(s), retaining states for the last {} finalized slots. Limited to {} state prune per execution. ",
config.getStatePruningInterval().toMinutes(),
config.getRetainedSlots(),
config.getStatePruningLimit());
statePruner =
Optional.of(
new StatePruner(
config.getSpec(),
database,
storagePrunerAsyncRunner,
config.getStatePruningInterval(),
config.getRetainedSlots(),
config.getStatePruningLimit(),
"state",
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}
configureStatePruner(
config.getRetainedSlots(),
storagePrunerAsyncRunner,
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge);
} else if (!config.getDataStorageMode().storesFinalizedStates()) {
configureStatePruner(
StorageConfiguration.DEFAULT_STORAGE_RETAINED_SLOTS,
storagePrunerAsyncRunner,
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge);
}

final DataArchive dataArchive =
Expand Down Expand Up @@ -228,6 +218,41 @@ protected SafeFuture<?> doStart() {
.orElseGet(() -> SafeFuture.completedFuture(null)));
}

void configureStatePruner(
final long slotsToRetain,
final AsyncRunner storagePrunerAsyncRunner,
final SettableLabelledGauge pruningTimingsLabelledGauge,
final SettableLabelledGauge pruningActiveLabelledGauge) {
if (config.getDataStorageCreateDbVersion() == DatabaseVersion.LEVELDB_TREE) {
throw new InvalidConfigurationException(
"State pruning is not supported with leveldb_tree database.");
}

LOG.info(
"State pruner will run every: {} minute(s), retaining states for the last {} finalized slots. Limited to {} state prune per execution.",
config.getStatePruningInterval().toMinutes(),
slotsToRetain,
config.getStatePruningLimit());

statePruner =
Optional.of(
new StatePruner(
config.getSpec(),
database,
storagePrunerAsyncRunner,
config.getStatePruningInterval(),
slotsToRetain,
config.getStatePruningLimit(),
"state",
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}

@VisibleForTesting
public Optional<StatePruner> getStatePruner() {
return statePruner;
}

@Override
protected SafeFuture<?> doStop() {
return blockPruner
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.services.chainstorage;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.nio.file.Path;
import java.util.Optional;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import tech.pegasys.teku.ethereum.execution.types.Eth1Address;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunnerFactory;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.service.serviceutils.ServiceConfig;
import tech.pegasys.teku.service.serviceutils.layout.DataDirLayout;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.storage.server.DatabaseVersion;
import tech.pegasys.teku.storage.server.StateStorageMode;
import tech.pegasys.teku.storage.server.StorageConfiguration;
import tech.pegasys.teku.storage.server.pruner.StatePruner;

class StorageServiceTest {

private final ServiceConfig serviceConfig = mock(ServiceConfig.class);
private final StorageConfiguration storageConfiguration = mock(StorageConfiguration.class);
private final MetricsSystem metricsSystem = mock(MetricsSystem.class);
private final DataDirLayout dataDirLayout = mock(DataDirLayout.class);
private final Eth1Address eth1DepositContract = mock(Eth1Address.class);
private final Spec spec = mock(Spec.class);
private final EventChannels eventChannels = mock(EventChannels.class);
private StorageService storageService;

@BeforeEach
void setUp(@TempDir final Path tempDir) {
when(serviceConfig.getMetricsSystem()).thenReturn(metricsSystem);
when(dataDirLayout.getBeaconDataDirectory()).thenReturn(tempDir);
when(serviceConfig.getDataDirLayout()).thenReturn(dataDirLayout);
when(storageConfiguration.getDataStorageCreateDbVersion()).thenReturn(DatabaseVersion.NOOP);
when(storageConfiguration.getMaxKnownNodeCacheSize())
.thenReturn(StorageConfiguration.DEFAULT_MAX_KNOWN_NODE_CACHE_SIZE);
when(storageConfiguration.getDataStorageFrequency())
.thenReturn(StorageConfiguration.DEFAULT_STORAGE_FREQUENCY);
when(storageConfiguration.getEth1DepositContract()).thenReturn(eth1DepositContract);
when(storageConfiguration.isStoreNonCanonicalBlocksEnabled()).thenReturn(false);
when(storageConfiguration.getSpec()).thenReturn(spec);

when(eventChannels.subscribe(any(), any())).thenReturn(eventChannels);
when(serviceConfig.getEventChannels()).thenReturn(eventChannels);

final StubAsyncRunnerFactory asyncRunnerFactory = new StubAsyncRunnerFactory();
when(serviceConfig.getAsyncRunnerFactory()).thenReturn(asyncRunnerFactory);

final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner();
when(serviceConfig.createAsyncRunner(any(), anyInt(), anyInt(), anyInt()))
.thenReturn(stubAsyncRunner);

storageService = new StorageService(serviceConfig, storageConfiguration, false, false);
}

@Test
void shouldNotSetupStatePrunerWhenArchiveMode() {
when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.ARCHIVE);
final SafeFuture<?> future = storageService.doStart();
final Optional<StatePruner> statePruner = storageService.getStatePruner();
assertThat(future).isCompleted();
assertThat(statePruner).isEmpty();
}

@Test
void shouldSetupStatePrunerWhenArchiveModeAndRetentionSlotsEnabled() {
when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.ARCHIVE);
when(storageConfiguration.getRetainedSlots()).thenReturn(5L);
final SafeFuture<?> future = storageService.doStart();
final Optional<StatePruner> statePruner = storageService.getStatePruner();
assertThat(future).isCompleted();
assertThat(statePruner).isPresent();
assertThat(storageService.getStatePruner().get().isRunning()).isTrue();
}

@Test
void shouldSetupStatePrunerWhenPruneMode() {
when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.PRUNE);
final SafeFuture<?> future = storageService.doStart();
final Optional<StatePruner> statePruner = storageService.getStatePruner();
assertThat(future).isCompleted();
assertThat(statePruner).isPresent();
assertThat(storageService.getStatePruner().get().isRunning()).isTrue();
}

@Test
void shouldSetupStatePrunerWhenMinimalMode() {
when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.MINIMAL);
final SafeFuture<?> future = storageService.doStart();
final Optional<StatePruner> statePruner = storageService.getStatePruner();
assertThat(future).isCompleted();
assertThat(statePruner).isPresent();
assertThat(storageService.getStatePruner().get().isRunning()).isTrue();
}
}