Skip to content

Commit

Permalink
feat(engine): add ZeebeTestEngine implementation
Browse files Browse the repository at this point in the history
Adds a new module, `engine`, which provides a `ZeebeTestEngine`
implementation backed by either a container or a cluster.

The implementations define idle/busy state based on the rate of records
exported (idle being no records within a certain period, and busy being
at least one record within a certain period). It also allows for a grace
period to be defined, meaning one can omit calls to
`ZeebeTestEngine#waitForIdleState(Duration)` (with some caveats).

The main entry point of the module is the interface `ContainerEngine`,
which extends both `Startable` and `ZeebeTestEngine`. By extending
`Startable`, we can let the Testcontainers extension manage the
lifecycle of our containers/engine.

Additionally, the concrete implementations implement
`TestLifecycleAware`, which allows us to do pre/post test tasks, such as
initializing the `BpmnAssert` record source, and printing out the log on
failure.
  • Loading branch information
npepinpe committed Jul 22, 2022
1 parent f693d93 commit eb99c16
Show file tree
Hide file tree
Showing 29 changed files with 2,121 additions and 55 deletions.
11 changes: 9 additions & 2 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,19 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<!-- Test dependencies -->

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<scope>provided</scope>
</dependency>

<!-- Test dependencies -->

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down Expand Up @@ -201,6 +207,7 @@
<goals>
<goal>copy</goal>
</goals>
<!-- TODO: set to package, and run the relevant tests as integration tests -->
<phase>generate-resources</phase>
<configuration>
<artifactItems>
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ default T withZeebeData(final ZeebeData data) {
@API(status = Status.EXPERIMENTAL)
default T withDebugExporter(final int port) {
Testcontainers.exposeHostPorts(port);

//noinspection resource
withCopyToContainer(
MountableFile.forClasspathResource("debug-exporter.jar"), "/tmp/debug-exporter.jar")
.withEnv("ZEEBE_BROKER_EXPORTERS_DEBUG_JARPATH", "/tmp/debug-exporter.jar")
Expand Down
44 changes: 34 additions & 10 deletions core/src/main/java/io/zeebe/containers/cluster/ZeebeCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.zeebe.containers.ZeebeGatewayNode;
import io.zeebe.containers.ZeebeNode;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
Expand All @@ -37,7 +38,7 @@
/**
* A convenience class representing a one or more containers that form a Zeebe cluster.
*
* <p>It's recommended to use the {@link ZeebeClientBuilder} to build one.
* <p>It's recommended to use the {@link ZeebeClusterBuilder} to build one.
*
* <p>As the cluster is not started automatically, the containers can still be modified/configured
* beforehand. Be aware however that the replication factor and the partitions count cannot be
Expand Down Expand Up @@ -209,6 +210,19 @@ public Map<Integer, ZeebeBrokerNode<? extends GenericContainer<?>>> getBrokers()
return brokers;
}

/**
* Returns a map of all nodes in the cluster, where the keys are the member IDs (for brokers, the
* node ID), and the values are the containers.
*
* @return the nodes of this cluster
*/
public Map<String, ZeebeNode<? extends GenericContainer<?>>> getNodes() {
final Map<String, ZeebeNode<? extends GenericContainer<?>>> nodes = new HashMap<>(gateways);
brokers.forEach((id, node) -> nodes.put(String.valueOf(id), node));

return nodes;
}

/**
* Builds a new client builder by picking a random gateway started gateway for it and disabling
* transport security.
Expand All @@ -217,21 +231,31 @@ public Map<Integer, ZeebeBrokerNode<? extends GenericContainer<?>>> getBrokers()
* @throws NoSuchElementException if there are no started gateways
*/
public ZeebeClientBuilder newClientBuilder() {
final ZeebeGatewayNode<?> gateway =
gateways.values().stream()
.filter(ZeebeNode::isStarted)
.findAny()
.orElseThrow(
() ->
new NoSuchElementException(
"Expected at least one gateway for the client to connect to, but there is"
+ " none"));
final ZeebeGatewayNode<?> gateway = getAvailableGateway();

return ZeebeClient.newClientBuilder()
.gatewayAddress(gateway.getExternalGatewayAddress())
.usePlaintext();
}

/**
* Returns the first gateway which can accept requests from a Zeebe client.
*
* @return a gateway ready to accept requests
* @throws NoSuchElementException if there are no such gateways (e.g. none are started, or they
* are dead, etc.)
*/
public ZeebeGatewayNode<? extends GenericContainer<?>> getAvailableGateway() {
return gateways.values().stream()
.filter(ZeebeNode::isStarted)
.findAny()
.orElseThrow(
() ->
new NoSuchElementException(
"Expected at least one gateway for the client to connect to, but there is"
+ " none"));
}

private Stream<? extends GenericContainer<?>> getGatewayContainers() {
return gateways.values().stream().map(Container::self);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apiguardian.api.API;
import org.apiguardian.api.API.Status;
import org.testcontainers.containers.GenericContainer;
Expand Down Expand Up @@ -114,8 +114,8 @@ public class ZeebeClusterBuilder {
private DockerImageName brokerImageName = getInstance().getDefaultDockerImage();

private Consumer<ZeebeNode<?>> nodeConfig = cfg -> {};
private Consumer<ZeebeBrokerNode<?>> brokerConfig = cfg -> {};
private Consumer<ZeebeGatewayNode<?>> gatewayConfig = cfg -> {};
private BiConsumer<Integer, ZeebeBrokerNode<?>> brokerConfig = (id, cfg) -> {};
private BiConsumer<String, ZeebeGatewayNode<?>> gatewayConfig = (memberId, cfg) -> {};

private final Map<String, ZeebeGatewayNode<? extends GenericContainer<?>>> gateways =
new HashMap<>();
Expand Down Expand Up @@ -305,35 +305,78 @@ public ZeebeClusterBuilder withNodeConfig(final Consumer<ZeebeNode<?>> nodeCfgFu

/**
* Sets the configuration function that will be executed in the {@link #build()} method on each
* gateway (including embedded gateways). NOTE: in case of conflicts with {@link #nodeConfig} this
* configuration will override {@link #nodeConfig}. NOTE: in case of conflicts with this
* configuration is an embedded gateway configuration and a broker configuration, broker
* configuration will override this configuration.
* gateway (including embedded gateways). The first argument of is the member ID of the gateway,
* and the second argument is the gateway container itself.
*
* <p>NOTE: in case of conflicts with {@link #nodeConfig} this configuration will override {@link
* #nodeConfig}.
*
* <p>NOTE: in case of conflicts with this configuration is an embedded gateway configuration and
* a broker configuration, broker configuration will override this configuration.
*
* @param gatewayCfgFunction the function that will be applied on all cluster gateways (embedded
* ones included)
* @return this builder instance for chaining
*/
public ZeebeClusterBuilder withGatewayConfig(
final Consumer<ZeebeGatewayNode<?>> gatewayCfgFunction) {
final BiConsumer<String, ZeebeGatewayNode<?>> gatewayCfgFunction) {
this.gatewayConfig = gatewayCfgFunction;
return this;
}

/**
* Sets the configuration function that will be executed in the {@link #build()} method on each
* broker. NOTE: in case of conflicts with {@link #nodeConfig} or {@link #gatewayConfig} this
* gateway (including embedded gateways).
*
* <p>NOTE: in case of conflicts with {@link #nodeConfig} this configuration will override {@link
* #nodeConfig}.
*
* <p>NOTE: in case of conflicts with this configuration is an embedded gateway configuration and
* a broker configuration, broker configuration will override this configuration.
*
* @param gatewayCfgFunction the function that will be applied on all cluster gateways (embedded
* ones included)
* @return this builder instance for chaining
*/
public ZeebeClusterBuilder withGatewayConfig(
final Consumer<ZeebeGatewayNode<?>> gatewayCfgFunction) {
this.gatewayConfig = (memberId, gateway) -> gatewayCfgFunction.accept(gateway);
return this;
}

/**
* Sets the configuration function that will be executed in the {@link #build()} method on each
* broker. The first argument is the broker ID, and the second argument is the broker container
* itself.
*
* <p>NOTE: in case of conflicts with {@link #nodeConfig} or {@link #gatewayConfig} this
* configuration will override them.
*
* @param brokerCfgFunction the function that will be applied on all cluster brokers
* @return this builder instance for chaining
*/
public ZeebeClusterBuilder withBrokerConfig(
final Consumer<ZeebeBrokerNode<?>> brokerCfgFunction) {
final BiConsumer<Integer, ZeebeBrokerNode<?>> brokerCfgFunction) {
this.brokerConfig = brokerCfgFunction;
return this;
}

/**
* Sets the configuration function that will be executed in the {@link #build()} method on each
* broker.
*
* <p>NOTE: in case of conflicts with {@link #nodeConfig} or {@link #gatewayConfig} this
* configuration will override them.
*
* @param brokerCfgFunction the function that will be applied on all cluster brokers
* @return this builder instance for chaining
*/
public ZeebeClusterBuilder withBrokerConfig(
final Consumer<ZeebeBrokerNode<?>> brokerCfgFunction) {
this.brokerConfig = (id, broker) -> brokerCfgFunction.accept(broker);
return this;
}

/**
* Builds a new Zeebe cluster. Will create {@link #brokersCount} brokers (accessible later via
* {@link ZeebeCluster#getBrokers()}) and {@link #gatewaysCount} standalone gateways (accessible
Expand Down Expand Up @@ -365,22 +408,28 @@ public ZeebeCluster build() {
// is one
createStandaloneGateways();

Stream.concat(brokers.values().stream(), gateways.values().stream())
.distinct()
.forEach(this::applyConfigFunctions);
// apply free-form configuration functions
brokers.forEach(this::applyConfigFunctions);
gateways.forEach(
(memberId, gateway) -> {
// skip brokers/embedded gateways
if (!(gateway instanceof ZeebeBrokerNode<?>)) {
applyConfigFunctions(memberId, gateway);
}
});

return new ZeebeCluster(network, name, gateways, brokers, replicationFactor, partitionsCount);
}

private void applyConfigFunctions(final ZeebeNode<?> node) {
private void applyConfigFunctions(final Object id, final ZeebeNode<?> node) {
nodeConfig.accept(node);

if (node instanceof ZeebeGatewayNode) {
gatewayConfig.accept((ZeebeGatewayNode<?>) node);
gatewayConfig.accept(String.valueOf(id), (ZeebeGatewayNode<?>) node);
}

if (node instanceof ZeebeBrokerNode) {
brokerConfig.accept((ZeebeBrokerNode<?>) node);
brokerConfig.accept((Integer) id, (ZeebeBrokerNode<?>) node);
}
}

Expand Down Expand Up @@ -437,6 +486,7 @@ private void createStandaloneGateways() {
final ThreadLocalRandom random = ThreadLocalRandom.current();
for (int i = 0; i < gatewaysCount; i++) {
final String memberId = GATEWAY_NETWORK_ALIAS_PREFIX + i;
//noinspection resource
final ZeebeGatewayContainer gateway = createStandaloneGateway(memberId);
gateway.withStartupTimeout(Duration.ofMinutes((long) gatewaysCount + brokersCount));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import net.jcip.annotations.ThreadSafe;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.HttpProcessors;
Expand All @@ -44,6 +45,7 @@
* <p>See {@link RecordHandler} for documentation about the /records endpoint.
*/
@API(status = Status.EXPERIMENTAL)
@ThreadSafe
public final class DebugReceiver implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(DebugReceiver.class);

Expand Down
Loading

0 comments on commit eb99c16

Please sign in to comment.