diff --git a/README.md b/README.md
index 8dc044b8..64f2cd58 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@ A monitoring application for [Zeebe](https://zeebe.io). It is designed for devel
* test workflows manually
* provide insides on how workflows are executed
-The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter) or [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter). It aggregates the data and stores it into a database. The data is displayed on server-side rendered HTML pages.
+The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter), [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter) or [Redis exporter](https://github.com/camunda-community-hub/zeebe-redis-exporter). It aggregates the data and stores it into a database. The data is displayed on server-side rendered HTML pages.
![how-it-works](docs/how-it-works.png)
@@ -59,6 +59,19 @@ By default, the Zeebe Simple Monitor imports Zeebe events through Hazelcast, but
* In order to import events efficiently and quickly, Zeebe brokers partitions and Kafka topic partitions should be correlated in a special way: [reference to the exporter docs](https://github.com/camunda-community-hub/zeebe-kafka-exporter?tab=readme-ov-file#partitioning)
* Configure the environment variables in the Zeebe Simple Monitor as described in the "[Change the default Zeebe importer to Kafka](#change-the-default-zeebe-importer-to-kafka)" section
+**Switch to the Redis exporter/importer**
+
+* Ensure that a Zeebe broker is running with a [Redis exporter](https://github.com/camunda-community-hub/zeebe-redis-exporter)
+* Adjust the following environment variables in Zeebe:
+ ```
+ - ZEEBE_REDIS_REMOTE_ADDRESS=redis://redis:6379
+ - ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS=900
+ - ZEEBE_REDIS_DELETE_AFTER_ACKNOWLEDGE=true
+ ```
+* Configure the connection to the Zeebe broker by setting `zeebe.client.broker.gateway-address` (default: `localhost:26500`)
+* Configure the connection to Redis by setting `zeebe.client.worker.redis.connection` (default: `redis://localhost:6379`)
+* Activate Redis by setting `zeebe-importer: redis`
+
If the Zeebe broker runs on your local machine with the default configs then start the container with the following command:
@@ -66,7 +79,7 @@ If the Zeebe broker runs on your local machine with the default configs then sta
docker run --network="host" ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.4.1
```
-For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast/Kafka exporter and the application.
+For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast/Kafka/Redis exporter and the application.
There are several Docker Compose profiles, setting by a file [.env](docker/.env), by passing multiple --profile flags or a comma-separated list for the COMPOSE_PROFILES environment variable:
* ```docker compose --profile hazelcast --profile hazelcast_in_memory up```
* ```COMPOSE_PROFILES=hazelcast,hazelcast_in_memory docker compose up```
@@ -74,6 +87,7 @@ There are several Docker Compose profiles, setting by a file [.env](docker/.env)
Existing presets:
* ```COMPOSE_PROFILES=hazelcast,hazelcast_in_memory``` (by default)
* ```COMPOSE_PROFILES=kafka,kafka_in_memory```
+* ```COMPOSE_PROFILES=redis,redis_in_memory```
* ```COMPOSE_PROFILES=hazelcast,hazelcast_postgres,postgres```
* ```COMPOSE_PROFILES=hazelcast,hazelcast_mysql,mysql```
@@ -89,6 +103,7 @@ Go to http://localhost:8082
To change the database see "[Change the Database](#change-the-database)"
To change Zeebe importer see "[Change the default Zeebe importer to Kafka](#change-the-default-zeebe-importer-to-kafka)"
+or "[Change the default Zeebe importer to Redis](#change-the-default-zeebe-importer-to-redis)"
```
docker-compose --profile postgres up
@@ -252,7 +267,29 @@ See the [docker-compose file](docker/docker-compose.yml) for a sample configurat
* `spring.kafka.custom.concurrency` (default: `3`) is the number of threads for the Kafka listener that will import events from Zeebe
* `spring.kafka.custom.retry.intervalMs` (default: `30000`) and `spring.kafka.custom.retry.max-attempts` (default: `3`) are the retry configurations for a retryable exception in the listener
-Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Kafka importer. Profiles presets: `kafka,kafka_in_memory`
+Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Kafka importer. Profile presets: `kafka,kafka_in_memory`
+
+#### Change the default Zeebe importer to Redis
+
+* set the `zeebe-importer` (default: `hazelcast`) configuration property to `redis`
+* adjust the importer settings under `zeebe.client.worker.redis` (complete default values below):
+ ```
+ zeebe:
+ client:
+ broker.gatewayAddress: 127.0.0.1:26500
+ security.plaintext: true
+
+ worker:
+ redis:
+ connection: redis://localhost:6379
+ consumer-group: simple-monitor
+ xread-count: 500
+ xread-block-millis: 2000
+
+ zeebe-importer: redis
+ ```
+
+Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Redis importer. Profile presets: `redis,redis_in_memory`
## Code of Conduct
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 4a736193..f76e90a4 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -43,6 +43,23 @@ services:
profiles:
- kafka
+ zeebe-redis:
+ container_name: zeebe-broker-redis
+ image: ghcr.io/camunda-community-hub/zeebe-with-redis-exporter:8.4.0
+ environment:
+ - ZEEBE_REDIS_REMOTE_ADDRESS=redis://redis:6379
+ - ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS=900
+ - ZEEBE_REDIS_DELETE_AFTER_ACKNOWLEDGE=true
+ ports:
+ - "26500:26500"
+ - "9600:9600"
+ networks:
+ - zeebe_network
+ volumes:
+ - ./redis/application.yaml:/usr/local/zeebe/config/application.yaml
+ profiles:
+ - redis
+
zookeeper:
image: docker.io/bitnami/zookeeper:3.8
ports:
@@ -73,6 +90,16 @@ services:
profiles:
- kafka
+ redis:
+ container_name: redis_cache
+ image: redis:7-alpine
+ ports:
+ - "6379:6379"
+ networks:
+ - zeebe_network
+ profiles:
+ - redis
+
simple-monitor-in-memory:
container_name: zeebe-simple-monitor-in-memory
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.5.2
@@ -106,6 +133,22 @@ services:
profiles:
- kafka_in_memory
+ simple-monitor-in-memory-redis:
+ container_name: zeebe-simple-monitor-in-memory-redis
+ image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.6.4
+ environment:
+ - zeebe.client.broker.gateway-address=zeebe:26500
+ - zeebe-importer=redis
+ - zeebe.client.worker.redis.connection=redis://redis:6379
+ ports:
+ - "8082:8082"
+ depends_on:
+ - zeebe-redis
+ networks:
+ - zeebe_network
+ profiles:
+ - redis_in_memory
+
simple-monitor-postgres:
container_name: zeebe-simple-monitor-postgres
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.5.2
diff --git a/docker/redis/application.yaml b/docker/redis/application.yaml
new file mode 100644
index 00000000..9c720f15
--- /dev/null
+++ b/docker/redis/application.yaml
@@ -0,0 +1,6 @@
+zeebe:
+ broker:
+ exporters:
+ redis:
+ className: io.zeebe.redis.exporter.RedisExporter
+ jarPath: exporters/zeebe-redis-exporter-jar-with-dependencies.jar
diff --git a/docs/how-it-works.png b/docs/how-it-works.png
index d4e5da7c..c04e6d6d 100644
Binary files a/docs/how-it-works.png and b/docs/how-it-works.png differ
diff --git a/pom.xml b/pom.xml
index 67a29a3e..88999243 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,6 +21,7 @@
8.3.4
8.4.0
1.4.0
+ 0.9.9
5.0.0
@@ -73,6 +74,12 @@
${hazelcast.exporter.version}
+
+ io.zeebe.redis
+ zeebe-redis-connector
+ ${redis.exporter.version}
+
+
com.h2database
h2
@@ -142,6 +149,11 @@
zeebe-hazelcast-connector
+
+ io.zeebe.redis
+ zeebe-redis-connector
+
+
io.camunda
zeebe-protocol-jackson
diff --git a/src/main/java/io/zeebe/monitor/config/RedisConfig.java b/src/main/java/io/zeebe/monitor/config/RedisConfig.java
new file mode 100644
index 00000000..e147237b
--- /dev/null
+++ b/src/main/java/io/zeebe/monitor/config/RedisConfig.java
@@ -0,0 +1,38 @@
+package io.zeebe.monitor.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+
+@ConditionalOnProperty(name = "zeebe-importer", havingValue = "redis")
+@Configuration
+public class RedisConfig {
+
+ @Value("${zeebe.client.worker.redis.connection}")
+ private String redisConnection;
+
+ @Value("${zeebe.client.worker.redis.consumer-group:simple-monitor}")
+ private String redisConumerGroup;
+
+ @Value("${zeebe.client.worker.redis.xread-count:500}")
+ private int redisXreadCount;
+
+ @Value("${zeebe.client.worker.redis.xread-block-millis:2000}")
+ private int redisXreadBlockMillis;
+
+ public String getRedisConnection() {
+ return redisConnection;
+ }
+
+ public String getRedisConumerGroup() {
+ return redisConumerGroup;
+ }
+
+ public int getRedisXreadCount() {
+ return redisXreadCount;
+ }
+
+ public int getRedisXreadBlockMillis() {
+ return redisXreadBlockMillis;
+ }
+}
diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java
index 2ec8ae35..f0fb258d 100644
--- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java
+++ b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java
@@ -5,14 +5,14 @@
import io.zeebe.hazelcast.connect.java.ZeebeHazelcast;
import io.zeebe.monitor.entity.HazelcastConfig;
import io.zeebe.monitor.repository.HazelcastConfigRepository;
-import io.zeebe.monitor.zeebe.hazelcast.importers.ErrorHazelcastImporter;
-import io.zeebe.monitor.zeebe.hazelcast.importers.IncidentHazelcastImporter;
-import io.zeebe.monitor.zeebe.hazelcast.importers.JobHazelcastImporter;
-import io.zeebe.monitor.zeebe.hazelcast.importers.MessageHazelcastImporter;
-import io.zeebe.monitor.zeebe.hazelcast.importers.MessageSubscriptionHazelcastImporter;
-import io.zeebe.monitor.zeebe.hazelcast.importers.ProcessAndElementHazelcastImporter;
-import io.zeebe.monitor.zeebe.hazelcast.importers.TimerHazelcastImporter;
-import io.zeebe.monitor.zeebe.hazelcast.importers.VariableHazelcastImporter;
+import io.zeebe.monitor.zeebe.protobuf.importers.ErrorProtobufImporter;
+import io.zeebe.monitor.zeebe.protobuf.importers.IncidentProtobufImporter;
+import io.zeebe.monitor.zeebe.protobuf.importers.JobProtobufImporter;
+import io.zeebe.monitor.zeebe.protobuf.importers.MessageProtobufImporter;
+import io.zeebe.monitor.zeebe.protobuf.importers.MessageSubscriptionProtobufImporter;
+import io.zeebe.monitor.zeebe.protobuf.importers.ProcessAndElementProtobufImporter;
+import io.zeebe.monitor.zeebe.protobuf.importers.TimerProtobufImporter;
+import io.zeebe.monitor.zeebe.protobuf.importers.VariableProtobufImporter;
import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Autowired;
@@ -21,14 +21,14 @@
@Component
public class HazelcastImportService {
- @Autowired private ProcessAndElementHazelcastImporter processAndElementImporter;
- @Autowired private VariableHazelcastImporter variableImporter;
- @Autowired private JobHazelcastImporter jobImporter;
- @Autowired private IncidentHazelcastImporter incidentImporter;
- @Autowired private MessageHazelcastImporter messageImporter;
- @Autowired private MessageSubscriptionHazelcastImporter messageSubscriptionImporter;
- @Autowired private TimerHazelcastImporter timerImporter;
- @Autowired private ErrorHazelcastImporter errorImporter;
+ @Autowired private ProcessAndElementProtobufImporter processAndElementImporter;
+ @Autowired private VariableProtobufImporter variableImporter;
+ @Autowired private JobProtobufImporter jobImporter;
+ @Autowired private IncidentProtobufImporter incidentImporter;
+ @Autowired private MessageProtobufImporter messageImporter;
+ @Autowired private MessageSubscriptionProtobufImporter messageSubscriptionImporter;
+ @Autowired private TimerProtobufImporter timerImporter;
+ @Autowired private ErrorProtobufImporter errorImporter;
@Autowired private HazelcastConfigRepository hazelcastConfigRepository;
diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ErrorHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java
similarity index 92%
rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ErrorHazelcastImporter.java
rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java
index 01cf948f..136fe5bc 100644
--- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ErrorHazelcastImporter.java
+++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java
@@ -1,4 +1,4 @@
-package io.zeebe.monitor.zeebe.hazelcast.importers;
+package io.zeebe.monitor.zeebe.protobuf.importers;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.ErrorEntity;
@@ -7,7 +7,7 @@
import org.springframework.stereotype.Component;
@Component
-public class ErrorHazelcastImporter {
+public class ErrorProtobufImporter {
@Autowired private ErrorRepository errorRepository;
diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/IncidentHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java
similarity index 94%
rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/IncidentHazelcastImporter.java
rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java
index 4a8625f5..79057c1b 100644
--- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/IncidentHazelcastImporter.java
+++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java
@@ -1,4 +1,4 @@
-package io.zeebe.monitor.zeebe.hazelcast.importers;
+package io.zeebe.monitor.zeebe.protobuf.importers;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.zeebe.exporter.proto.Schema;
@@ -8,7 +8,7 @@
import org.springframework.stereotype.Component;
@Component
-public class IncidentHazelcastImporter {
+public class IncidentProtobufImporter {
@Autowired private IncidentRepository incidentRepository;
diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/JobHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java
similarity index 93%
rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/JobHazelcastImporter.java
rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java
index ca84e8c6..401a5ae5 100644
--- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/JobHazelcastImporter.java
+++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java
@@ -1,4 +1,4 @@
-package io.zeebe.monitor.zeebe.hazelcast.importers;
+package io.zeebe.monitor.zeebe.protobuf.importers;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.zeebe.exporter.proto.Schema;
@@ -8,7 +8,7 @@
import org.springframework.stereotype.Component;
@Component
-public class JobHazelcastImporter {
+public class JobProtobufImporter {
@Autowired private JobRepository jobRepository;
diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java
similarity index 93%
rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageHazelcastImporter.java
rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java
index 99506c3c..cd3154b6 100644
--- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageHazelcastImporter.java
+++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java
@@ -1,4 +1,4 @@
-package io.zeebe.monitor.zeebe.hazelcast.importers;
+package io.zeebe.monitor.zeebe.protobuf.importers;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.zeebe.exporter.proto.Schema;
@@ -8,7 +8,7 @@
import org.springframework.stereotype.Component;
@Component
-public class MessageHazelcastImporter {
+public class MessageProtobufImporter {
@Autowired private MessageRepository messageRepository;
diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageSubscriptionHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java
similarity index 96%
rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageSubscriptionHazelcastImporter.java
rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java
index 18d39d05..7d513661 100644
--- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageSubscriptionHazelcastImporter.java
+++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java
@@ -1,4 +1,4 @@
-package io.zeebe.monitor.zeebe.hazelcast.importers;
+package io.zeebe.monitor.zeebe.protobuf.importers;
import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
@@ -10,7 +10,7 @@
import org.springframework.stereotype.Component;
@Component
-public class MessageSubscriptionHazelcastImporter {
+public class MessageSubscriptionProtobufImporter {
@Autowired private MessageSubscriptionRepository messageSubscriptionRepository;
diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java
similarity index 98%
rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporter.java
rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java
index 515ec58f..df8c3c7a 100644
--- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporter.java
+++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java
@@ -1,4 +1,4 @@
-package io.zeebe.monitor.zeebe.hazelcast.importers;
+package io.zeebe.monitor.zeebe.protobuf.importers;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.record.intent.Intent;
@@ -15,7 +15,7 @@
import org.springframework.stereotype.Component;
@Component
-public class ProcessAndElementHazelcastImporter {
+public class ProcessAndElementProtobufImporter {
@Autowired private ProcessRepository processRepository;
@Autowired private ProcessInstanceRepository processInstanceRepository;
diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/TimerHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java
similarity index 94%
rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/TimerHazelcastImporter.java
rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java
index 47fdd026..06bd1973 100644
--- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/TimerHazelcastImporter.java
+++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java
@@ -1,4 +1,4 @@
-package io.zeebe.monitor.zeebe.hazelcast.importers;
+package io.zeebe.monitor.zeebe.protobuf.importers;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.zeebe.exporter.proto.Schema;
@@ -8,7 +8,7 @@
import org.springframework.stereotype.Component;
@Component
-public class TimerHazelcastImporter {
+public class TimerProtobufImporter {
@Autowired private TimerRepository timerRepository;
diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java
similarity index 92%
rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporter.java
rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java
index 4b056741..3cdef87c 100644
--- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporter.java
+++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java
@@ -1,4 +1,4 @@
-package io.zeebe.monitor.zeebe.hazelcast.importers;
+package io.zeebe.monitor.zeebe.protobuf.importers;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.VariableEntity;
@@ -7,7 +7,7 @@
import org.springframework.stereotype.Component;
@Component
-public class VariableHazelcastImporter {
+public class VariableProtobufImporter {
@Autowired private VariableRepository variableRepository;
diff --git a/src/main/java/io/zeebe/monitor/zeebe/redis/RedisImportService.java b/src/main/java/io/zeebe/monitor/zeebe/redis/RedisImportService.java
new file mode 100644
index 00000000..efe093f2
--- /dev/null
+++ b/src/main/java/io/zeebe/monitor/zeebe/redis/RedisImportService.java
@@ -0,0 +1,85 @@
+package io.zeebe.monitor.zeebe.redis;
+
+import com.hazelcast.core.HazelcastInstance;
+import io.lettuce.core.RedisClient;
+import io.zeebe.exporter.proto.Schema;
+import io.zeebe.hazelcast.connect.java.ZeebeHazelcast;
+import io.zeebe.monitor.config.RedisConfig;
+import io.zeebe.monitor.entity.HazelcastConfig;
+import io.zeebe.monitor.repository.HazelcastConfigRepository;
+import io.zeebe.monitor.zeebe.protobuf.importers.*;
+import io.zeebe.redis.connect.java.ZeebeRedis;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+@Component
+public class RedisImportService {
+
+ @Autowired private ProcessAndElementProtobufImporter processAndElementImporter;
+ @Autowired private VariableProtobufImporter variableImporter;
+ @Autowired private JobProtobufImporter jobImporter;
+ @Autowired private IncidentProtobufImporter incidentImporter;
+ @Autowired private MessageProtobufImporter messageImporter;
+ @Autowired private MessageSubscriptionProtobufImporter messageSubscriptionImporter;
+ @Autowired private TimerProtobufImporter timerImporter;
+ @Autowired private ErrorProtobufImporter errorImporter;
+
+ public ZeebeRedis importFrom(final RedisClient redisClient, RedisConfig redisConfig) {
+
+ final var builder =
+ ZeebeRedis.newBuilder(redisClient)
+ .consumerGroup(redisConfig.getRedisConumerGroup())
+ .xreadCount(redisConfig.getRedisXreadCount()).xreadBlockMillis(redisConfig.getRedisXreadBlockMillis())
+ .addProcessListener(
+ record -> ifEvent(record, Schema.ProcessRecord::getMetadata, processAndElementImporter::importProcess))
+ .addProcessInstanceListener(
+ record ->
+ ifEvent(
+ record,
+ Schema.ProcessInstanceRecord::getMetadata,
+ processAndElementImporter::importProcessInstance))
+ .addIncidentListener(
+ record -> ifEvent(record, Schema.IncidentRecord::getMetadata, incidentImporter::importIncident))
+ .addJobListener(
+ record -> ifEvent(record, Schema.JobRecord::getMetadata, jobImporter::importJob))
+ .addVariableListener(
+ record -> ifEvent(record, Schema.VariableRecord::getMetadata, variableImporter::importVariable))
+ .addTimerListener(
+ record -> ifEvent(record, Schema.TimerRecord::getMetadata, timerImporter::importTimer))
+ .addMessageListener(
+ record -> ifEvent(record, Schema.MessageRecord::getMetadata, messageImporter::importMessage))
+ .addMessageSubscriptionListener(
+ record ->
+ ifEvent(
+ record,
+ Schema.MessageSubscriptionRecord::getMetadata,
+ messageSubscriptionImporter::importMessageSubscription))
+ .addMessageStartEventSubscriptionListener(
+ record ->
+ ifEvent(
+ record,
+ Schema.MessageStartEventSubscriptionRecord::getMetadata,
+ messageSubscriptionImporter::importMessageStartEventSubscription))
+ .addErrorListener(errorImporter::importError);
+
+ return builder.build();
+ }
+
+ private void ifEvent(
+ final T record,
+ final Function extractor,
+ final Consumer consumer) {
+ final var metadata = extractor.apply(record);
+ if (isEvent(metadata)) {
+ consumer.accept(record);
+ }
+ }
+
+ private boolean isEvent(final Schema.RecordMetadata metadata) {
+ return metadata.getRecordType() == Schema.RecordMetadata.RecordType.EVENT;
+ }
+
+}
diff --git a/src/main/java/io/zeebe/monitor/zeebe/redis/ZeebeRedisService.java b/src/main/java/io/zeebe/monitor/zeebe/redis/ZeebeRedisService.java
new file mode 100644
index 00000000..83f2df07
--- /dev/null
+++ b/src/main/java/io/zeebe/monitor/zeebe/redis/ZeebeRedisService.java
@@ -0,0 +1,49 @@
+package io.zeebe.monitor.zeebe.redis;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisURI;
+import io.zeebe.monitor.config.RedisConfig;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+@ConditionalOnProperty(name = "zeebe-importer", havingValue = "redis")
+@Component
+public class ZeebeRedisService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZeebeRedisService.class);
+
+ private RedisConfig config;
+
+ public ZeebeRedisService(RedisConfig redisConfig) {
+ this.config = redisConfig;
+ }
+
+ @Autowired
+ private RedisImportService importService;
+
+ private AutoCloseable closeable;
+
+ @PostConstruct
+ public void start() {
+ var redisUri = RedisURI.create(config.getRedisConnection());
+
+ LOG.info("Connecting to Redis {}, consumer group {}", redisUri, config.getRedisConumerGroup());
+ var redisClient = RedisClient.create(redisUri);
+
+ LOG.info("Importing records from Redis...");
+ closeable = importService.importFrom(redisClient, config);
+ }
+
+ @PreDestroy
+ public void close() throws Exception {
+ if (closeable != null) {
+ closeable.close();
+ }
+ }
+}
diff --git a/src/main/resources/application-kafka.yaml b/src/main/resources/application-kafka.yaml
index 0bef7fff..60ea0dfb 100644
--- a/src/main/resources/application-kafka.yaml
+++ b/src/main/resources/application-kafka.yaml
@@ -1,5 +1,5 @@
-# Options: hazelcast | kafka
+# Options: hazelcast | kafka | redis
# This config switches importers between the provided
-# To use each of them, zeebe must be configured using hazelcast-exporter or kafka-exporter, respectively
-# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast and services.zeebe-kafka
+# To use each of them, zeebe must be configured using hazelcast-exporter, kafka-exporter or redis-exporter, respectively
+# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast, services.zeebe-kafka, services-zeebe-redis
zeebe-importer: kafka
diff --git a/src/main/resources/application-redis.yaml b/src/main/resources/application-redis.yaml
new file mode 100644
index 00000000..03872b9d
--- /dev/null
+++ b/src/main/resources/application-redis.yaml
@@ -0,0 +1,5 @@
+# Options: hazelcast | kafka | redis
+# This config switches importers between the provided
+# To use each of them, zeebe must be configured using hazelcast-exporter, kafka-exporter or redis-exporter, respectively
+# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast, services.zeebe-kafka, services-zeebe-redis
+zeebe-importer: redis
diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml
index c42cca04..8b7b30da 100644
--- a/src/main/resources/application.yaml
+++ b/src/main/resources/application.yaml
@@ -8,11 +8,13 @@ zeebe:
connection: localhost:5701
clusterName: dev
connectionTimeout: PT30S
+ redis:
+ connection: redis://localhost:6379
-# Options: hazelcast | kafka
+# Options: hazelcast | kafka | redis
# This config switches importers between the provided
-# To use each of them, zeebe must be configured using hazelcast-exporter or kafka-exporter, respectively
-# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast and services.zeebe-kafka
+# To use each of them, zeebe must be configured using hazelcast-exporter, kafka-exporter or redis-exporter, respectively
+# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast, services.zeebe-kafka, services-zeebe-redis
zeebe-importer: hazelcast
spring:
diff --git a/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporterTest.java b/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporterTest.java
index 6dcadaec..063ab431 100644
--- a/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporterTest.java
+++ b/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporterTest.java
@@ -8,6 +8,7 @@
import io.zeebe.monitor.repository.ElementInstanceRepository;
import io.zeebe.monitor.repository.ZeebeRepositoryTest;
import io.zeebe.monitor.zeebe.ZeebeNotificationService;
+import io.zeebe.monitor.zeebe.protobuf.importers.ProcessAndElementProtobufImporter;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
@@ -15,13 +16,13 @@
import org.springframework.test.context.ContextConfiguration;
@ContextConfiguration(
- classes = {ProcessAndElementHazelcastImporter.class,
+ classes = {ProcessAndElementProtobufImporter.class,
ZeebeNotificationService.class}
)
public class ProcessAndElementHazelcastImporterTest extends ZeebeRepositoryTest {
@Autowired
- ProcessAndElementHazelcastImporter processAndElementImporter;
+ ProcessAndElementProtobufImporter processAndElementImporter;
@Autowired
ElementInstanceRepository elementInstanceRepository;
diff --git a/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporterTest.java b/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporterTest.java
index 674ec203..100e9fac 100644
--- a/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporterTest.java
+++ b/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporterTest.java
@@ -6,17 +6,18 @@
import io.zeebe.monitor.entity.VariableEntity;
import io.zeebe.monitor.repository.VariableRepository;
import io.zeebe.monitor.repository.ZeebeRepositoryTest;
+import io.zeebe.monitor.zeebe.protobuf.importers.VariableProtobufImporter;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
@ContextConfiguration(
- classes = {VariableHazelcastImporter.class}
+ classes = {VariableProtobufImporter.class}
)
public class VariableHazelcastImporterTest extends ZeebeRepositoryTest {
@Autowired
- VariableHazelcastImporter variableImporter;
+ VariableProtobufImporter variableImporter;
@Autowired
VariableRepository variableRepository;