Skip to content

Commit

Permalink
feat: make transition logic toggle configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisKujawa committed Oct 25, 2021
1 parent 2660fef commit 13c98a9
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,6 @@
// TODO make package private again
public final class PartitionFactory {

/**
* Feature flag to switch between old and new partition bootstrap and transition code. The old
* code is based on LEADER_STEPS and FOLLOWER steps. The new code is based on TRANSITION_STEPS and
* ZeebePartition.STARTUP_PROCESS
*/
public static final boolean FEATURE_TOGGLE_USE_NEW_CODE = true;

private static final List<StartupStep<PartitionStartupContext>> STARTUP_STEPS =
List.of(
new LogDeletionPartitionStartupStep(), new RockDbMetricExporterPartitionStartupStep());
Expand All @@ -95,7 +88,6 @@ public final class PartitionFactory {
new StreamProcessorTransitionStep(),
new SnapshotDirectorPartitionTransitionStep(),
new ExporterDirectorPartitionTransitionStep());

private static final List<PartitionStep> LEADER_STEPS =
List.of(
PartitionStepMigrationHelper.fromStartupStep(new LogDeletionPartitionStartupStep()),
Expand Down Expand Up @@ -216,7 +208,9 @@ List<ZeebePartition> constructPartitions(
final ZeebePartition zeebePartition =
new ZeebePartition(
partitionStartupAndTransitionContext,
FEATURE_TOGGLE_USE_NEW_CODE ? newTransitionBehavior : transitionBehavior,
brokerCfg.getExperimental().isNewTransitionLogicEnabled()
? newTransitionBehavior
: transitionBehavior,
STARTUP_STEPS);

healthCheckService.registerMonitoredPartition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public class ExperimentalCfg implements ConfigurationEntry {
public static final DataSize DEFAULT_MAX_APPEND_BATCH_SIZE = DataSize.ofKilobytes(32);
public static final boolean DEFAULT_DISABLE_EXPLICIT_RAFT_FLUSH = false;
public static final boolean DEFAULT_ENABLE_PRIORITY_ELECTION = false;
public static final boolean DEFAULT_NEW_TRANSITION_LOGIC_ENABLED = false;

private boolean newTransitionLogicEnabled = DEFAULT_NEW_TRANSITION_LOGIC_ENABLED;
private int maxAppendsPerFollower = DEFAULT_MAX_APPENDS_PER_FOLLOWER;
private DataSize maxAppendBatchSize = DEFAULT_MAX_APPEND_BATCH_SIZE;
private boolean disableExplicitRaftFlush = DEFAULT_DISABLE_EXPLICIT_RAFT_FLUSH;
Expand Down Expand Up @@ -105,17 +107,31 @@ public void setQueryApi(final QueryApiCfg queryApi) {
this.queryApi = queryApi;
}

public boolean isNewTransitionLogicEnabled() {
return newTransitionLogicEnabled;
}

public void setNewTransitionLogicEnabled(boolean newTransitionLogicEnabled) {
this.newTransitionLogicEnabled = newTransitionLogicEnabled;
}

@Override
public String toString() {
return "ExperimentalCfg{"
+ "maxAppendsPerFollower="
+ "newTransitionLogicEnabled="
+ newTransitionLogicEnabled
+ ", maxAppendsPerFollower="
+ maxAppendsPerFollower
+ ", maxAppendBatchSize="
+ maxAppendBatchSize
+ ", disableExplicitRaftFlush="
+ disableExplicitRaftFlush
+ ", enablePriorityElection="
+ enablePriorityElection
+ ", rocksdb="
+ rocksdb
+ ", raft="
+ raft
+ ", partitioning="
+ partitioning
+ ", queryApi="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.broker.partitioning.PartitionAdminAccess;
import io.camunda.zeebe.broker.partitioning.PartitionFactory;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.system.monitoring.HealthMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
Expand Down Expand Up @@ -58,6 +57,7 @@ public final class ZeebePartition extends Actor
private final PartitionTransition transition;
private CompletableActorFuture<Void> closeFuture;
private ActorFuture<Void> currentTransitionFuture;
private final boolean newTransitionLogicEnabled;

public ZeebePartition(
final PartitionStartupAndTransitionContextImpl transitionContext,
Expand All @@ -75,7 +75,9 @@ public ZeebePartition(
transitionContext.setDiskSpaceAvailable(true);

// todo remove after migration
if (PartitionFactory.FEATURE_TOGGLE_USE_NEW_CODE) {
newTransitionLogicEnabled =
transitionContext.getBrokerCfg().getExperimental().isNewTransitionLogicEnabled();
if (newTransitionLogicEnabled) {
transition.setConcurrencyControl(actor);
}
// todo remove after migration
Expand Down Expand Up @@ -109,7 +111,7 @@ public String getName() {

@Override
public void onActorStarting() {
if (PartitionFactory.FEATURE_TOGGLE_USE_NEW_CODE) {
if (newTransitionLogicEnabled) {
startupProcess
.startup(actor, startupContext)
.onComplete(
Expand Down Expand Up @@ -160,7 +162,7 @@ protected void onActorClosing() {
.getComponentHealthMonitor()
.removeComponent(context.getRaftPartition().name());

if (PartitionFactory.FEATURE_TOGGLE_USE_NEW_CODE) {
if (newTransitionLogicEnabled) {
startupProcess
.shutdown(actor, startupContext)
.onComplete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.broker.system.configuration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -86,4 +87,52 @@ public void shouldSetRaftMinStepDownFailureCountFromEnv() {
// then
assertThat(raft.getMinStepDownFailureCount()).isEqualTo(10);
}

@Test
public void shouldSetNewTransitionLogicEnabledFromFile() {
// given
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);

// when
final var experimental = cfg.getExperimental();

// then
assertThat(experimental.isNewTransitionLogicEnabled()).isTrue();
}

@Test
public void shouldUseDefaultValueForNewTransitionLogicEnabled() {
// given
final BrokerCfg cfg = TestConfigReader.readConfig("default", environment);

// when
final var experimental = cfg.getExperimental();

// then
assertThat(experimental.isNewTransitionLogicEnabled()).isFalse();
}

@Test
public void shouldSetNewTransitionLogicEnabledFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.minStepDownFailureCount", "10");
final BrokerCfg cfg = TestConfigReader.readConfig("default", environment);

// when
final var experimental = cfg.getExperimental();

// then
assertThat(experimental.isNewTransitionLogicEnabled()).isTrue();
}

@Test
public void shouldFailOnUnparsableValue() {
// given
environment.put("zeebe.broker.experimental.newTransitionLogicEnabled", "yolo");

// when
assertThatThrownBy(() -> TestConfigReader.readConfig("experimental-cfg", environment))
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage("Invalid boolean value [yolo]");
}
}
1 change: 1 addition & 0 deletions broker/src/test/resources/system/experimental-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ zeebe:
minStepDownFailureCount: 5
queryApi:
enabled: true
newTransitionLogicEnabled: true

0 comments on commit 13c98a9

Please sign in to comment.