From 4138c11ca872d0e51f29965cf3b3e8f6f801a056 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Fri, 22 Jul 2022 17:08:26 +0200 Subject: [PATCH 1/7] feat(core): auto acknowledge exported records --- .../containers/exporter/DebugReceiver.java | 47 ++++++++++++++++++- .../containers/exporter/RecordHandler.java | 16 ++++++- 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java b/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java index 52227b31..ba9dfdf7 100644 --- a/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java +++ b/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java @@ -65,6 +65,18 @@ public DebugReceiver(final Consumer> recordConsumer) { this(recordConsumer, 0); } + /** + * A debug receiver which binds to localhost using a random available port on start, forwarding + * all records to the given consumer. + * + * @param recordConsumer a consumer called every time a record is received + * @param autoAcknowledge if true, will automatically acknowledge all records + * @throws NullPointerException if any {@code recordConsumer} is null + */ + public DebugReceiver(final Consumer> recordConsumer, final boolean autoAcknowledge) { + this(recordConsumer, 0, autoAcknowledge); + } + /** * A debug receiver which binds to localhost using the given port on start, forwarding all records * * to the given consumer. @@ -74,7 +86,21 @@ public DebugReceiver(final Consumer> recordConsumer) { * @throws NullPointerException if any {@code recordConsumer} is null */ public DebugReceiver(final Consumer> recordConsumer, final int port) { - this(recordConsumer, new InetSocketAddress("localhost", port)); + this(recordConsumer, port, true); + } + + /** + * A debug receiver which binds to localhost using the given port on start, forwarding all records + * * to the given consumer. + * + * @param recordConsumer a consumer called every time a record is received + * @param port the port to bind to; can be 0 to grab a random port + * @param autoAcknowledge if true, will automatically acknowledge all records + * @throws NullPointerException if any {@code recordConsumer} is null + */ + public DebugReceiver( + final Consumer> recordConsumer, final int port, final boolean autoAcknowledge) { + this(recordConsumer, new InetSocketAddress("localhost", port), autoAcknowledge); } /** @@ -86,7 +112,24 @@ public DebugReceiver(final Consumer> recordConsumer, final int port) { * @throws NullPointerException if any of the arguments are null */ public DebugReceiver(final Consumer> recordConsumer, final InetSocketAddress address) { - this(new RecordHandler(recordConsumer), address); + this(recordConsumer, address, true); + } + + /** + * A debug receiver which binds to {@code address} on start, and forwards all record to the {@code + * recordConsumer}, and also automatically acknowledges all exported records, marking them for + * deletion in Zeebe (assuming no other exporters are defined). + * + * @param recordConsumer the consumer which will receive records + * @param address the address to bind to on start + * @param autoAcknowledge if true, will automatically acknowledge all records + * @throws NullPointerException if any of the arguments are null + */ + public DebugReceiver( + final Consumer> recordConsumer, + final InetSocketAddress address, + final boolean autoAcknowledge) { + this(new RecordHandler(recordConsumer, autoAcknowledge), address); } /** diff --git a/core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java b/core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java index 45bfb2bf..329bbe17 100644 --- a/core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java +++ b/core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java @@ -71,14 +71,20 @@ final class RecordHandler implements AsyncServerRequestHandler> recordConsumer; + private final boolean autoAcknowledge; private final Map positions = new HashMap<>(); RecordHandler(final Consumer> recordConsumer) { + this(recordConsumer, true); + } + + RecordHandler(final Consumer> recordConsumer, final boolean autoAcknowledge) { this.recordConsumer = Objects.requireNonNull(recordConsumer, "must specify a record consumer"); + this.autoAcknowledge = autoAcknowledge; } void acknowledge(final int partitionId, final long position) { - positions.put(partitionId, position); + positions.merge(partitionId, position, Math::max); } @Override @@ -122,7 +128,13 @@ public void handle( return; } - records.forEach(recordConsumer); + for (final Record record : records) { + recordConsumer.accept(record); + + if (autoAcknowledge) { + acknowledge(record.getPartitionId(), record.getPosition()); + } + } final int partitionId = records.get(0).getPartitionId(); final AsyncResponseProducer responseProducer = createSuccessfulResponse(partitionId); From b73f6e06da9f653dc1d3d064fe3d48e4d7ab6543 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Fri, 22 Jul 2022 17:08:51 +0200 Subject: [PATCH 2/7] docs: add documentation for zeebe-process-test --- README.md | 161 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 137 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index e88cb443..381c2d79 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ use containers for your tests, as well as general prerequisites. - [Extracting data](#extracting-data) - [Time traveling](#time-traveling) - [Debug exporter](#debug-exporter) + - [Zeebe Test Process Compatibility](#zeebe-test-process-compatibility) - [Tips](#tips) - [Tailing your container's logs during development](#tailing-your-containers-logs-during-development) - [Configuring GenericContainer specific properties with a Zeebe*Node interface](#configuring-genericcontainer-specific-properties-with-a-zeebenode-interface) @@ -66,6 +67,12 @@ Version 1.x and 2.x is compatible with the following Zeebe versions: Version 3.x is compatible with the following Zeebe versions: - 1.x +- 8.0 + +Version 4.x is compatible with the following Zeebe versions: + +- 1.x +- 8.x ## Installation @@ -76,12 +83,12 @@ Add the project to your dependencies: io.zeebe zeebe-test-container - 3.3.0 + 4.0.0 ``` ```groovy -testImplementation 'io.zeebe:zeebe-test-container:3.3.0' +testImplementation 'io.zeebe:zeebe-test-container:4.0.0' ``` ### Requirements @@ -332,14 +339,15 @@ variables or via configuration file. You can find out more about it on the > [here](https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-external-config) > > Testcontainers provide mechanisms through which -> [environment variables can be injected](https://www.javadoc.io/doc/org.testcontainers/testcontainers/1.14.3/org/testcontainers/containers/GenericContainer.html#withEnv-java.lang.String-java.lang.String-), +> [environment variables can be injected](https://www.javadoc.io/doc/org.testcontainers/testcontainers/1.14.3/org/testcontainers/containers/GenericContainer.html#withEnv-java.lang.String-java.lang.String-) +> , > or [configuration files mounted](https://www.testcontainers.org/features/files/). Refer to their > documentation for more. ## Examples A series of examples are included as part of the tests, see -[test/java/io/zeebe/containers/examples](/src/test/java/io/zeebe/containers/examples). +[test/java/io/zeebe/containers/examples](/core/src/test/java/io/zeebe/containers/examples). > Note that these are written for junit5. @@ -463,12 +471,13 @@ class ZeebeClusterWithGatewayExampleTest { ``` You can find more examples by looking at the -[test/java/io/zeebe/containers/examples/cluster](/src/test/java/io/zeebe/containers/examples/cluster) +[test/java/io/zeebe/containers/examples/cluster](/core/src/test/java/io/zeebe/containers/examples/cluster) package. ### Cluster startup time -There are some caveat as well. For example, if you want to create a large cluster with many brokers and need to increase the +There are some caveat as well. For example, if you want to create a large cluster with many brokers +and need to increase the startup time: ```java @@ -488,15 +497,15 @@ class ZeebeHugeClusterTest { @Container private final ZeebeCluster cluster = - ZeebeCluster.builder() - .withEmbeddedGateway(false) - .withGatewaysCount(3) - .withBrokersCount(6) - .withPartitionsCount(6) - .withReplicationFactor(3) - // configure each container to have a high start up time as they get started in parallel - .withNodeConfig(node -> node.self().withStartupTimeout(Duration.ofMinutes(5))) - .build(); + ZeebeCluster.builder() + .withEmbeddedGateway(false) + .withGatewaysCount(3) + .withBrokersCount(6) + .withPartitionsCount(6) + .withReplicationFactor(3) + // configure each container to have a high start up time as they get started in parallel + .withNodeConfig(node -> node.self().withStartupTimeout(Duration.ofMinutes(5))) + .build(); @Test @Timeout(value = 30, unit = TimeUnit.MINUTES) @@ -509,7 +518,8 @@ class ZeebeHugeClusterTest { ## Debugging There might be cases where you want to debug a container you just started in one of your tests. You -can use the [RemoteDebugger](/src/main/java/io/zeebe/containers/util/RemoteDebugger.java) utility +can use the [RemoteDebugger](/core/src/main/java/io/zeebe/containers/util/RemoteDebugger.java) +utility for this. By default, it will start your container and attach a debugging agent to it on port 5005. The container startup is then suspended until a debugger attaches to it. @@ -653,7 +663,7 @@ final class VolumeExampleTest { ``` You can see a more complete -example [here](/src/test/java/io/zeebe/containers/examples/ReusableVolumeExampleTest.java). +example [here](/core/src/test/java/io/zeebe/containers/examples/ReusableVolumeExampleTest.java). ## Extracting data @@ -664,7 +674,7 @@ regenerating that data. There are two main interfaces for this. If you want to extract the data from a running container, you can directly -use [ContainerArchive](/src/main/java/io/zeebe/containers/archive/ContainerArchive.java). This +use [ContainerArchive](/core/src/main/java/io/zeebe/containers/archive/ContainerArchive.java). This represents a reference to a zipped, TAR file on a given container, which can be extracted to a local path. @@ -711,7 +721,7 @@ final class ExtractDataLiveExampleTest { > is. You can find more examples for this feature -under [examples/archive](/src/test/java/io/zeebe/containers/examples/archive). +under [examples/archive](/core/src/test/java/io/zeebe/containers/examples/archive). ## Time traveling @@ -742,7 +752,8 @@ final class SetupMutableClockExample { // any subsequent tests can now mutate the clock @Test - void shouldMutateTheClock() {} + void shouldMutateTheClock() { + } } ``` @@ -750,7 +761,7 @@ final class SetupMutableClockExample { > message about the clock's immutability. Once the clock is mutable, you can then use the provided high level API, -[ZeebeClock](/src/main/java/io/zeebe/containers/clock/ZeebeClock.java). +[ZeebeClock](/core/src/main/java/io/zeebe/containers/clock/ZeebeClock.java). Here's a basic example which will simply advance the broker's time: @@ -772,6 +783,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; @Execution(ExecutionMode.SAME_THREAD) @Testcontainers final class ZeebeClockTest { + @Container private static final ZeebeBrokerContainer BROKER = new ZeebeBrokerContainer().withEnv("ZEEBE_CLOCK_CONTROLLED", "true"); @@ -801,7 +813,7 @@ final class ZeebeClockTest { ``` You can find more examples for this feature -under [examples/clock](/src/test/java/io/zeebe/containers/examples/clock). +under [examples/clock](/core/src/test/java/io/zeebe/containers/examples/clock). ### Tips and limitations @@ -852,6 +864,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.junit.jupiter.api.Test; final class WithDebugExporterTest { + @Test void shouldReadExportedRecords() { final List> records = new CopyOnWriteArrayList<>(); @@ -868,7 +881,7 @@ final class WithDebugExporterTest { } ``` -You can view a longer example [here](/src/test/java/io/zeebe/containers/examples/exporter). +You can view a longer example [here](/core/src/test/java/io/zeebe/containers/examples/exporter). ### Acknowledging records @@ -886,6 +899,103 @@ There is one limitation, which is that the acknowledged position is the one retu This means, if you receive record 1, and then acknowledge that, only when receiving record 2 will the acknowledged position take effect. +## Zeebe Test Process Compatibility + +It's possible to use a container or cluster as the backing engine when working +with [zeebe-process-test](https://github.com/camunda/zeebe-process-test). This will let you reuse +all the assertions you are used to, while running integration tests using one or more actual +Zeebe instances. + +The usage differs a little from normal `zeebe-process-test` usage. Whereas there you would use the +`@ZeebeProcessTest` annotation as your Junit 5 extension, here we stick with the standard +Testcontainers annotations, i.e. the combination of `@Testcontainers` and `@Container`. Doing so +means you keep using familiar tools to manage the lifecycle of your containers, and it lets us be +more flexible when it comes to customizing said containers. + +To illustrate, here is a minimal example which deploys a process definition, creates an instance, +and waits for its completion: + +```java +package com.acme; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.model.bpmn.BpmnModelInstance; +import io.camunda.zeebe.process.test.assertions.BpmnAssert; +import io.zeebe.containers.ZeebeContainer; +import io.zeebe.containers.engine.ContainerEngine; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +final class ExampleTest { + + @Container + private final ContainerEngine engine = ContainerEngine.createDefault(); + + @Test + void shouldCompleteProcessInstance() { + // given + final BpmnModelInstance processModel = + Bpmn.createExecutableProcess("process").startEvent().endEvent().done(); + final ProcessInstanceEvent processInstance; + + // when + try (final ZeebeClient client = engine.createClient()) { + client.newDeployResourceCommand().addProcessModel(processModel, "process.bpmn").send().join(); + processInstance = + client.newCreateInstanceCommand().bpmnProcessId("process").latestVersion().send().join(); + } + + // then + engine.waitForIdleState(Duration.ofSeconds(5)); + BpmnAssert.assertThat(processInstance).isStarted().isCompleted(); + } +} +``` + +As you can see, you can keep writing your tests the exact same way as you would with +`zeebe-process-test`, the only difference is in the creation of the `ZeebeTestEngine`. + +You can look +at [ContainerEngine](/engine/src/main/java/io/zeebe/containers/engine/ContainerEngine.java) for an +explanation of how it differs from the normal +`ZeebeTestEngine`, and for hints on how to build customized engines. There are also +some [examples](/engine/src/test/java/io/zeebe/containers/engine/examples) which hopefully +illustrate how to use this with all the customization options. + +### Caveats + +#### Idle/busy state + +The bundled, stripped down engine available with `zeebe-process-test` allows for much tighter +integration with the assertions, such that it can accurately report if it is idle or busy. When +running with a real Zeebe instance, this is hidden from us. + +Instead, we define idle to mean that no records are exported by the debug exporter for a certain +period of time (the `ContainerEngineBuilder#withIdlePeriod` configuration). The idle state is +defined over all partitions, not only a single partition. + +Similarly, we define a busy state as at least one record was exported during the timeout period. + +### Awaitility issues + +If you wish to wrap your `BpmnAssert` calls with `Awaitility` for resilience, note that `BpmnAssert` +sets the record stream source as a thread local. Since `Awaitility`, by default, polls on different +threads, you may run into issues where no record stream source is found, or the wrong record stream +source is used. + +To properly use it, make sure to either configure `Awaility#pollInSameThread`, or in your callback, +overwrite the thread local stream source with `BpmnAssert#initRecordStreamSource`. This latter is +not recommended if you run your tests in parallel, however, as you may affect other tests. + # Tips ## Tailing your container's logs during development @@ -1027,6 +1137,8 @@ The library is split into three modules: - `core`: the core library. It's artifact ID is `zeebe-test-container` for backwards compatibility. This is what users will include in their project. +- `engine`: the implementation of `ZeebeTestEngine`, the compatibility layer between this library + and [Zeebe Process Test](https://github.com/camunda/zeebe-process-test). - `exporter`: the debug exporter module. It will be packaged as a fat JAR and included as a resource in the core module. It has to be a separate module as it targets Java 17, same as the `zeebe-exporter-api` module it implements. @@ -1059,7 +1171,8 @@ Testing is done via GitHub actions, using two workflows: analysis, linting, formatting, etc.) - [test.yml](/.github/workflows/test.yml): testing jobs for each module. For the `core` module, there are two testing jobs, one using TestContainers Cloud, and one using the local Docker daemon. - This is due to a limitation of TestContainers Cloud, which does not work with host binds. Any tests + This is due to a limitation of TestContainers Cloud, which does not work with host binds. Any + tests which need to run on the local job should be annotated with `@DisabledIfTestcontainersCloud`. ## Code style From 52737592aa7e872c0d4cc771e314e09dc1606acf Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Sun, 17 Jul 2022 16:48:49 +0200 Subject: [PATCH 3/7] feat(engine): add ZeebeTestEngine implementation Adds a new module, `engine`, which provides a `ZeebeTestEngine` implementation backed by either a container or a cluster. The implementations define idle/busy state based on the rate of records exported (idle being no records within a certain period, and busy being at least one record within a certain period). It also allows for a grace period to be defined, meaning one can omit calls to `ZeebeTestEngine#waitForIdleState(Duration)` (with some caveats). The main entry point of the module is the interface `ContainerEngine`, which extends both `Startable` and `ZeebeTestEngine`. By extending `Startable`, we can let the Testcontainers extension manage the lifecycle of our containers/engine. Additionally, the concrete implementations implement `TestLifecycleAware`, which allows us to do pre/post test tasks, such as initializing the `BpmnAssert` record source, and printing out the log on failure. --- core/pom.xml | 11 +- .../io/zeebe/containers/ZeebeBrokerNode.java | 2 + .../containers/cluster/ZeebeCluster.java | 44 ++++- .../cluster/ZeebeClusterBuilder.java | 82 ++++++-- .../containers/exporter/DebugReceiver.java | 2 + .../cluster/ZeebeClusterBuilderTest.java | 34 ++-- .../zeebe/containers/util/TopologyAssert.java | 3 +- engine/pom.xml | 133 +++++++++++++ .../containers/engine/ContainerEngine.java | 176 ++++++++++++++++++ .../engine/ContainerEngineBuilder.java | 135 ++++++++++++++ .../containers/engine/ContainerEngines.java | 151 +++++++++++++++ .../engine/DebugReceiverStream.java | 163 ++++++++++++++++ .../containers/engine/InfiniteIterator.java | 95 ++++++++++ .../zeebe/containers/engine/InfiniteList.java | 62 ++++++ .../engine/TestAwareContainerEngine.java | 46 +++++ .../containers/engine/ZeebeClusterEngine.java | 109 +++++++++++ .../engine/ZeebeContainerEngine.java | 108 +++++++++++ .../engine/DebugReceiverStreamTest.java | 159 ++++++++++++++++ .../containers/engine/InfiniteListTest.java | 107 +++++++++++ .../engine/ZeebeClusterEngineTest.java | 148 +++++++++++++++ .../engine/ZeebeContainerEngineTest.java | 119 ++++++++++++ .../examples/ClusterEngineExampleTest.java | 75 ++++++++ .../examples/ContainerEngineExampleTest.java | 80 ++++++++ .../examples/GracePeriodExampleTest.java | 62 ++++++ .../test/resources/simplelogger.properties | 5 + .../exporter/ExporterIntegrationTest.java | 2 +- .../containers/exporter/TestExporterApi.java | 2 +- .../containers/exporter/DebugExporter.java | 7 +- pom.xml | 28 +++ 29 files changed, 2095 insertions(+), 55 deletions(-) create mode 100644 engine/pom.xml create mode 100644 engine/src/main/java/io/zeebe/containers/engine/ContainerEngine.java create mode 100644 engine/src/main/java/io/zeebe/containers/engine/ContainerEngineBuilder.java create mode 100644 engine/src/main/java/io/zeebe/containers/engine/ContainerEngines.java create mode 100644 engine/src/main/java/io/zeebe/containers/engine/DebugReceiverStream.java create mode 100644 engine/src/main/java/io/zeebe/containers/engine/InfiniteIterator.java create mode 100644 engine/src/main/java/io/zeebe/containers/engine/InfiniteList.java create mode 100644 engine/src/main/java/io/zeebe/containers/engine/TestAwareContainerEngine.java create mode 100644 engine/src/main/java/io/zeebe/containers/engine/ZeebeClusterEngine.java create mode 100644 engine/src/main/java/io/zeebe/containers/engine/ZeebeContainerEngine.java create mode 100644 engine/src/test/java/io/zeebe/containers/engine/DebugReceiverStreamTest.java create mode 100644 engine/src/test/java/io/zeebe/containers/engine/InfiniteListTest.java create mode 100644 engine/src/test/java/io/zeebe/containers/engine/ZeebeClusterEngineTest.java create mode 100644 engine/src/test/java/io/zeebe/containers/engine/ZeebeContainerEngineTest.java create mode 100644 engine/src/test/java/io/zeebe/containers/engine/examples/ClusterEngineExampleTest.java create mode 100644 engine/src/test/java/io/zeebe/containers/engine/examples/ContainerEngineExampleTest.java create mode 100644 engine/src/test/java/io/zeebe/containers/engine/examples/GracePeriodExampleTest.java create mode 100644 engine/src/test/resources/simplelogger.properties diff --git a/core/pom.xml b/core/pom.xml index 641d727a..bf7b3374 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -107,13 +107,19 @@ jackson-databind - - com.fasterxml.jackson.core jackson-core + + net.jcip + jcip-annotations + provided + + + + org.mockito mockito-core @@ -201,6 +207,7 @@ copy + generate-resources diff --git a/core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java b/core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java index 0be9a58e..7b07651e 100644 --- a/core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java +++ b/core/src/main/java/io/zeebe/containers/ZeebeBrokerNode.java @@ -97,6 +97,8 @@ default T withZeebeData(final ZeebeData data) { @API(status = Status.EXPERIMENTAL) default T withDebugExporter(final int port) { Testcontainers.exposeHostPorts(port); + + //noinspection resource withCopyToContainer( MountableFile.forClasspathResource("debug-exporter.jar"), "/tmp/debug-exporter.jar") .withEnv("ZEEBE_BROKER_EXPORTERS_DEBUG_JARPATH", "/tmp/debug-exporter.jar") diff --git a/core/src/main/java/io/zeebe/containers/cluster/ZeebeCluster.java b/core/src/main/java/io/zeebe/containers/cluster/ZeebeCluster.java index c07354e8..b914d81a 100644 --- a/core/src/main/java/io/zeebe/containers/cluster/ZeebeCluster.java +++ b/core/src/main/java/io/zeebe/containers/cluster/ZeebeCluster.java @@ -21,6 +21,7 @@ import io.zeebe.containers.ZeebeGatewayNode; import io.zeebe.containers.ZeebeNode; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.NoSuchElementException; import java.util.stream.Stream; @@ -37,7 +38,7 @@ /** * A convenience class representing a one or more containers that form a Zeebe cluster. * - *

It's recommended to use the {@link ZeebeClientBuilder} to build one. + *

It's recommended to use the {@link ZeebeClusterBuilder} to build one. * *

As the cluster is not started automatically, the containers can still be modified/configured * beforehand. Be aware however that the replication factor and the partitions count cannot be @@ -209,6 +210,19 @@ public Map>> getBrokers() return brokers; } + /** + * Returns a map of all nodes in the cluster, where the keys are the member IDs (for brokers, the + * node ID), and the values are the containers. + * + * @return the nodes of this cluster + */ + public Map>> getNodes() { + final Map>> nodes = new HashMap<>(gateways); + brokers.forEach((id, node) -> nodes.put(String.valueOf(id), node)); + + return nodes; + } + /** * Builds a new client builder by picking a random gateway started gateway for it and disabling * transport security. @@ -217,21 +231,31 @@ public Map>> getBrokers() * @throws NoSuchElementException if there are no started gateways */ public ZeebeClientBuilder newClientBuilder() { - final ZeebeGatewayNode gateway = - gateways.values().stream() - .filter(ZeebeNode::isStarted) - .findAny() - .orElseThrow( - () -> - new NoSuchElementException( - "Expected at least one gateway for the client to connect to, but there is" - + " none")); + final ZeebeGatewayNode gateway = getAvailableGateway(); return ZeebeClient.newClientBuilder() .gatewayAddress(gateway.getExternalGatewayAddress()) .usePlaintext(); } + /** + * Returns the first gateway which can accept requests from a Zeebe client. + * + * @return a gateway ready to accept requests + * @throws NoSuchElementException if there are no such gateways (e.g. none are started, or they + * are dead, etc.) + */ + public ZeebeGatewayNode> getAvailableGateway() { + return gateways.values().stream() + .filter(ZeebeNode::isStarted) + .findAny() + .orElseThrow( + () -> + new NoSuchElementException( + "Expected at least one gateway for the client to connect to, but there is" + + " none")); + } + private Stream> getGatewayContainers() { return gateways.values().stream().map(Container::self); } diff --git a/core/src/main/java/io/zeebe/containers/cluster/ZeebeClusterBuilder.java b/core/src/main/java/io/zeebe/containers/cluster/ZeebeClusterBuilder.java index 521afa19..50f64224 100644 --- a/core/src/main/java/io/zeebe/containers/cluster/ZeebeClusterBuilder.java +++ b/core/src/main/java/io/zeebe/containers/cluster/ZeebeClusterBuilder.java @@ -29,9 +29,9 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apiguardian.api.API; import org.apiguardian.api.API.Status; import org.testcontainers.containers.GenericContainer; @@ -114,8 +114,8 @@ public class ZeebeClusterBuilder { private DockerImageName brokerImageName = getInstance().getDefaultDockerImage(); private Consumer> nodeConfig = cfg -> {}; - private Consumer> brokerConfig = cfg -> {}; - private Consumer> gatewayConfig = cfg -> {}; + private BiConsumer> brokerConfig = (id, cfg) -> {}; + private BiConsumer> gatewayConfig = (memberId, cfg) -> {}; private final Map>> gateways = new HashMap<>(); @@ -305,35 +305,78 @@ public ZeebeClusterBuilder withNodeConfig(final Consumer> nodeCfgFu /** * Sets the configuration function that will be executed in the {@link #build()} method on each - * gateway (including embedded gateways). NOTE: in case of conflicts with {@link #nodeConfig} this - * configuration will override {@link #nodeConfig}. NOTE: in case of conflicts with this - * configuration is an embedded gateway configuration and a broker configuration, broker - * configuration will override this configuration. + * gateway (including embedded gateways). The first argument of is the member ID of the gateway, + * and the second argument is the gateway container itself. + * + *

NOTE: in case of conflicts with {@link #nodeConfig} this configuration will override {@link + * #nodeConfig}. + * + *

NOTE: in case of conflicts with this configuration is an embedded gateway configuration and + * a broker configuration, broker configuration will override this configuration. * * @param gatewayCfgFunction the function that will be applied on all cluster gateways (embedded * ones included) * @return this builder instance for chaining */ public ZeebeClusterBuilder withGatewayConfig( - final Consumer> gatewayCfgFunction) { + final BiConsumer> gatewayCfgFunction) { this.gatewayConfig = gatewayCfgFunction; return this; } /** * Sets the configuration function that will be executed in the {@link #build()} method on each - * broker. NOTE: in case of conflicts with {@link #nodeConfig} or {@link #gatewayConfig} this + * gateway (including embedded gateways). + * + *

NOTE: in case of conflicts with {@link #nodeConfig} this configuration will override {@link + * #nodeConfig}. + * + *

NOTE: in case of conflicts with this configuration is an embedded gateway configuration and + * a broker configuration, broker configuration will override this configuration. + * + * @param gatewayCfgFunction the function that will be applied on all cluster gateways (embedded + * ones included) + * @return this builder instance for chaining + */ + public ZeebeClusterBuilder withGatewayConfig( + final Consumer> gatewayCfgFunction) { + this.gatewayConfig = (memberId, gateway) -> gatewayCfgFunction.accept(gateway); + return this; + } + + /** + * Sets the configuration function that will be executed in the {@link #build()} method on each + * broker. The first argument is the broker ID, and the second argument is the broker container + * itself. + * + *

NOTE: in case of conflicts with {@link #nodeConfig} or {@link #gatewayConfig} this * configuration will override them. * * @param brokerCfgFunction the function that will be applied on all cluster brokers * @return this builder instance for chaining */ public ZeebeClusterBuilder withBrokerConfig( - final Consumer> brokerCfgFunction) { + final BiConsumer> brokerCfgFunction) { this.brokerConfig = brokerCfgFunction; return this; } + /** + * Sets the configuration function that will be executed in the {@link #build()} method on each + * broker. + * + *

NOTE: in case of conflicts with {@link #nodeConfig} or {@link #gatewayConfig} this + * configuration will override them. + * + * @param brokerCfgFunction the function that will be applied on all cluster brokers + * @return this builder instance for chaining + */ + public ZeebeClusterBuilder withBrokerConfig( + final Consumer> brokerCfgFunction) { + this.brokerConfig = (id, broker) -> brokerCfgFunction.accept(broker); + return this; + } + /** * Builds a new Zeebe cluster. Will create {@link #brokersCount} brokers (accessible later via * {@link ZeebeCluster#getBrokers()}) and {@link #gatewaysCount} standalone gateways (accessible @@ -365,22 +408,28 @@ public ZeebeCluster build() { // is one createStandaloneGateways(); - Stream.concat(brokers.values().stream(), gateways.values().stream()) - .distinct() - .forEach(this::applyConfigFunctions); + // apply free-form configuration functions + brokers.forEach(this::applyConfigFunctions); + gateways.forEach( + (memberId, gateway) -> { + // skip brokers/embedded gateways + if (!(gateway instanceof ZeebeBrokerNode)) { + applyConfigFunctions(memberId, gateway); + } + }); return new ZeebeCluster(network, name, gateways, brokers, replicationFactor, partitionsCount); } - private void applyConfigFunctions(final ZeebeNode node) { + private void applyConfigFunctions(final Object id, final ZeebeNode node) { nodeConfig.accept(node); if (node instanceof ZeebeGatewayNode) { - gatewayConfig.accept((ZeebeGatewayNode) node); + gatewayConfig.accept(String.valueOf(id), (ZeebeGatewayNode) node); } if (node instanceof ZeebeBrokerNode) { - brokerConfig.accept((ZeebeBrokerNode) node); + brokerConfig.accept((Integer) id, (ZeebeBrokerNode) node); } } @@ -437,6 +486,7 @@ private void createStandaloneGateways() { final ThreadLocalRandom random = ThreadLocalRandom.current(); for (int i = 0; i < gatewaysCount; i++) { final String memberId = GATEWAY_NETWORK_ALIAS_PREFIX + i; + //noinspection resource final ZeebeGatewayContainer gateway = createStandaloneGateway(memberId); gateway.withStartupTimeout(Duration.ofMinutes((long) gatewaysCount + brokersCount)); diff --git a/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java b/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java index ba9dfdf7..dbd39583 100644 --- a/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java +++ b/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java @@ -23,6 +23,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import net.jcip.annotations.ThreadSafe; import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.impl.HttpProcessors; @@ -44,6 +45,7 @@ *

See {@link RecordHandler} for documentation about the /records endpoint. */ @API(status = Status.EXPERIMENTAL) +@ThreadSafe public final class DebugReceiver implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DebugReceiver.class); diff --git a/core/src/test/java/io/zeebe/containers/cluster/ZeebeClusterBuilderTest.java b/core/src/test/java/io/zeebe/containers/cluster/ZeebeClusterBuilderTest.java index 0759c336..43a331c3 100644 --- a/core/src/test/java/io/zeebe/containers/cluster/ZeebeClusterBuilderTest.java +++ b/core/src/test/java/io/zeebe/containers/cluster/ZeebeClusterBuilderTest.java @@ -25,13 +25,13 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.assertj.core.api.Condition; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.utility.DockerImageName; +@SuppressWarnings("resource") final class ZeebeClusterBuilderTest { @Test void shouldThrowIllegalArgumentIfBrokersCountIsNegative() { @@ -517,7 +517,7 @@ void shouldApplyBrokerConfigurationOnlyOnBrokers() { final String foreseeEnv = "IS_CONFIGURED_BY_BROKER_FUNCTION"; final ZeebeClusterBuilder builder = new ZeebeClusterBuilder() - .withBrokerConfig(broker -> broker.addEnv(foreseeEnv, "")) + .withBrokerConfig((id, broker) -> broker.addEnv(foreseeEnv, String.valueOf(id))) .withBrokersCount(1) .withGatewaysCount(1) .withEmbeddedGateway(false); @@ -528,15 +528,15 @@ void shouldApplyBrokerConfigurationOnlyOnBrokers() { // then assertThat(cluster.getBrokers()) .allSatisfy( - (integer, zeebeBrokerNode) -> + (id, zeebeBrokerNode) -> assertThat(zeebeBrokerNode.getEnvMap()) .as( "Broker node: %s must have %s environment variable", zeebeBrokerNode, foreseeEnv) - .containsKey(foreseeEnv)); + .containsEntry(foreseeEnv, String.valueOf(id))); assertThat(cluster.getGateways()) .allSatisfy( - (s, zeebeGatewayNode) -> + (memberId, zeebeGatewayNode) -> assertThat(zeebeGatewayNode.getEnvMap()) .as( "Gateway node: %s must not have %s environment variable", @@ -583,7 +583,7 @@ void shouldApplyGatewayConfigurationOnEmbeddedGateways() { final String foreseeEnv = "IS_CONFIGURED_BY_GATEWAY_FUNCTION"; final ZeebeClusterBuilder builder = new ZeebeClusterBuilder() - .withGatewayConfig(gateway -> gateway.addEnv(foreseeEnv, "")) + .withGatewayConfig((memberId, gateway) -> gateway.addEnv(foreseeEnv, memberId)) .withBrokersCount(1) .withGatewaysCount(1) .withEmbeddedGateway(true); @@ -594,20 +594,20 @@ void shouldApplyGatewayConfigurationOnEmbeddedGateways() { // then assertThat(cluster.getBrokers()) .allSatisfy( - (integer, zeebeBrokerNode) -> + (id, zeebeBrokerNode) -> assertThat(zeebeBrokerNode.getEnvMap()) .as( "Broker node: %s must have %s environment variable", zeebeBrokerNode, foreseeEnv) - .containsKey(foreseeEnv)); + .containsEntry(foreseeEnv, String.valueOf(id))); assertThat(cluster.getGateways()) .allSatisfy( - (s, zeebeGatewayNode) -> + (memberId, zeebeGatewayNode) -> assertThat(zeebeGatewayNode.getEnvMap()) .as( "Gateway node: %s must have %s environment variable", zeebeGatewayNode, foreseeEnv) - .containsKey(foreseeEnv)); + .containsEntry(foreseeEnv, memberId)); } @Test @@ -616,7 +616,7 @@ void shouldApplyGatewayConfigurationOnlyOnGateways() { final String foreseeEnv = "IS_CONFIGURED_BY_GATEWAY_FUNCTION"; final ZeebeClusterBuilder builder = new ZeebeClusterBuilder() - .withGatewayConfig(gateway -> gateway.addEnv(foreseeEnv, "")) + .withGatewayConfig((memberId, gateway) -> gateway.addEnv(foreseeEnv, memberId)) .withBrokersCount(1) .withGatewaysCount(1) .withEmbeddedGateway(false); @@ -627,7 +627,7 @@ void shouldApplyGatewayConfigurationOnlyOnGateways() { // then assertThat(cluster.getBrokers()) .allSatisfy( - (integer, zeebeBrokerNode) -> + (id, zeebeBrokerNode) -> assertThat(zeebeBrokerNode.getEnvMap()) .as( "Broker node: %s must not have %s environment variable", @@ -635,12 +635,12 @@ void shouldApplyGatewayConfigurationOnlyOnGateways() { .doesNotContainKey(foreseeEnv)); assertThat(cluster.getGateways()) .allSatisfy( - (s, zeebeGatewayNode) -> + (memberId, zeebeGatewayNode) -> assertThat(zeebeGatewayNode.getEnvMap()) .as( "Gateway node: %s must have %s environment variable", zeebeGatewayNode, foreseeEnv) - .containsKey(foreseeEnv)); + .containsEntry(foreseeEnv, memberId)); } @Test @@ -820,12 +820,6 @@ void shouldSetImageNameForGatewaysAndBrokers() { gatewayEntry -> verifyZeebeHasImageName(gatewayEntry.getValue(), zeebeDockerImage)); } - private Condition>> zeebeImageHasImageName( - final String imageName) { - return new Condition<>( - node -> node.getDockerImageName().equals(imageName), "Image Name Condition"); - } - private void verifyZeebeHasImageName( final ZeebeNode> zeebe, final String imageName) { assertThat(zeebe.getDockerImageName()).isEqualTo(imageName); diff --git a/core/src/test/java/io/zeebe/containers/util/TopologyAssert.java b/core/src/test/java/io/zeebe/containers/util/TopologyAssert.java index 18d6fbd7..3af6f0b0 100644 --- a/core/src/test/java/io/zeebe/containers/util/TopologyAssert.java +++ b/core/src/test/java/io/zeebe/containers/util/TopologyAssert.java @@ -80,7 +80,8 @@ public TopologyAssert hasClusterSize(final int clusterSize) { isNotNull(); if (actual.getClusterSize() != clusterSize) { - throw failure("Expected cluster size to be <%d> but was <%d>", clusterSize, clusterSize); + throw failure( + "Expected cluster size to be <%d> but was <%d>", clusterSize, actual.getClusterSize()); } return myself; diff --git a/engine/pom.xml b/engine/pom.xml new file mode 100644 index 00000000..f514f41d --- /dev/null +++ b/engine/pom.xml @@ -0,0 +1,133 @@ + + + 4.0.0 + + + io.zeebe + zeebe-test-container-root + 3.4.1-SNAPSHOT + ../pom.xml + + + zeebe-test-container-engine + jar + Zeebe Test Container Process Test Engine + + + + 1.8 + + + -Xdoclint:none + + + + + io.zeebe + zeebe-test-container + + + + io.camunda + zeebe-process-test-api + + + + io.camunda + zeebe-process-test-assertions + + + + org.slf4j + slf4j-api + + + + org.apiguardian + apiguardian-api + + + + net.jcip + jcip-annotations + provided + + + + io.camunda + zeebe-client-java + + + + io.camunda + zeebe-protocol + + + + org.agrona + agrona + + + + com.fasterxml.jackson.core + jackson-annotations + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.fasterxml.jackson.core + jackson-core + + + + org.junit.jupiter + junit-jupiter-api + + + + + + org.mockito + mockito-core + test + + + + org.junit.jupiter + junit-jupiter-params + test + + + + org.testcontainers + junit-jupiter + test + + + + org.assertj + assertj-core + test + + + + org.awaitility + awaitility + test + + + + + + + org.revapi + revapi-maven-plugin + ${plugin.version.revapi} + + + + diff --git a/engine/src/main/java/io/zeebe/containers/engine/ContainerEngine.java b/engine/src/main/java/io/zeebe/containers/engine/ContainerEngine.java new file mode 100644 index 00000000..9d90e2b0 --- /dev/null +++ b/engine/src/main/java/io/zeebe/containers/engine/ContainerEngine.java @@ -0,0 +1,176 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import io.camunda.zeebe.process.test.api.ZeebeTestEngine; +import io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert; +import io.zeebe.containers.ZeebeBrokerNode; +import io.zeebe.containers.ZeebeGatewayNode; +import io.zeebe.containers.cluster.ZeebeCluster; +import java.time.Duration; +import org.apiguardian.api.API; +import org.apiguardian.api.API.Status; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.lifecycle.Startable; + +/** + * A {@link ContainerEngine} is a {@link ZeebeTestEngine} implementation which wraps a container or + * a set of containers. + * + *

You can use the provided {@link Builder} interface (via {@link #builder()} to build a custom + * engine. + */ +@API(status = Status.EXPERIMENTAL) +public interface ContainerEngine extends Startable, ZeebeTestEngine { + + /** + * Marks all records with a position less than {@code position} on partition with ID {@code + * partitionId} as acknowledged, meaning they can now be deleted from Zeebe. + * + *

Note that this is not a synchronous operation, but instead will take effect when the next + * record is exported. See {@link io.zeebe.containers.exporter.DebugReceiver#acknowledge(int, + * long)} for more. + * + * @param partitionId the ID of the partition on which to acknowledge + * @param position the position up to which they should be acknowledged + */ + void acknowledge(final int partitionId, final long position); + + /** + * Returns a default builder. Calling {@link Builder#build()} on a fresh builder will return a + * builder wrapping a default {@link io.zeebe.containers.ZeebeContainer}, with an idle period of 1 + * second. + * + * @return a new {@link Builder} instance + */ + static Builder builder() { + return new ContainerEngineBuilder(); + } + + /** + * Creates a {@link ContainerEngine} with a default {@link io.zeebe.containers.ZeebeContainer}, an + * idle period of 1 second, a grace period of 0, and auto acknowledging all records. + * + * @return a new, default container engine + */ + static ContainerEngine createDefault() { + return builder().build(); + } + + /** + * A helper class to build {@link ContainerEngine} instances. A fresh, non-configured builder will + * always return one which has an idle period of 1 second, and uses a default {@link + * io.zeebe.containers.ZeebeContainer} as its gateway and broker. + * + *

The builder can wrap either a {@link GenericContainer} which implements {@link + * ZeebeGatewayNode} and {@link ZeebeBrokerNode}, or a {@link ZeebeCluster}, but not both. Setting + * one will overwrite and nullify any previous assignment to either. + * + *

The default idle period is 1 second, and the default grace period is 0. + */ + interface Builder { + + /** + * Sets the given container to be used as both gateway and broker. Will set any assigned cluster + * (via {@link #withCluster(ZeebeCluster)} to null. + * + * @param container the container to use as a gateway and broker + * @return itself for chaining + * @param the concrete type of the container, e.g. {@link + * io.zeebe.containers.ZeebeContainer} + */ + & ZeebeGatewayNode & ZeebeBrokerNode> + Builder withContainer(final T container); + + /** + * Sets the given cluster to be used as engine(s)/gateway(s). + * + *

When using a cluster, calls to {@link ZeebeTestEngine#increaseTime(Duration)} will + * increase the time on all nodes of the cluster. Additionally, calls to {@link + * ZeebeTestEngine#getGatewayAddress()} will return the address of a random, available gateway, + * and thus may not always return the same value if there are multiple gateways. Finally, calls + * to {@link ZeebeTestEngine#createClient()} will create a client pointing to a random available + * gateway. If that gateway shuts down (gracefully or not), the client will not know how to + * reconnect, and a new client must be obtained. + * + * @param cluster the cluster to wrap + * @return itself for chaining + */ + Builder withCluster(final ZeebeCluster cluster); + + /** + * Sets the idle period of the engine, used when calling {@link + * ZeebeTestEngine#waitForIdleState(Duration)}. In a {@link ContainerEngine}, we define the + * engine to be idle if no records have been exported during the idle period. While this is not + * extremely accurate (there could be some issue with the engine, after all), it's the best we + * can do from the outside. + * + *

By default, the idle period is 1 second. + * + * @param idlePeriod how long no records must have been exported for the engine to be considered + * idle + * @return itself for chaining + */ + Builder withIdlePeriod(final Duration idlePeriod); + + /** + * Sets the grace period to use when reaching the end of the underlying {@link + * io.camunda.zeebe.process.test.api.RecordStreamSource}. + * + *

When a positive grace period is configured, upon reaching the end of the stream, any + * assertion will wait for new records until the grace period is expired. If a record is + * appended to the stream during the grace period, the period is reset. + * + *

What this means concretely is, if you call something like {@link + * ProcessInstanceAssert#isCompleted()} immediately after starting a process instance, it will + * block and wait up to the grace period for new records to be processed/emitted before + * returning. + * + *

While not required, setting this can be a useful alternative to calling {@link + * ZeebeTestEngine#waitForIdleState(Duration)}. + * + *

NOTE: one of the pitfalls with this method however is that certain assertions, notably + * those7 which check for the absence of something, will typically block for the + * complete duration * of the grace period, thus slowing down your tests. + * + * @param gracePeriod the grace period to use when reaching the end of the record stream + * @return itself for chaining + */ + Builder withGracePeriod(final Duration gracePeriod); + + /** + * Sets whether records should be automatically acknowledged as they are exported by the broker. + * If true, then as soon as a record is received, it will be eligible for deletion in Zeebe. If + * false, then records must be explicitly acknowledged by the user via {@link #acknowledge(int, + * long)}. + * + *

By default, this is true. + * + * @param acknowledge whether to automatically acknowledge exported records or not + * @return itself for chaining + */ + Builder withAutoAcknowledge(final boolean acknowledge); + + /** + * Builds a {@link ContainerEngine} based on the configuration. If nothing else was called, will + * build an engine using a default {@link io.zeebe.containers.ZeebeContainer}, an idle period of + * 1 second, and a grace period of 0. + * + * @return a new, stopped container engine + */ + ContainerEngine build(); + } +} diff --git a/engine/src/main/java/io/zeebe/containers/engine/ContainerEngineBuilder.java b/engine/src/main/java/io/zeebe/containers/engine/ContainerEngineBuilder.java new file mode 100644 index 00000000..a5485453 --- /dev/null +++ b/engine/src/main/java/io/zeebe/containers/engine/ContainerEngineBuilder.java @@ -0,0 +1,135 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import io.camunda.zeebe.protocol.record.Record; +import io.zeebe.containers.ZeebeBrokerNode; +import io.zeebe.containers.ZeebeContainer; +import io.zeebe.containers.ZeebeGatewayNode; +import io.zeebe.containers.cluster.ZeebeCluster; +import io.zeebe.containers.engine.ContainerEngine.Builder; +import io.zeebe.containers.exporter.DebugReceiver; +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import org.apiguardian.api.API; +import org.apiguardian.api.API.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; + +@API(status = Status.INTERNAL) +final class ContainerEngineBuilder implements Builder { + private static final Duration DEFAULT_IDLE_PERIOD = Duration.ofSeconds(1); + private static final Duration DEFAULT_GRACE_PERIOD = Duration.ZERO; + private static final Logger LOGGER = LoggerFactory.getLogger(ContainerEngineBuilder.class); + private static final Duration MINIMUM_IDLE_PERIOD = Duration.ofMillis(100); + private Holder> container; + private ZeebeCluster cluster; + private Duration idlePeriod; + private Duration gracePeriod; + private boolean autoAcknowledge; + + @Override + public & ZeebeGatewayNode & ZeebeBrokerNode> + ContainerEngineBuilder withContainer(final T container) { + if (cluster != null) { + LOGGER.warn("Setting a container will overwrite the previously assigned cluster"); + cluster = null; + } + + this.container = new Holder<>(Objects.requireNonNull(container, "must specify a container")); + return this; + } + + @Override + public ContainerEngineBuilder withCluster(final ZeebeCluster cluster) { + if (container != null) { + LOGGER.warn("Setting a cluster will overwrite the previously assigned container"); + container = null; + } + + this.cluster = Objects.requireNonNull(cluster, "must specify a cluster"); + return this; + } + + @Override + public ContainerEngineBuilder withIdlePeriod(final Duration idlePeriod) { + Objects.requireNonNull(idlePeriod, "must specify an idle period"); + if (idlePeriod.compareTo(MINIMUM_IDLE_PERIOD) < 0) { + throw new IllegalArgumentException( + String.format( + "Cannot assign idle period [%s] less than the minimum [%s]", + idlePeriod, MINIMUM_IDLE_PERIOD)); + } + + this.idlePeriod = idlePeriod; + return this; + } + + @Override + public ContainerEngineBuilder withGracePeriod(final Duration gracePeriod) { + Objects.requireNonNull(gracePeriod, "must specify an grace period"); + if (gracePeriod.isNegative()) { + LOGGER.warn("Cannot assign negative grace period {}; will default to 0", gracePeriod); + this.gracePeriod = DEFAULT_GRACE_PERIOD; + } else { + this.gracePeriod = gracePeriod; + } + + return this; + } + + @Override + public Builder withAutoAcknowledge(final boolean autoAcknowledge) { + this.autoAcknowledge = autoAcknowledge; + return this; + } + + @SuppressWarnings("unchecked") + public ContainerEngine build() { + final Duration listGracePeriod = Optional.ofNullable(gracePeriod).orElse(DEFAULT_GRACE_PERIOD); + final Duration receiveIdlePeriod = Optional.ofNullable(idlePeriod).orElse(DEFAULT_IDLE_PERIOD); + final InfiniteList> records = new InfiniteList<>(listGracePeriod); + final DebugReceiverStream recordStream = + new DebugReceiverStream( + records, new DebugReceiver(records::add, autoAcknowledge), receiveIdlePeriod); + + try { + if (container != null) { + return new ZeebeContainerEngine(container.container, recordStream); + } + + if (cluster != null) { + return new ZeebeClusterEngine(cluster, recordStream); + } + + return new ZeebeContainerEngine<>(new ZeebeContainer(), recordStream); + } catch (final Exception e) { + recordStream.close(); + throw e; + } + } + + private static final class Holder< + T extends GenericContainer & ZeebeGatewayNode & ZeebeBrokerNode> { + private final T container; + + private Holder(final T container) { + this.container = container; + } + } +} diff --git a/engine/src/main/java/io/zeebe/containers/engine/ContainerEngines.java b/engine/src/main/java/io/zeebe/containers/engine/ContainerEngines.java new file mode 100644 index 00000000..c6912825 --- /dev/null +++ b/engine/src/main/java/io/zeebe/containers/engine/ContainerEngines.java @@ -0,0 +1,151 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import io.camunda.zeebe.process.test.api.ZeebeTestEngine; +import io.camunda.zeebe.protocol.record.Record; +import io.zeebe.containers.ZeebeContainer; +import io.zeebe.containers.cluster.ZeebeCluster; +import java.time.Duration; +import org.apiguardian.api.API; +import org.apiguardian.api.API.Status; + +/** + * Collection of factories which create {@link ContainerEngine} instances based on either containers + * ({@link ZeebeContainer}) or clusters ({@link ZeebeCluster}). + * + *

In each case, you can configure a grace period for which to wait for events. This may help you + * deal with asynchronicity in your tests in a more succinct way. For example, instead of wrapping + * an assertion in an `Awaitility` block, you could configure a grace period of 10 seconds and use + * {@link io.camunda.zeebe.process.test.assertions.BpmnAssert} as you normally would. When the + * assertion reaches the end of its stream, it will wait up to the grace period for new events + * before giving up. This means, in our example, it would wait up to 10 seconds for new events. Once + * a new event comes in during the grace period, that period is reset. + * + *

There are some differences with the normal {@link ZeebeTestEngine} implementations from the + * zeebe-process-test project, notably in how we wait for idle/busy states. As there are no external + * ways to do so with a real Zeebe engine, we define these states as: + * + *

    + *
  • idle: no records were exported for at least 1 second since the call + *
  • busy: a new record was exported within the timeout since the call + *
+ */ +@API(status = Status.EXPERIMENTAL) +public final class ContainerEngines { + private ContainerEngines() {} + + /** + * Returns a container engine pointing to a default {@link ZeebeContainer}, with a non-delayed + * record stream. + * + *

If you wish to customize the container, consider declaring it separately and using {@link + * #of(ZeebeContainer)}. + * + * @return a single-container engine + */ + public static ContainerEngine of() { + return of(Duration.ZERO); + } + + /** + * Returns a container engine pointing to a default {@link ZeebeContainer}. When the underlying + * record stream reaches the end, it will wait up to at most {@code timeout} duration for new + * events. If a new event comes in within that grace period, the timeout is reset. + * + *

You can use this to succinctly deal with asynchronicity in your tests, instead of wrapping + * with Awaitility. + * + *

If you wish to customize the container, consider declaring it separately and using {@link + * #of(Duration, ZeebeContainer)}. + * + * @param timeout the grace period to wait for new exported events + * @return a single-container engine with a grace period + */ + public static ContainerEngine of(final Duration timeout) { + return of(timeout, new ZeebeContainer()); + } + + /** + * Returns a container engine pointing to the provided {@link ZeebeContainer}, with a non-delayed + * record stream. + * + * @return a single-container engine + */ + public static ContainerEngine of(final ZeebeContainer container) { + return of(Duration.ZERO, container); + } + + /** + * Returns a container engine pointing to a configured {@link ZeebeContainer}. When the underlying + * record stream reaches the end, it will wait up to at most {@code timeout} duration for new + * events. If a new event comes in within that grace period, the timeout is reset. + * + *

You can use this to succinctly deal with asynchronicity in your tests, instead of wrapping + * with Awaitility. + * + * @param timeout the grace period to wait for new exported events + * @return a single-container engine with a grace period + */ + @SuppressWarnings("java:S2095") + public static ContainerEngine of(final Duration timeout, final ZeebeContainer container) { + final InfiniteList> records = new InfiniteList<>(timeout); + return new ZeebeContainerEngine(container, new DebugReceiverStream(records)); + } + + /** + * Returns a container engine pointing to a configured {@link ZeebeCluster}, with a non-delayed + * record stream. + * + *

Records will be streamed from all partition leaders at the same time, so you can run + * assertions across your whole cluster easily. + * + *

When manipulating the clock through the {@link ZeebeTestEngine} interface, it will + * manipulate the clock of all brokers at the same time. + * + *

When obtaining a gateway, it will return a random available healthy gateway. + * + * @return a single-container engine + */ + public static ContainerEngine of(final ZeebeCluster cluster) { + return of(Duration.ZERO, cluster); + } + + /** + * Returns a container engine pointing to a configured {@link ZeebeCluster}. When the underlying + * record stream reaches the end, it will wait up to at most {@code timeout} duration for new + * events. If a new event comes in within that grace period, the timeout is reset. + * + *

You can use this to succinctly deal with asynchronicity in your tests, instead of wrapping + * with Awaitility. + * + *

Records will be streamed from all partition leaders at the same time, so you can run + * assertions across your whole cluster easily. + * + *

When manipulating the clock through the {@link ZeebeTestEngine} interface, it will + * manipulate the clock of all brokers at the same time. + * + *

When obtaining a gateway, it will return a random available healthy gateway. + * + * @param timeout the grace period to wait for new exported events + * @return a single-container engine with a grace period + */ + @SuppressWarnings("java:S2095") + public static ContainerEngine of(final Duration timeout, final ZeebeCluster cluster) { + final InfiniteList> records = new InfiniteList<>(timeout); + return new ZeebeClusterEngine(cluster, new DebugReceiverStream(records)); + } +} diff --git a/engine/src/main/java/io/zeebe/containers/engine/DebugReceiverStream.java b/engine/src/main/java/io/zeebe/containers/engine/DebugReceiverStream.java new file mode 100644 index 00000000..f874e43a --- /dev/null +++ b/engine/src/main/java/io/zeebe/containers/engine/DebugReceiverStream.java @@ -0,0 +1,163 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import io.camunda.zeebe.process.test.api.RecordStreamSource; +import io.camunda.zeebe.protocol.record.Record; +import io.zeebe.containers.ZeebeBrokerNode; +import io.zeebe.containers.exporter.DebugReceiver; +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.TimeoutException; +import java.util.function.BooleanSupplier; +import net.jcip.annotations.ThreadSafe; +import org.agrona.collections.MutableInteger; +import org.agrona.collections.MutableLong; +import org.apiguardian.api.API; +import org.apiguardian.api.API.Status; + +/** + * Implementation of {@link RecordStreamSource} which wraps an infinite list. It will manage an + * instance of a {@link DebugReceiver} which will add records to the infinite list, and use that + * list as the record stream. + * + *

In order to wait for idle and busy states, we currently use two hacks: + * + *

    + *
  • idle: idle means no records were exported for 1 second + *
  • busy: busy means a new record is exported after the call + *
+ */ +@API(status = Status.INTERNAL) +@ThreadSafe +final class DebugReceiverStream implements RecordStreamSource, AutoCloseable { + private final InfiniteList> records; + private final DebugReceiver receiver; + private final Duration idlePeriod; + + DebugReceiverStream(final InfiniteList> records) { + this(records, new DebugReceiver(records::add)); + } + + DebugReceiverStream(final InfiniteList> records, final Duration idlePeriod) { + this(records, new DebugReceiver(records::add), idlePeriod); + } + + DebugReceiverStream(final InfiniteList> records, final DebugReceiver receiver) { + this(records, receiver, Duration.ofSeconds(1)); + } + + DebugReceiverStream( + final InfiniteList> records, + final DebugReceiver receiver, + final Duration idlePeriod) { + this.records = records; + this.receiver = receiver; + this.idlePeriod = idlePeriod; + } + + void start(final Collection> brokers) { + receiver.start(); + + final int port = receiver.serverAddress().getPort(); + brokers.forEach(broker -> broker.withDebugExporter(port)); + } + + void stop() { + receiver.stop(); + } + + void acknowledge(final int partitionId, final long position) { + receiver.acknowledge(partitionId, position); + } + + @Override + public void close() { + stop(); + } + + @Override + public Iterable> getRecords() { + return records; + } + + void waitForIdleState(final Duration timeout) throws InterruptedException, TimeoutException { + final MutableInteger recordsCount = new MutableInteger(records.size()); + final MutableLong lastInvoked = new MutableLong(System.nanoTime()); + final MutableLong conditionHeld = new MutableLong(0L); + final Duration pollingInterval = Duration.ofMillis(100); + final long mustHold = idlePeriod.toNanos(); + + awaitConditionHolds( + timeout, + pollingInterval, + "until no records are exported for 1 second", + () -> { + final int count = recordsCount.get(); + recordsCount.set(records.size()); + + final long currentNano = System.nanoTime(); + final long timeElapsed = currentNano - lastInvoked.get(); + lastInvoked.set(currentNano); + + if (count == recordsCount.get()) { + conditionHeld.set(conditionHeld.get() + timeElapsed); + } else { + conditionHeld.set(0L); + } + + return conditionHeld.get() >= mustHold; + }); + } + + void waitForBusyState(final Duration timeout) throws InterruptedException, TimeoutException { + final MutableInteger recordsCount = new MutableInteger(records.size()); + awaitConditionHolds( + timeout, + Duration.ofMillis(100), + "until a record is exported", + () -> { + final int count = recordsCount.get(); + recordsCount.set(records.size()); + return count != recordsCount.get(); + }); + } + + private void awaitConditionHolds( + final Duration timeout, + final Duration pollInterval, + final String description, + final BooleanSupplier condition) + throws TimeoutException, InterruptedException { + final Thread current = Thread.currentThread(); + + final long timeoutNs = System.nanoTime() + timeout.toNanos(); + while (!current.isInterrupted() && System.nanoTime() < timeoutNs) { + if (condition.getAsBoolean()) { + break; + } + + // we do want to busy wait here + //noinspection BusyWait + Thread.sleep(pollInterval.toMillis()); + } + + final boolean timedOut = System.nanoTime() >= timeoutNs; + if (timedOut) { + throw new TimeoutException("Timed out waiting " + description); + } + } +} diff --git a/engine/src/main/java/io/zeebe/containers/engine/InfiniteIterator.java b/engine/src/main/java/io/zeebe/containers/engine/InfiniteIterator.java new file mode 100644 index 00000000..2637a20a --- /dev/null +++ b/engine/src/main/java/io/zeebe/containers/engine/InfiniteIterator.java @@ -0,0 +1,95 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import net.jcip.annotations.ThreadSafe; +import org.agrona.LangUtil; + +/** + * An {@link Iterator} implementation which tails a list, meaning it will see items added after the + * iterator was created, but before it reached the end of the list. It can optionally be configured + * to have a grace period to wait for new items once it reaches its end. If a new item comes in at + * that point, the grace period is reset. + * + * @param the type of the items iterated on + */ +@ThreadSafe +final class InfiniteIterator implements Iterator { + private final List items; + private final Duration timeout; + private int cursor = 0; + + /** + * Creates a new iterator which will tail the given collection, optionally waiting for new items + * if it reaches the end. + * + *

To disable waiting at the end, simply pass a negative or zero duration. + * + * @param items the items to tail + * @param timeout the optional timeout to wait for when reaching the end of the list + */ + InfiniteIterator(final List items, final Duration timeout) { + this.items = items; + this.timeout = timeout; + } + + @Override + public boolean hasNext() { + synchronized (items) { + try { + Duration currentTimeout = timeout; + + while (hasTimeRemaining(currentTimeout) && cursor >= items.size()) { + final long currentNanos = System.nanoTime(); + items.wait(currentTimeout.toMillis(), currentTimeout.getNano()); + currentTimeout = Duration.of(currentNanos - System.nanoTime(), ChronoUnit.NANOS); + } + + return cursor < items.size(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + LangUtil.rethrowUnchecked(e); + } + } + + return false; + } + + @Override + public T next() { + final T item; + + synchronized (items) { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + item = items.get(cursor); + cursor++; + } + + return item; + } + + private boolean hasTimeRemaining(final Duration duration) { + return !duration.isNegative() && !duration.isZero(); + } +} diff --git a/engine/src/main/java/io/zeebe/containers/engine/InfiniteList.java b/engine/src/main/java/io/zeebe/containers/engine/InfiniteList.java new file mode 100644 index 00000000..1a8890c6 --- /dev/null +++ b/engine/src/main/java/io/zeebe/containers/engine/InfiniteList.java @@ -0,0 +1,62 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import java.time.Duration; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import net.jcip.annotations.ThreadSafe; +import org.apiguardian.api.API; +import org.apiguardian.api.API.Status; + +/** + * A thread safe wrapper around a list which will produce {@link InfiniteIterator} instances for + * said list. + * + * @param the type of the list entries + */ +@ThreadSafe +@API(status = Status.INTERNAL) +final class InfiniteList implements Iterable { + private final Duration timeout; + private final List items; + + InfiniteList() { + this(Duration.ZERO); + } + + InfiniteList(final Duration timeout) { + this.timeout = timeout; + items = new LinkedList<>(); + } + + @Override + public Iterator iterator() { + return new InfiniteIterator<>(items, timeout); + } + + int size() { + return items.size(); + } + + void add(final T item) { + synchronized (items) { + items.add(item); + items.notifyAll(); + } + } +} diff --git a/engine/src/main/java/io/zeebe/containers/engine/TestAwareContainerEngine.java b/engine/src/main/java/io/zeebe/containers/engine/TestAwareContainerEngine.java new file mode 100644 index 00000000..fcccb91b --- /dev/null +++ b/engine/src/main/java/io/zeebe/containers/engine/TestAwareContainerEngine.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import io.camunda.zeebe.process.test.assertions.BpmnAssert; +import io.camunda.zeebe.process.test.filters.RecordStream; +import java.util.Optional; +import org.apiguardian.api.API; +import org.apiguardian.api.API.Status; +import org.testcontainers.lifecycle.TestDescription; +import org.testcontainers.lifecycle.TestLifecycleAware; + +/** + * Extend {@link ContainerEngine} to be aware of the test lifecycle in order to initialize {@link + * BpmnAssert} and print out the record stream on error. + * + *

The Testcontainers extension uses the {@link TestLifecycleAware} marker annotation and will + * call the methods accordingly. + */ +@API(status = Status.INTERNAL) +interface TestAwareContainerEngine extends ContainerEngine, TestLifecycleAware { + @Override + default void beforeTest(final TestDescription description) { + BpmnAssert.initRecordStream(RecordStream.of(getRecordStreamSource())); + } + + @Override + default void afterTest(final TestDescription description, final Optional throwable) { + if (throwable.isPresent()) { + RecordStream.of(getRecordStreamSource()).print(true); + } + } +} diff --git a/engine/src/main/java/io/zeebe/containers/engine/ZeebeClusterEngine.java b/engine/src/main/java/io/zeebe/containers/engine/ZeebeClusterEngine.java new file mode 100644 index 00000000..5664252b --- /dev/null +++ b/engine/src/main/java/io/zeebe/containers/engine/ZeebeClusterEngine.java @@ -0,0 +1,109 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.process.test.api.RecordStreamSource; +import io.zeebe.containers.ZeebeNode; +import io.zeebe.containers.clock.ZeebeClock; +import io.zeebe.containers.cluster.ZeebeCluster; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeoutException; +import org.agrona.CloseHelper; +import org.apiguardian.api.API; +import org.apiguardian.api.API.Status; + +/** + * A {@link ContainerEngine} implementation which wraps a {@link ZeebeCluster}. Records are streamed + * from all brokers/partitions to a single underlying receiver. + * + *

Manipulating the time will update the clock on all nodes more or less at the same time. + */ +@API(status = Status.INTERNAL) +final class ZeebeClusterEngine implements TestAwareContainerEngine { + private final List clients = new ArrayList<>(); + private final DebugReceiverStream recordStream; + private final ZeebeCluster cluster; + private final Collection clocks; + + public ZeebeClusterEngine(final ZeebeCluster cluster, final DebugReceiverStream recordStream) { + this.cluster = cluster; + this.recordStream = recordStream; + + clocks = new ArrayList<>(); + for (final ZeebeNode node : cluster.getNodes().values()) { + node.withEnv("ZEEBE_CLOCK_CONTROLLED", "true"); + clocks.add(ZeebeClock.newDefaultClock(node)); + } + } + + @Override + public void acknowledge(final int partitionId, final long position) { + recordStream.acknowledge(partitionId, position); + } + + @Override + public RecordStreamSource getRecordStreamSource() { + return recordStream; + } + + @Override + public ZeebeClient createClient() { + final ZeebeClient client = cluster.newClientBuilder().build(); + clients.add(client); + + return client; + } + + @Override + public String getGatewayAddress() { + return cluster.getAvailableGateway().getExternalGatewayAddress(); + } + + @Override + public void increaseTime(final Duration timeToAdd) { + clocks.forEach(clock -> clock.addTime(timeToAdd)); + } + + @Override + public void waitForIdleState(final Duration timeout) + throws InterruptedException, TimeoutException { + recordStream.waitForIdleState(timeout); + } + + @Override + public void waitForBusyState(final Duration timeout) + throws InterruptedException, TimeoutException { + recordStream.waitForBusyState(timeout); + } + + @Override + public void start() { + recordStream.start(cluster.getBrokers().values()); + cluster.start(); + } + + @Override + public void stop() { + CloseHelper.closeAll(clients); + clients.clear(); + + CloseHelper.closeAll(cluster, recordStream); + } +} diff --git a/engine/src/main/java/io/zeebe/containers/engine/ZeebeContainerEngine.java b/engine/src/main/java/io/zeebe/containers/engine/ZeebeContainerEngine.java new file mode 100644 index 00000000..9f9e0df1 --- /dev/null +++ b/engine/src/main/java/io/zeebe/containers/engine/ZeebeContainerEngine.java @@ -0,0 +1,108 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.process.test.api.RecordStreamSource; +import io.zeebe.containers.ZeebeBrokerNode; +import io.zeebe.containers.ZeebeContainer; +import io.zeebe.containers.ZeebeGatewayNode; +import io.zeebe.containers.clock.ZeebeClock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeoutException; +import org.agrona.CloseHelper; +import org.apiguardian.api.API; +import org.apiguardian.api.API.Status; +import org.testcontainers.containers.GenericContainer; + +/** + * A {@link ContainerEngine} which wraps a single {@link ZeebeContainer}, which is both a gateway + * and a broker. + */ +@API(status = Status.INTERNAL) +final class ZeebeContainerEngine< + T extends GenericContainer & ZeebeGatewayNode & ZeebeBrokerNode> + implements TestAwareContainerEngine { + private final List clients = new ArrayList<>(); + private final DebugReceiverStream recordStream; + private final T container; + private final ZeebeClock clock; + + ZeebeContainerEngine(final T container, final DebugReceiverStream recordStream) { + this.container = container.withEnv("ZEEBE_CLOCK_CONTROLLED", "true"); + this.recordStream = recordStream; + + clock = ZeebeClock.newDefaultClock(container); + } + + @Override + public void acknowledge(final int partitionId, final long position) { + recordStream.acknowledge(partitionId, position); + } + + @Override + public RecordStreamSource getRecordStreamSource() { + return recordStream; + } + + @Override + public ZeebeClient createClient() { + final ZeebeClient client = + ZeebeClient.newClientBuilder().usePlaintext().gatewayAddress(getGatewayAddress()).build(); + clients.add(client); + + return client; + } + + @Override + public String getGatewayAddress() { + return container.getExternalGatewayAddress(); + } + + @Override + public void increaseTime(final Duration timeToAdd) { + clock.addTime(timeToAdd); + } + + @Override + public void waitForIdleState(final Duration timeout) + throws InterruptedException, TimeoutException { + recordStream.waitForIdleState(timeout); + } + + @Override + public void waitForBusyState(final Duration timeout) + throws InterruptedException, TimeoutException { + recordStream.waitForBusyState(timeout); + } + + @Override + public void start() { + recordStream.start(Collections.singleton(container)); + container.start(); + } + + @Override + public void stop() { + CloseHelper.closeAll(clients); + clients.clear(); + + CloseHelper.closeAll(container, recordStream); + } +} diff --git a/engine/src/test/java/io/zeebe/containers/engine/DebugReceiverStreamTest.java b/engine/src/test/java/io/zeebe/containers/engine/DebugReceiverStreamTest.java new file mode 100644 index 00000000..1b8a1c8d --- /dev/null +++ b/engine/src/test/java/io/zeebe/containers/engine/DebugReceiverStreamTest.java @@ -0,0 +1,159 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatNoException; + +import io.camunda.zeebe.protocol.record.ImmutableRecord; +import io.camunda.zeebe.protocol.record.Record; +import io.zeebe.containers.ZeebeBrokerContainer; +import io.zeebe.containers.ZeebeBrokerNode; +import io.zeebe.containers.ZeebeContainer; +import io.zeebe.containers.exporter.DebugReceiver; +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; + +final class DebugReceiverStreamTest { + private final InfiniteList> records = new InfiniteList<>(); + private final DebugReceiver receiver = new DebugReceiver(records::add); + + @Test + void shouldConfigureBrokersOnStart() { + // given + final Collection> brokers = + Arrays.asList(new ZeebeBrokerContainer(), new ZeebeContainer()); + + // when + final int port; + try (final DebugReceiverStream stream = new DebugReceiverStream(records, receiver)) { + stream.start(brokers); + port = receiver.serverAddress().getPort(); + } + + // then - we don't want to know too much about how the exporter is configured, so it should be + // sufficient to only + final String expectedExporterUrl = "http://host.testcontainers.internal:" + port + "/records"; + assertThat(brokers) + .allSatisfy( + broker -> + assertThat(broker.getEnvMap()) + .containsEntry("ZEEBE_BROKER_EXPORTERS_DEBUG_ARGS_URL", expectedExporterUrl)); + } + + @Test + void shouldStartReceiverOnStart() { + // given + final Collection> brokers = Collections.singleton(new ZeebeContainer()); + + // when + try (final DebugReceiverStream stream = new DebugReceiverStream(records, receiver)) { + stream.start(brokers); + // then + assertThatNoException().isThrownBy(() -> testServerConnection(receiver.serverAddress())); + } + } + + @Test + void shouldStopReceiverOnStop() { + // given + final Collection> brokers = Collections.singleton(new ZeebeContainer()); + + // when + final InetSocketAddress serverAddress; + try (final DebugReceiverStream stream = new DebugReceiverStream(records, receiver)) { + stream.start(brokers); + serverAddress = receiver.serverAddress(); + } + + // then + assertThatCode(() -> testServerConnection(serverAddress)).isInstanceOf(ConnectException.class); + } + + @Test + void shouldWaitForIdleState() { + // given + + // when + try (final DebugReceiverStream stream = new DebugReceiverStream(records, receiver)) { + // then + assertThatNoException().isThrownBy(() -> stream.waitForIdleState(Duration.ofSeconds(2))); + } + } + + @Test + void shouldTimeoutWaitingForIdleStateWhenNoRecords() { + // given + final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + // when + try (final DebugReceiverStream stream = new DebugReceiverStream(records, receiver)) { + executor.scheduleAtFixedRate( + () -> records.add(ImmutableRecord.builder().build()), 100, 500, TimeUnit.MILLISECONDS); + + // then + assertThatCode(() -> stream.waitForIdleState(Duration.ofSeconds(2))) + .isInstanceOf(TimeoutException.class); + } + } + + @Test + void shouldWaitForBusyState() { + // given + final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + // when + try (final DebugReceiverStream stream = new DebugReceiverStream(records, receiver)) { + executor.schedule( + () -> records.add(ImmutableRecord.builder().build()), 500, TimeUnit.MILLISECONDS); + + // then + assertThatNoException().isThrownBy(() -> stream.waitForIdleState(Duration.ofSeconds(2))); + } finally { + executor.shutdownNow(); + } + } + + @Test + void shouldTimeOutWaitingForBusyState() { + // given + + // when + try (final DebugReceiverStream stream = new DebugReceiverStream(records, receiver)) { + // then + assertThatCode(() -> stream.waitForBusyState(Duration.ofSeconds(1))) + .isInstanceOf(TimeoutException.class); + } + } + + private void testServerConnection(final InetSocketAddress receiverAddress) throws IOException { + try (final Socket socket = new Socket()) { + socket.connect(receiverAddress, 1_000); + } + } +} diff --git a/engine/src/test/java/io/zeebe/containers/engine/InfiniteListTest.java b/engine/src/test/java/io/zeebe/containers/engine/InfiniteListTest.java new file mode 100644 index 00000000..feb5ab2f --- /dev/null +++ b/engine/src/test/java/io/zeebe/containers/engine/InfiniteListTest.java @@ -0,0 +1,107 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.lang.Thread.State; +import java.time.Duration; +import java.time.Instant; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +final class InfiniteListTest { + @Test + void shouldReturnSize() { + // given + final InfiniteList list = new InfiniteList<>(); + list.add(1); + + // when + final int initialSize = list.size(); + list.add(2); + final int finalSize = list.size(); + + // then + assertThat(initialSize).isEqualTo(1); + assertThat(finalSize).isEqualTo(2); + } + + @Nested + final class IterationTest { + @Test + void shouldTailItems() { + // given + final InfiniteList list = new InfiniteList<>(); + final Iterator iterator = list.iterator(); + + // when + list.add(1); + + // then + assertThat(iterator).hasNext(); + assertThat(iterator.next()).isEqualTo(1); + } + + @Test + void shouldSignalOnNewItems() throws InterruptedException { + // given + final InfiniteList list = new InfiniteList<>(Duration.ofMinutes(5)); + final Iterator iterator = list.iterator(); + final AtomicReference receivedItem = new AtomicReference<>(); + + // when + final Thread thread = new Thread(() -> receivedItem.set(iterator.next())); + thread.start(); + + Awaitility.await("until the thread is parked and waiting for the next item") + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(thread.getState()).isEqualTo(State.TIMED_WAITING)); + list.add(1); + thread.join(); + + // then + assertThat(receivedItem).hasValue(1); + } + + @Test + void shouldTimeoutWhenNoItems() throws InterruptedException { + // given + final InfiniteList list = new InfiniteList<>(Duration.ofSeconds(1)); + final Iterator iterator = list.iterator(); + final AtomicBoolean hasNext = new AtomicBoolean(true); + + // when + final Thread thread = new Thread(() -> hasNext.set(iterator.hasNext())); + final Instant start = Instant.now(); + thread.start(); + + Awaitility.await("until the thread is parked and waiting for the next item") + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(thread.getState()).isEqualTo(State.TIMED_WAITING)); + thread.join(); + final Instant finish = Instant.now(); + + // then - give some lenient timeout, especially the upper bound + assertThat(finish).isBetween(start.plusMillis(800), start.plusMillis(2000)); + assertThat(hasNext).isFalse(); + } + } +} diff --git a/engine/src/test/java/io/zeebe/containers/engine/ZeebeClusterEngineTest.java b/engine/src/test/java/io/zeebe/containers/engine/ZeebeClusterEngineTest.java new file mode 100644 index 00000000..5607e2de --- /dev/null +++ b/engine/src/test/java/io/zeebe/containers/engine/ZeebeClusterEngineTest.java @@ -0,0 +1,148 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.within; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.command.FinalCommandStep; +import io.camunda.zeebe.protocol.record.Record; +import io.grpc.StatusRuntimeException; +import io.zeebe.containers.ZeebeGatewayNode; +import io.zeebe.containers.clock.ZeebeClock; +import io.zeebe.containers.cluster.ZeebeCluster; +import io.zeebe.containers.exporter.DebugReceiver; +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import org.agrona.CloseHelper; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +final class ZeebeClusterEngineTest { + private final Network network = Network.newNetwork(); + private final InfiniteList> records = new InfiniteList<>(); + private final DebugReceiver receiver = new DebugReceiver(records::add); + private final DebugReceiverStream recordStream = new DebugReceiverStream(records, receiver); + private final ZeebeCluster cluster = + ZeebeCluster.builder() + .withEmbeddedGateway(true) + .withBrokersCount(2) + .withPartitionsCount(1) + .withReplicationFactor(2) + .build(); + + @AfterEach + void afterEach() { + CloseHelper.close(network); + } + + @Test + void shouldCloseEverythingOnStop() { + // given + final ZeebeClient client; + final InetSocketAddress receiverAddress; + try (final ZeebeClusterEngine engine = new ZeebeClusterEngine(cluster, recordStream)) { + engine.start(); + client = engine.createClient(); + receiverAddress = receiver.serverAddress(); + } + + // then + final FinalCommandStep request = client.newTopologyRequest(); + assertThatCode(request::send).isInstanceOf(StatusRuntimeException.class); + assertThat(cluster.getNodes().values()).allMatch(node -> !node.isRunning()); + assertThatCode(() -> testServerConnection(receiverAddress)) + .isInstanceOf(ConnectException.class); + } + + private void testServerConnection(final InetSocketAddress receiverAddress) throws IOException { + try (final Socket socket = new Socket()) { + socket.connect(receiverAddress, 1_000); + } + } + + @Nested + final class WithClusterTest { + @Container final ZeebeClusterEngine engine = new ZeebeClusterEngine(cluster, recordStream); + + @Test + void shouldCreateClient() { + // given + + // when + final ZeebeClient client = engine.createClient(); + + // then + assertThat((Future) client.newTopologyRequest().send()) + .succeedsWithin(Duration.ofSeconds(1)); + } + + @Test + void shouldReturnGatewayAddress() { + // given + + // when + final String address = engine.getGatewayAddress(); + + // then + final Set gatewayAddress = + cluster.getGateways().values().stream() + .map(ZeebeGatewayNode::getExternalGatewayAddress) + .collect(Collectors.toSet()); + assertThat(address).isIn(gatewayAddress); + } + + @Test + void shouldIncreaseTime() { + // given + final Duration offset = Duration.ofMinutes(5); + final Map clocks = new HashMap<>(); + cluster.getNodes().forEach((id, node) -> clocks.put(id, ZeebeClock.newDefaultClock(node))); + final Map startTimes = new HashMap<>(); + clocks.forEach((id, clock) -> startTimes.put(id, clock.getCurrentTime())); + + // when + engine.increaseTime(offset); + + // then + final Map endTimes = new HashMap<>(); + clocks.forEach((id, clock) -> endTimes.put(id, clock.getCurrentTime())); + + assertThat(endTimes) + .allSatisfy( + (id, endTime) -> + assertThat(endTime) + .isCloseTo(startTimes.get(id).plus(offset), within(10, ChronoUnit.SECONDS))); + } + } +} diff --git a/engine/src/test/java/io/zeebe/containers/engine/ZeebeContainerEngineTest.java b/engine/src/test/java/io/zeebe/containers/engine/ZeebeContainerEngineTest.java new file mode 100644 index 00000000..5ab62e82 --- /dev/null +++ b/engine/src/test/java/io/zeebe/containers/engine/ZeebeContainerEngineTest.java @@ -0,0 +1,119 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.within; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.command.FinalCommandStep; +import io.camunda.zeebe.protocol.record.Record; +import io.grpc.StatusRuntimeException; +import io.zeebe.containers.ZeebeContainer; +import io.zeebe.containers.clock.ZeebeClock; +import io.zeebe.containers.exporter.DebugReceiver; +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.Future; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +final class ZeebeContainerEngineTest { + private final InfiniteList> records = new InfiniteList<>(); + private final DebugReceiver receiver = new DebugReceiver(records::add); + private final DebugReceiverStream recordStream = new DebugReceiverStream(records, receiver); + private final ZeebeContainer container = new ZeebeContainer(); + + @Test + void shouldCloseEverythingOnStop() { + // given + final ZeebeClient client; + final InetSocketAddress receiverAddress; + try (final ZeebeContainerEngine engine = + new ZeebeContainerEngine<>(container, recordStream)) { + engine.start(); + client = engine.createClient(); + receiverAddress = receiver.serverAddress(); + } + + // then + final FinalCommandStep request = client.newTopologyRequest(); + assertThatCode(request::send).isInstanceOf(StatusRuntimeException.class); + assertThat(container.isStarted()).isFalse(); + assertThatCode(() -> testServerConnection(receiverAddress)) + .isInstanceOf(ConnectException.class); + } + + private void testServerConnection(final InetSocketAddress receiverAddress) throws IOException { + try (final Socket socket = new Socket()) { + socket.connect(receiverAddress, 1_000); + } + } + + @Nested + final class WithContainerTest { + @Container + private final ZeebeContainerEngine engine = + new ZeebeContainerEngine<>(container, recordStream); + + @Test + void shouldCreateClient() { + // given + + // when + final ZeebeClient client = engine.createClient(); + + // then + assertThat((Future) client.newTopologyRequest().send()) + .succeedsWithin(Duration.ofSeconds(1)); + } + + @Test + void shouldReturnGatewayAddress() { + // given + + // when + final String address = engine.getGatewayAddress(); + + // then + assertThat(address).isEqualTo(container.getExternalGatewayAddress()); + } + + @Test + void shouldIncreaseTime() { + // given + final Duration offset = Duration.ofMinutes(5); + final ZeebeClock clock = ZeebeClock.newDefaultClock(container); + + // when + final Instant startTime = clock.getCurrentTime(); + engine.increaseTime(offset); + final Instant endTime = clock.getCurrentTime(); + + // then + assertThat(endTime).isCloseTo(startTime.plus(offset), within(10, ChronoUnit.SECONDS)); + } + } +} diff --git a/engine/src/test/java/io/zeebe/containers/engine/examples/ClusterEngineExampleTest.java b/engine/src/test/java/io/zeebe/containers/engine/examples/ClusterEngineExampleTest.java new file mode 100644 index 00000000..9fe2a31f --- /dev/null +++ b/engine/src/test/java/io/zeebe/containers/engine/examples/ClusterEngineExampleTest.java @@ -0,0 +1,75 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine.examples; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.model.bpmn.BpmnModelInstance; +import io.camunda.zeebe.process.test.assertions.BpmnAssert; +import io.zeebe.containers.cluster.ZeebeCluster; +import io.zeebe.containers.engine.ContainerEngine; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +final class ClusterEngineExampleTest { + private final Network network = Network.newNetwork(); + + // a container which will print out its log to the given logger + private final ZeebeCluster cluster = + ZeebeCluster.builder() + .withGatewaysCount(1) + .withPartitionsCount(2) + .withReplicationFactor(2) + .withBrokersCount(2) + .withEmbeddedGateway(false) + .withNetwork(network) + .build(); + + @Container + private final ContainerEngine engine = + ContainerEngine.builder().withCluster(cluster).withIdlePeriod(Duration.ofSeconds(2)).build(); + + @AfterEach + void afterEach() { + network.close(); + } + + @Test + void shouldCompleteProcessInstance() throws InterruptedException, TimeoutException { + // given + final BpmnModelInstance processModel = + Bpmn.createExecutableProcess("process").startEvent().endEvent().done(); + final ProcessInstanceEvent processInstance; + + // when + try (final ZeebeClient client = engine.createClient()) { + client.newDeployResourceCommand().addProcessModel(processModel, "process.bpmn").send().join(); + processInstance = + client.newCreateInstanceCommand().bpmnProcessId("process").latestVersion().send().join(); + } + + // then + engine.waitForIdleState(Duration.ofSeconds(5)); + BpmnAssert.assertThat(processInstance).isStarted().isCompleted(); + } +} diff --git a/engine/src/test/java/io/zeebe/containers/engine/examples/ContainerEngineExampleTest.java b/engine/src/test/java/io/zeebe/containers/engine/examples/ContainerEngineExampleTest.java new file mode 100644 index 00000000..8c1881ce --- /dev/null +++ b/engine/src/test/java/io/zeebe/containers/engine/examples/ContainerEngineExampleTest.java @@ -0,0 +1,80 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine.examples; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.model.bpmn.BpmnModelInstance; +import io.camunda.zeebe.process.test.assertions.BpmnAssert; +import io.zeebe.containers.ZeebeContainer; +import io.zeebe.containers.engine.ContainerEngine; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * This example showcases how to set up a {@link io.camunda.zeebe.process.test.api.ZeebeTestEngine} + * which points to a pre-configured {@link io.zeebe.containers.ZeebeContainer}. + * + *

Note that the lifecycle of the configured container is managed by the {@link ContainerEngine}. + * As such, you will notice the {@code engine} field is annotated with {@link Container} and not the + * container itself. + * + *

For a more complete example of how to use {@link + * io.camunda.zeebe.process.test.api.ZeebeTestEngine} and {@link + * io.camunda.zeebe.process.test.assertions.BpmnAssert}, refer to zeebe-process-test. + */ +@Testcontainers +final class ContainerEngineExampleTest { + private static final Logger LOGGER = LoggerFactory.getLogger(ContainerEngineExampleTest.class); + + // a container which will print out its log to the given logger + private final ZeebeContainer container = + new ZeebeContainer().withLogConsumer(new Slf4jLogConsumer(LOGGER)); + + @Container + private final ContainerEngine engine = + ContainerEngine.builder() + .withContainer(container) + .withIdlePeriod(Duration.ofSeconds(2)) + .build(); + + @Test + void shouldCompleteProcessInstance() throws InterruptedException, TimeoutException { + // given + final BpmnModelInstance processModel = + Bpmn.createExecutableProcess("process").startEvent().endEvent().done(); + final ProcessInstanceEvent processInstance; + + // when + try (final ZeebeClient client = engine.createClient()) { + client.newDeployResourceCommand().addProcessModel(processModel, "process.bpmn").send().join(); + processInstance = + client.newCreateInstanceCommand().bpmnProcessId("process").latestVersion().send().join(); + } + + // then + engine.waitForIdleState(Duration.ofSeconds(5)); + BpmnAssert.assertThat(processInstance).isStarted().isCompleted(); + } +} diff --git a/engine/src/test/java/io/zeebe/containers/engine/examples/GracePeriodExampleTest.java b/engine/src/test/java/io/zeebe/containers/engine/examples/GracePeriodExampleTest.java new file mode 100644 index 00000000..8410f67f --- /dev/null +++ b/engine/src/test/java/io/zeebe/containers/engine/examples/GracePeriodExampleTest.java @@ -0,0 +1,62 @@ +/* + * Copyright © 2022 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.containers.engine.examples; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.model.bpmn.BpmnModelInstance; +import io.camunda.zeebe.process.test.assertions.BpmnAssert; +import io.zeebe.containers.engine.ContainerEngine; +import java.time.Duration; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * This example shows how setting a grace period to your engine via {@link + * ContainerEngine.Builder#withGracePeriod(Duration)} can be a good alternative to using {@link + * io.camunda.zeebe.process.test.api.ZeebeTestEngine#waitForIdleState(Duration)}, as you do not need + * to call it ever. + * + *

NOTE: one of the pitfalls with this method however is that certain assertions, notably those + * which check for the absence of something, will typically block for the complete duration + * of the grace period, thus slowing down your tests. + */ +@Testcontainers +final class GracePeriodExampleTest { + @Container + private final ContainerEngine engine = + ContainerEngine.builder().withGracePeriod(Duration.ofSeconds(5)).build(); + + @Test + void shouldCompleteProcessInstance() { + // given + final BpmnModelInstance processModel = + Bpmn.createExecutableProcess("process").startEvent().endEvent().done(); + final ProcessInstanceEvent processInstance; + + // when + try (final ZeebeClient client = engine.createClient()) { + client.newDeployResourceCommand().addProcessModel(processModel, "process.bpmn").send().join(); + processInstance = + client.newCreateInstanceCommand().bpmnProcessId("process").latestVersion().send().join(); + } + + // then + BpmnAssert.assertThat(processInstance).isStarted().isCompleted(); + } +} diff --git a/engine/src/test/resources/simplelogger.properties b/engine/src/test/resources/simplelogger.properties new file mode 100644 index 00000000..aa66fbe6 --- /dev/null +++ b/engine/src/test/resources/simplelogger.properties @@ -0,0 +1,5 @@ +org.slf4j.simpleLogger.logFile=System.out +org.slf4j.simplerLogger.showShortLogName=true +org.slf4j.simpleLogger.defaultLogLevel=info +org.slf4j.simpleLogger.log.io.zeebe.containers=debug +org.slf4j.simpleLogger.showDateTime=true diff --git a/exporter-test/src/test/java/io/zeebe/containers/exporter/ExporterIntegrationTest.java b/exporter-test/src/test/java/io/zeebe/containers/exporter/ExporterIntegrationTest.java index 785231f3..5bf8bc81 100644 --- a/exporter-test/src/test/java/io/zeebe/containers/exporter/ExporterIntegrationTest.java +++ b/exporter-test/src/test/java/io/zeebe/containers/exporter/ExporterIntegrationTest.java @@ -59,7 +59,7 @@ final class SinglePartitionTest { private final DebugExporter exporter = new DebugExporter(); @BeforeEach - void beforeEach() throws Exception { + void beforeEach() { context.getConfiguration().getArguments().put("url", receiver.recordsEndpoint().toString()); exporter.configure(context); diff --git a/exporter-test/src/test/java/io/zeebe/containers/exporter/TestExporterApi.java b/exporter-test/src/test/java/io/zeebe/containers/exporter/TestExporterApi.java index 7da79a6d..44f1f9fc 100644 --- a/exporter-test/src/test/java/io/zeebe/containers/exporter/TestExporterApi.java +++ b/exporter-test/src/test/java/io/zeebe/containers/exporter/TestExporterApi.java @@ -25,7 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO: replace this with io.camunda.zeebe:zeebe-exporter-test:8.1.0-alpha2 when available +// TODO: replace this with io.camunda.zeebe:zeebe-exporter-test:8.1.0 when available final class TestExporterApi { private static final Logger LOGGER = LoggerFactory.getLogger(TestExporterApi.class); diff --git a/exporter/src/main/java/io/zeebe/containers/exporter/DebugExporter.java b/exporter/src/main/java/io/zeebe/containers/exporter/DebugExporter.java index 79c76d0b..8b86485c 100644 --- a/exporter/src/main/java/io/zeebe/containers/exporter/DebugExporter.java +++ b/exporter/src/main/java/io/zeebe/containers/exporter/DebugExporter.java @@ -95,11 +95,10 @@ public void export(final Record record) { try { pushRecord(record); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + LangUtil.rethrowUnchecked(e); } catch (final Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - LangUtil.rethrowUnchecked(e); } diff --git a/pom.xml b/pom.xml index 1724de95..c6f186eb 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,7 @@ core + engine exporter exporter-test @@ -67,6 +68,7 @@ 11.9 5.1.4 2.13.3 + 1.0 5.8.2 1.3.2 4.6.1 @@ -118,6 +120,12 @@ ${project.version} + + io.zeebe + zeebe-test-container-engine + ${project.version} + + io.camunda @@ -133,6 +141,19 @@ ${version.zeebe} + + + io.camunda + zeebe-process-test-api + ${version.zeebe} + + + + io.camunda + zeebe-process-test-assertions + ${version.zeebe} + + org.junit @@ -272,6 +293,13 @@ + + net.jcip + jcip-annotations + ${version.jcip} + provided + + com.google.errorprone From 246f4417ace4aff5fe233b4bc536ae2e9587f1b8 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Fri, 22 Jul 2022 17:27:14 +0200 Subject: [PATCH 4/7] style: fix compilation/checks issues --- README.md | 4 +- engine/pom.xml | 38 +++++++------------ .../engine/ZeebeClusterEngineTest.java | 3 +- .../engine/ZeebeContainerEngineTest.java | 3 +- pom.xml | 8 +++- 5 files changed, 25 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 381c2d79..85a9fc26 100644 --- a/README.md +++ b/README.md @@ -83,12 +83,12 @@ Add the project to your dependencies: io.zeebe zeebe-test-container - 4.0.0 + 3.5.0 ``` ```groovy -testImplementation 'io.zeebe:zeebe-test-container:4.0.0' +testImplementation 'io.zeebe:zeebe-test-container:3.5.0' ``` ### Requirements diff --git a/engine/pom.xml b/engine/pom.xml index f514f41d..a4b9be4a 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -37,6 +37,11 @@ zeebe-process-test-assertions + + io.camunda + zeebe-process-test-filters + + org.slf4j slf4j-api @@ -69,36 +74,15 @@ - com.fasterxml.jackson.core - jackson-annotations - - - - com.fasterxml.jackson.core - jackson-databind - - - - com.fasterxml.jackson.core - jackson-core - - - - org.junit.jupiter - junit-jupiter-api + org.testcontainers + testcontainers - - org.mockito - mockito-core - test - - org.junit.jupiter - junit-jupiter-params + junit-jupiter-api test @@ -119,6 +103,12 @@ awaitility test + + + io.camunda + zeebe-bpmn-model + test + diff --git a/engine/src/test/java/io/zeebe/containers/engine/ZeebeClusterEngineTest.java b/engine/src/test/java/io/zeebe/containers/engine/ZeebeClusterEngineTest.java index 5607e2de..5786cf59 100644 --- a/engine/src/test/java/io/zeebe/containers/engine/ZeebeClusterEngineTest.java +++ b/engine/src/test/java/io/zeebe/containers/engine/ZeebeClusterEngineTest.java @@ -22,7 +22,6 @@ import io.camunda.zeebe.client.ZeebeClient; import io.camunda.zeebe.client.api.command.FinalCommandStep; import io.camunda.zeebe.protocol.record.Record; -import io.grpc.StatusRuntimeException; import io.zeebe.containers.ZeebeGatewayNode; import io.zeebe.containers.clock.ZeebeClock; import io.zeebe.containers.cluster.ZeebeCluster; @@ -79,7 +78,7 @@ void shouldCloseEverythingOnStop() { // then final FinalCommandStep request = client.newTopologyRequest(); - assertThatCode(request::send).isInstanceOf(StatusRuntimeException.class); + assertThatCode(request::send).isInstanceOf(RuntimeException.class); assertThat(cluster.getNodes().values()).allMatch(node -> !node.isRunning()); assertThatCode(() -> testServerConnection(receiverAddress)) .isInstanceOf(ConnectException.class); diff --git a/engine/src/test/java/io/zeebe/containers/engine/ZeebeContainerEngineTest.java b/engine/src/test/java/io/zeebe/containers/engine/ZeebeContainerEngineTest.java index 5ab62e82..e5f516a0 100644 --- a/engine/src/test/java/io/zeebe/containers/engine/ZeebeContainerEngineTest.java +++ b/engine/src/test/java/io/zeebe/containers/engine/ZeebeContainerEngineTest.java @@ -22,7 +22,6 @@ import io.camunda.zeebe.client.ZeebeClient; import io.camunda.zeebe.client.api.command.FinalCommandStep; import io.camunda.zeebe.protocol.record.Record; -import io.grpc.StatusRuntimeException; import io.zeebe.containers.ZeebeContainer; import io.zeebe.containers.clock.ZeebeClock; import io.zeebe.containers.exporter.DebugReceiver; @@ -60,7 +59,7 @@ void shouldCloseEverythingOnStop() { // then final FinalCommandStep request = client.newTopologyRequest(); - assertThatCode(request::send).isInstanceOf(StatusRuntimeException.class); + assertThatCode(request::send).isInstanceOf(RuntimeException.class); assertThat(container.isStarted()).isFalse(); assertThatCode(() -> testServerConnection(receiverAddress)) .isInstanceOf(ConnectException.class); diff --git a/pom.xml b/pom.xml index c6f186eb..cbbe28b4 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ io.zeebe zeebe-test-container-root - 3.4.1-SNAPSHOT + 3.5.0-SNAPSHOT pom Zeebe Test Container Root https://github.com/zeebe-io/zeebe-test-container @@ -154,6 +154,12 @@ ${version.zeebe} + + io.camunda + zeebe-process-test-filters + ${version.zeebe} + + org.junit From bd78c7ee46935e3a97cea9d1e445cdee1f83f428 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Fri, 22 Jul 2022 17:27:58 +0200 Subject: [PATCH 5/7] style(engine): add missing override --- .../java/io/zeebe/containers/engine/ContainerEngineBuilder.java | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/src/main/java/io/zeebe/containers/engine/ContainerEngineBuilder.java b/engine/src/main/java/io/zeebe/containers/engine/ContainerEngineBuilder.java index a5485453..926fe64d 100644 --- a/engine/src/main/java/io/zeebe/containers/engine/ContainerEngineBuilder.java +++ b/engine/src/main/java/io/zeebe/containers/engine/ContainerEngineBuilder.java @@ -100,6 +100,7 @@ public Builder withAutoAcknowledge(final boolean autoAcknowledge) { } @SuppressWarnings("unchecked") + @Override public ContainerEngine build() { final Duration listGracePeriod = Optional.ofNullable(gracePeriod).orElse(DEFAULT_GRACE_PERIOD); final Duration receiveIdlePeriod = Optional.ofNullable(idlePeriod).orElse(DEFAULT_IDLE_PERIOD); From 4328c6a196163f6011cce1720c27eebb0b14bf86 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Fri, 22 Jul 2022 17:34:29 +0200 Subject: [PATCH 6/7] build: fix CI and build issues --- .github/workflows/test.yml | 41 ++++++++++++++++++++++++++++++++++++++ core/pom.xml | 2 +- engine/pom.xml | 2 +- exporter-test/pom.xml | 2 +- exporter/pom.xml | 2 +- 5 files changed, 45 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 38cda0d9..18a65a63 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -81,6 +81,47 @@ jobs: name: local-core-test-results path: "**/target/surefire-reports/**" retention-days: 3 + engine: + env: + TC_CLOUD_TOKEN: ${{ secrets.TC_CLOUD_TOKEN }} + TC_CLOUD_CONCURRENCY: 4 + name: Test (Cloud) - engine + runs-on: ubuntu-latest + steps: + - name: Prepare Testcontainers Cloud agent + run: | + curl -L -o agent https://app.testcontainers.cloud/download/testcontainers-cloud-agent_linux_x86-64 + chmod +x agent + ./agent & + ./agent wait + - uses: actions/checkout@v3 + # remove exports causing issues on JDK 8 + - run: rm .mvn/jvm.config + - name: Setup JDK 17 for exporter build + uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'temurin' + cache: 'maven' + - name: Build & Copy exporter + run: > + mvn -B -T1C -DskipTests -DskipChecks install + - name: Set up JDK 8 + uses: actions/setup-java@v3 + with: + java-version: '8' + distribution: 'temurin' + - name: Test + timeout-minutes: 20 + run: > + mvn -B -Pparallel-tests -DforkCount=2C -DskipChecks -pl engine test + - name: Archive Test Results + uses: actions/upload-artifact@v3 + if: always() + with: + name: cloud-core-test-results + path: "**/target/surefire-reports/**" + retention-days: 3 exporter: name: Test (Local) - exporter runs-on: ubuntu-latest diff --git a/core/pom.xml b/core/pom.xml index bf7b3374..67fa2769 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -5,7 +5,7 @@ io.zeebe zeebe-test-container-root - 3.4.1-SNAPSHOT + 3.5.0-SNAPSHOT ../pom.xml diff --git a/engine/pom.xml b/engine/pom.xml index a4b9be4a..e7458d7b 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -5,7 +5,7 @@ io.zeebe zeebe-test-container-root - 3.4.1-SNAPSHOT + 3.5.0-SNAPSHOT ../pom.xml diff --git a/exporter-test/pom.xml b/exporter-test/pom.xml index ae0df48a..f50d0911 100644 --- a/exporter-test/pom.xml +++ b/exporter-test/pom.xml @@ -5,7 +5,7 @@ io.zeebe zeebe-test-container-root - 3.4.1-SNAPSHOT + 3.5.0-SNAPSHOT ../pom.xml diff --git a/exporter/pom.xml b/exporter/pom.xml index f954988f..20fca765 100644 --- a/exporter/pom.xml +++ b/exporter/pom.xml @@ -5,7 +5,7 @@ io.zeebe zeebe-test-container-root - 3.4.1-SNAPSHOT + 3.5.0-SNAPSHOT ../pom.xml From 05ed005b77eff351dd34bc8537bcb04b90412b85 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Fri, 22 Jul 2022 17:36:51 +0200 Subject: [PATCH 7/7] test: disable auto acknowledging --- .../io/zeebe/containers/exporter/ExporterIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter-test/src/test/java/io/zeebe/containers/exporter/ExporterIntegrationTest.java b/exporter-test/src/test/java/io/zeebe/containers/exporter/ExporterIntegrationTest.java index 5bf8bc81..76d4ec73 100644 --- a/exporter-test/src/test/java/io/zeebe/containers/exporter/ExporterIntegrationTest.java +++ b/exporter-test/src/test/java/io/zeebe/containers/exporter/ExporterIntegrationTest.java @@ -32,7 +32,7 @@ final class ExporterIntegrationTest { private final ProtocolFactory recordFactory = new ProtocolFactory(); private final List> exportedRecords = new CopyOnWriteArrayList<>(); - private final DebugReceiver receiver = new DebugReceiver(exportedRecords::add, 0); + private final DebugReceiver receiver = new DebugReceiver(exportedRecords::add, 0, false); @BeforeEach void beforeEach() {