diff --git a/README.md b/README.md index 8dc044b8..64f2cd58 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ A monitoring application for [Zeebe](https://zeebe.io). It is designed for devel * test workflows manually * provide insides on how workflows are executed -The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter) or [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter). It aggregates the data and stores it into a database. The data is displayed on server-side rendered HTML pages. +The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter), [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter) or [Redis exporter](https://github.com/camunda-community-hub/zeebe-redis-exporter). It aggregates the data and stores it into a database. The data is displayed on server-side rendered HTML pages. ![how-it-works](docs/how-it-works.png) @@ -59,6 +59,19 @@ By default, the Zeebe Simple Monitor imports Zeebe events through Hazelcast, but * In order to import events efficiently and quickly, Zeebe brokers partitions and Kafka topic partitions should be correlated in a special way: [reference to the exporter docs](https://github.com/camunda-community-hub/zeebe-kafka-exporter?tab=readme-ov-file#partitioning) * Configure the environment variables in the Zeebe Simple Monitor as described in the "[Change the default Zeebe importer to Kafka](#change-the-default-zeebe-importer-to-kafka)" section +**Switch to the Redis exporter/importer** + +* Ensure that a Zeebe broker is running with a [Redis exporter](https://github.com/camunda-community-hub/zeebe-redis-exporter) +* Adjust the following environment variables in Zeebe: + ``` + - ZEEBE_REDIS_REMOTE_ADDRESS=redis://redis:6379 + - ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS=900 + - ZEEBE_REDIS_DELETE_AFTER_ACKNOWLEDGE=true + ``` +* Configure the connection to the Zeebe broker by setting `zeebe.client.broker.gateway-address` (default: `localhost:26500`) +* Configure the connection to Redis by setting `zeebe.client.worker.redis.connection` (default: `redis://localhost:6379`) +* Activate Redis by setting `zeebe-importer: redis` + If the Zeebe broker runs on your local machine with the default configs then start the container with the following command: @@ -66,7 +79,7 @@ If the Zeebe broker runs on your local machine with the default configs then sta docker run --network="host" ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.4.1 ``` -For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast/Kafka exporter and the application. +For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast/Kafka/Redis exporter and the application. There are several Docker Compose profiles, setting by a file [.env](docker/.env), by passing multiple --profile flags or a comma-separated list for the COMPOSE_PROFILES environment variable: * ```docker compose --profile hazelcast --profile hazelcast_in_memory up``` * ```COMPOSE_PROFILES=hazelcast,hazelcast_in_memory docker compose up``` @@ -74,6 +87,7 @@ There are several Docker Compose profiles, setting by a file [.env](docker/.env) Existing presets: * ```COMPOSE_PROFILES=hazelcast,hazelcast_in_memory``` (by default) * ```COMPOSE_PROFILES=kafka,kafka_in_memory``` +* ```COMPOSE_PROFILES=redis,redis_in_memory``` * ```COMPOSE_PROFILES=hazelcast,hazelcast_postgres,postgres``` * ```COMPOSE_PROFILES=hazelcast,hazelcast_mysql,mysql``` @@ -89,6 +103,7 @@ Go to http://localhost:8082 To change the database see "[Change the Database](#change-the-database)" To change Zeebe importer see "[Change the default Zeebe importer to Kafka](#change-the-default-zeebe-importer-to-kafka)" +or "[Change the default Zeebe importer to Redis](#change-the-default-zeebe-importer-to-redis)" ``` docker-compose --profile postgres up @@ -252,7 +267,29 @@ See the [docker-compose file](docker/docker-compose.yml) for a sample configurat * `spring.kafka.custom.concurrency` (default: `3`) is the number of threads for the Kafka listener that will import events from Zeebe * `spring.kafka.custom.retry.intervalMs` (default: `30000`) and `spring.kafka.custom.retry.max-attempts` (default: `3`) are the retry configurations for a retryable exception in the listener -Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Kafka importer. Profiles presets: `kafka,kafka_in_memory` +Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Kafka importer. Profile presets: `kafka,kafka_in_memory` + +#### Change the default Zeebe importer to Redis + +* set the `zeebe-importer` (default: `hazelcast`) configuration property to `redis` +* adjust the importer settings under `zeebe.client.worker.redis` (complete default values below): + ``` + zeebe: + client: + broker.gatewayAddress: 127.0.0.1:26500 + security.plaintext: true + + worker: + redis: + connection: redis://localhost:6379 + consumer-group: simple-monitor + xread-count: 500 + xread-block-millis: 2000 + + zeebe-importer: redis + ``` + +Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Redis importer. Profile presets: `redis,redis_in_memory` ## Code of Conduct diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4a736193..f76e90a4 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -43,6 +43,23 @@ services: profiles: - kafka + zeebe-redis: + container_name: zeebe-broker-redis + image: ghcr.io/camunda-community-hub/zeebe-with-redis-exporter:8.4.0 + environment: + - ZEEBE_REDIS_REMOTE_ADDRESS=redis://redis:6379 + - ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS=900 + - ZEEBE_REDIS_DELETE_AFTER_ACKNOWLEDGE=true + ports: + - "26500:26500" + - "9600:9600" + networks: + - zeebe_network + volumes: + - ./redis/application.yaml:/usr/local/zeebe/config/application.yaml + profiles: + - redis + zookeeper: image: docker.io/bitnami/zookeeper:3.8 ports: @@ -73,6 +90,16 @@ services: profiles: - kafka + redis: + container_name: redis_cache + image: redis:7-alpine + ports: + - "6379:6379" + networks: + - zeebe_network + profiles: + - redis + simple-monitor-in-memory: container_name: zeebe-simple-monitor-in-memory image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.5.2 @@ -106,6 +133,22 @@ services: profiles: - kafka_in_memory + simple-monitor-in-memory-redis: + container_name: zeebe-simple-monitor-in-memory-redis + image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.6.4 + environment: + - zeebe.client.broker.gateway-address=zeebe:26500 + - zeebe-importer=redis + - zeebe.client.worker.redis.connection=redis://redis:6379 + ports: + - "8082:8082" + depends_on: + - zeebe-redis + networks: + - zeebe_network + profiles: + - redis_in_memory + simple-monitor-postgres: container_name: zeebe-simple-monitor-postgres image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.5.2 diff --git a/docker/redis/application.yaml b/docker/redis/application.yaml new file mode 100644 index 00000000..9c720f15 --- /dev/null +++ b/docker/redis/application.yaml @@ -0,0 +1,6 @@ +zeebe: + broker: + exporters: + redis: + className: io.zeebe.redis.exporter.RedisExporter + jarPath: exporters/zeebe-redis-exporter-jar-with-dependencies.jar diff --git a/docs/how-it-works.png b/docs/how-it-works.png index d4e5da7c..c04e6d6d 100644 Binary files a/docs/how-it-works.png and b/docs/how-it-works.png differ diff --git a/pom.xml b/pom.xml index 67a29a3e..88999243 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ 8.3.4 8.4.0 1.4.0 + 0.9.9 5.0.0 @@ -73,6 +74,12 @@ ${hazelcast.exporter.version} + + io.zeebe.redis + zeebe-redis-connector + ${redis.exporter.version} + + com.h2database h2 @@ -142,6 +149,11 @@ zeebe-hazelcast-connector + + io.zeebe.redis + zeebe-redis-connector + + io.camunda zeebe-protocol-jackson diff --git a/src/main/java/io/zeebe/monitor/config/RedisConfig.java b/src/main/java/io/zeebe/monitor/config/RedisConfig.java new file mode 100644 index 00000000..e147237b --- /dev/null +++ b/src/main/java/io/zeebe/monitor/config/RedisConfig.java @@ -0,0 +1,38 @@ +package io.zeebe.monitor.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; + +@ConditionalOnProperty(name = "zeebe-importer", havingValue = "redis") +@Configuration +public class RedisConfig { + + @Value("${zeebe.client.worker.redis.connection}") + private String redisConnection; + + @Value("${zeebe.client.worker.redis.consumer-group:simple-monitor}") + private String redisConumerGroup; + + @Value("${zeebe.client.worker.redis.xread-count:500}") + private int redisXreadCount; + + @Value("${zeebe.client.worker.redis.xread-block-millis:2000}") + private int redisXreadBlockMillis; + + public String getRedisConnection() { + return redisConnection; + } + + public String getRedisConumerGroup() { + return redisConumerGroup; + } + + public int getRedisXreadCount() { + return redisXreadCount; + } + + public int getRedisXreadBlockMillis() { + return redisXreadBlockMillis; + } +} diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java index 2ec8ae35..f0fb258d 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java +++ b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java @@ -5,14 +5,14 @@ import io.zeebe.hazelcast.connect.java.ZeebeHazelcast; import io.zeebe.monitor.entity.HazelcastConfig; import io.zeebe.monitor.repository.HazelcastConfigRepository; -import io.zeebe.monitor.zeebe.hazelcast.importers.ErrorHazelcastImporter; -import io.zeebe.monitor.zeebe.hazelcast.importers.IncidentHazelcastImporter; -import io.zeebe.monitor.zeebe.hazelcast.importers.JobHazelcastImporter; -import io.zeebe.monitor.zeebe.hazelcast.importers.MessageHazelcastImporter; -import io.zeebe.monitor.zeebe.hazelcast.importers.MessageSubscriptionHazelcastImporter; -import io.zeebe.monitor.zeebe.hazelcast.importers.ProcessAndElementHazelcastImporter; -import io.zeebe.monitor.zeebe.hazelcast.importers.TimerHazelcastImporter; -import io.zeebe.monitor.zeebe.hazelcast.importers.VariableHazelcastImporter; +import io.zeebe.monitor.zeebe.protobuf.importers.ErrorProtobufImporter; +import io.zeebe.monitor.zeebe.protobuf.importers.IncidentProtobufImporter; +import io.zeebe.monitor.zeebe.protobuf.importers.JobProtobufImporter; +import io.zeebe.monitor.zeebe.protobuf.importers.MessageProtobufImporter; +import io.zeebe.monitor.zeebe.protobuf.importers.MessageSubscriptionProtobufImporter; +import io.zeebe.monitor.zeebe.protobuf.importers.ProcessAndElementProtobufImporter; +import io.zeebe.monitor.zeebe.protobuf.importers.TimerProtobufImporter; +import io.zeebe.monitor.zeebe.protobuf.importers.VariableProtobufImporter; import java.util.function.Consumer; import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; @@ -21,14 +21,14 @@ @Component public class HazelcastImportService { - @Autowired private ProcessAndElementHazelcastImporter processAndElementImporter; - @Autowired private VariableHazelcastImporter variableImporter; - @Autowired private JobHazelcastImporter jobImporter; - @Autowired private IncidentHazelcastImporter incidentImporter; - @Autowired private MessageHazelcastImporter messageImporter; - @Autowired private MessageSubscriptionHazelcastImporter messageSubscriptionImporter; - @Autowired private TimerHazelcastImporter timerImporter; - @Autowired private ErrorHazelcastImporter errorImporter; + @Autowired private ProcessAndElementProtobufImporter processAndElementImporter; + @Autowired private VariableProtobufImporter variableImporter; + @Autowired private JobProtobufImporter jobImporter; + @Autowired private IncidentProtobufImporter incidentImporter; + @Autowired private MessageProtobufImporter messageImporter; + @Autowired private MessageSubscriptionProtobufImporter messageSubscriptionImporter; + @Autowired private TimerProtobufImporter timerImporter; + @Autowired private ErrorProtobufImporter errorImporter; @Autowired private HazelcastConfigRepository hazelcastConfigRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ErrorHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java similarity index 92% rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ErrorHazelcastImporter.java rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java index 01cf948f..136fe5bc 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ErrorHazelcastImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java @@ -1,4 +1,4 @@ -package io.zeebe.monitor.zeebe.hazelcast.importers; +package io.zeebe.monitor.zeebe.protobuf.importers; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.ErrorEntity; @@ -7,7 +7,7 @@ import org.springframework.stereotype.Component; @Component -public class ErrorHazelcastImporter { +public class ErrorProtobufImporter { @Autowired private ErrorRepository errorRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/IncidentHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java similarity index 94% rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/IncidentHazelcastImporter.java rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java index 4a8625f5..79057c1b 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/IncidentHazelcastImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java @@ -1,4 +1,4 @@ -package io.zeebe.monitor.zeebe.hazelcast.importers; +package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.record.intent.IncidentIntent; import io.zeebe.exporter.proto.Schema; @@ -8,7 +8,7 @@ import org.springframework.stereotype.Component; @Component -public class IncidentHazelcastImporter { +public class IncidentProtobufImporter { @Autowired private IncidentRepository incidentRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/JobHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java similarity index 93% rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/JobHazelcastImporter.java rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java index ca84e8c6..401a5ae5 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/JobHazelcastImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java @@ -1,4 +1,4 @@ -package io.zeebe.monitor.zeebe.hazelcast.importers; +package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.record.intent.JobIntent; import io.zeebe.exporter.proto.Schema; @@ -8,7 +8,7 @@ import org.springframework.stereotype.Component; @Component -public class JobHazelcastImporter { +public class JobProtobufImporter { @Autowired private JobRepository jobRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java similarity index 93% rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageHazelcastImporter.java rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java index 99506c3c..cd3154b6 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageHazelcastImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java @@ -1,4 +1,4 @@ -package io.zeebe.monitor.zeebe.hazelcast.importers; +package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.record.intent.MessageIntent; import io.zeebe.exporter.proto.Schema; @@ -8,7 +8,7 @@ import org.springframework.stereotype.Component; @Component -public class MessageHazelcastImporter { +public class MessageProtobufImporter { @Autowired private MessageRepository messageRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageSubscriptionHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java similarity index 96% rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageSubscriptionHazelcastImporter.java rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java index 18d39d05..7d513661 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/MessageSubscriptionHazelcastImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java @@ -1,4 +1,4 @@ -package io.zeebe.monitor.zeebe.hazelcast.importers; +package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent; import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent; @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @Component -public class MessageSubscriptionHazelcastImporter { +public class MessageSubscriptionProtobufImporter { @Autowired private MessageSubscriptionRepository messageSubscriptionRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java similarity index 98% rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporter.java rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java index 515ec58f..df8c3c7a 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java @@ -1,4 +1,4 @@ -package io.zeebe.monitor.zeebe.hazelcast.importers; +package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.Protocol; import io.camunda.zeebe.protocol.record.intent.Intent; @@ -15,7 +15,7 @@ import org.springframework.stereotype.Component; @Component -public class ProcessAndElementHazelcastImporter { +public class ProcessAndElementProtobufImporter { @Autowired private ProcessRepository processRepository; @Autowired private ProcessInstanceRepository processInstanceRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/TimerHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java similarity index 94% rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/TimerHazelcastImporter.java rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java index 47fdd026..06bd1973 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/TimerHazelcastImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java @@ -1,4 +1,4 @@ -package io.zeebe.monitor.zeebe.hazelcast.importers; +package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.record.intent.TimerIntent; import io.zeebe.exporter.proto.Schema; @@ -8,7 +8,7 @@ import org.springframework.stereotype.Component; @Component -public class TimerHazelcastImporter { +public class TimerProtobufImporter { @Autowired private TimerRepository timerRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java similarity index 92% rename from src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporter.java rename to src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java index 4b056741..3cdef87c 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java @@ -1,4 +1,4 @@ -package io.zeebe.monitor.zeebe.hazelcast.importers; +package io.zeebe.monitor.zeebe.protobuf.importers; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.VariableEntity; @@ -7,7 +7,7 @@ import org.springframework.stereotype.Component; @Component -public class VariableHazelcastImporter { +public class VariableProtobufImporter { @Autowired private VariableRepository variableRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/redis/RedisImportService.java b/src/main/java/io/zeebe/monitor/zeebe/redis/RedisImportService.java new file mode 100644 index 00000000..efe093f2 --- /dev/null +++ b/src/main/java/io/zeebe/monitor/zeebe/redis/RedisImportService.java @@ -0,0 +1,85 @@ +package io.zeebe.monitor.zeebe.redis; + +import com.hazelcast.core.HazelcastInstance; +import io.lettuce.core.RedisClient; +import io.zeebe.exporter.proto.Schema; +import io.zeebe.hazelcast.connect.java.ZeebeHazelcast; +import io.zeebe.monitor.config.RedisConfig; +import io.zeebe.monitor.entity.HazelcastConfig; +import io.zeebe.monitor.repository.HazelcastConfigRepository; +import io.zeebe.monitor.zeebe.protobuf.importers.*; +import io.zeebe.redis.connect.java.ZeebeRedis; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.function.Consumer; +import java.util.function.Function; + +@Component +public class RedisImportService { + + @Autowired private ProcessAndElementProtobufImporter processAndElementImporter; + @Autowired private VariableProtobufImporter variableImporter; + @Autowired private JobProtobufImporter jobImporter; + @Autowired private IncidentProtobufImporter incidentImporter; + @Autowired private MessageProtobufImporter messageImporter; + @Autowired private MessageSubscriptionProtobufImporter messageSubscriptionImporter; + @Autowired private TimerProtobufImporter timerImporter; + @Autowired private ErrorProtobufImporter errorImporter; + + public ZeebeRedis importFrom(final RedisClient redisClient, RedisConfig redisConfig) { + + final var builder = + ZeebeRedis.newBuilder(redisClient) + .consumerGroup(redisConfig.getRedisConumerGroup()) + .xreadCount(redisConfig.getRedisXreadCount()).xreadBlockMillis(redisConfig.getRedisXreadBlockMillis()) + .addProcessListener( + record -> ifEvent(record, Schema.ProcessRecord::getMetadata, processAndElementImporter::importProcess)) + .addProcessInstanceListener( + record -> + ifEvent( + record, + Schema.ProcessInstanceRecord::getMetadata, + processAndElementImporter::importProcessInstance)) + .addIncidentListener( + record -> ifEvent(record, Schema.IncidentRecord::getMetadata, incidentImporter::importIncident)) + .addJobListener( + record -> ifEvent(record, Schema.JobRecord::getMetadata, jobImporter::importJob)) + .addVariableListener( + record -> ifEvent(record, Schema.VariableRecord::getMetadata, variableImporter::importVariable)) + .addTimerListener( + record -> ifEvent(record, Schema.TimerRecord::getMetadata, timerImporter::importTimer)) + .addMessageListener( + record -> ifEvent(record, Schema.MessageRecord::getMetadata, messageImporter::importMessage)) + .addMessageSubscriptionListener( + record -> + ifEvent( + record, + Schema.MessageSubscriptionRecord::getMetadata, + messageSubscriptionImporter::importMessageSubscription)) + .addMessageStartEventSubscriptionListener( + record -> + ifEvent( + record, + Schema.MessageStartEventSubscriptionRecord::getMetadata, + messageSubscriptionImporter::importMessageStartEventSubscription)) + .addErrorListener(errorImporter::importError); + + return builder.build(); + } + + private void ifEvent( + final T record, + final Function extractor, + final Consumer consumer) { + final var metadata = extractor.apply(record); + if (isEvent(metadata)) { + consumer.accept(record); + } + } + + private boolean isEvent(final Schema.RecordMetadata metadata) { + return metadata.getRecordType() == Schema.RecordMetadata.RecordType.EVENT; + } + +} diff --git a/src/main/java/io/zeebe/monitor/zeebe/redis/ZeebeRedisService.java b/src/main/java/io/zeebe/monitor/zeebe/redis/ZeebeRedisService.java new file mode 100644 index 00000000..83f2df07 --- /dev/null +++ b/src/main/java/io/zeebe/monitor/zeebe/redis/ZeebeRedisService.java @@ -0,0 +1,49 @@ +package io.zeebe.monitor.zeebe.redis; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.zeebe.monitor.config.RedisConfig; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +@ConditionalOnProperty(name = "zeebe-importer", havingValue = "redis") +@Component +public class ZeebeRedisService { + + private static final Logger LOG = LoggerFactory.getLogger(ZeebeRedisService.class); + + private RedisConfig config; + + public ZeebeRedisService(RedisConfig redisConfig) { + this.config = redisConfig; + } + + @Autowired + private RedisImportService importService; + + private AutoCloseable closeable; + + @PostConstruct + public void start() { + var redisUri = RedisURI.create(config.getRedisConnection()); + + LOG.info("Connecting to Redis {}, consumer group {}", redisUri, config.getRedisConumerGroup()); + var redisClient = RedisClient.create(redisUri); + + LOG.info("Importing records from Redis..."); + closeable = importService.importFrom(redisClient, config); + } + + @PreDestroy + public void close() throws Exception { + if (closeable != null) { + closeable.close(); + } + } +} diff --git a/src/main/resources/application-kafka.yaml b/src/main/resources/application-kafka.yaml index 0bef7fff..60ea0dfb 100644 --- a/src/main/resources/application-kafka.yaml +++ b/src/main/resources/application-kafka.yaml @@ -1,5 +1,5 @@ -# Options: hazelcast | kafka +# Options: hazelcast | kafka | redis # This config switches importers between the provided -# To use each of them, zeebe must be configured using hazelcast-exporter or kafka-exporter, respectively -# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast and services.zeebe-kafka +# To use each of them, zeebe must be configured using hazelcast-exporter, kafka-exporter or redis-exporter, respectively +# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast, services.zeebe-kafka, services-zeebe-redis zeebe-importer: kafka diff --git a/src/main/resources/application-redis.yaml b/src/main/resources/application-redis.yaml new file mode 100644 index 00000000..03872b9d --- /dev/null +++ b/src/main/resources/application-redis.yaml @@ -0,0 +1,5 @@ +# Options: hazelcast | kafka | redis +# This config switches importers between the provided +# To use each of them, zeebe must be configured using hazelcast-exporter, kafka-exporter or redis-exporter, respectively +# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast, services.zeebe-kafka, services-zeebe-redis +zeebe-importer: redis diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index c42cca04..8b7b30da 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -8,11 +8,13 @@ zeebe: connection: localhost:5701 clusterName: dev connectionTimeout: PT30S + redis: + connection: redis://localhost:6379 -# Options: hazelcast | kafka +# Options: hazelcast | kafka | redis # This config switches importers between the provided -# To use each of them, zeebe must be configured using hazelcast-exporter or kafka-exporter, respectively -# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast and services.zeebe-kafka +# To use each of them, zeebe must be configured using hazelcast-exporter, kafka-exporter or redis-exporter, respectively +# See the examples in docker/docker-compose.yml in services.zeebe-hazelcast, services.zeebe-kafka, services-zeebe-redis zeebe-importer: hazelcast spring: diff --git a/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporterTest.java b/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporterTest.java index 6dcadaec..063ab431 100644 --- a/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporterTest.java +++ b/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/ProcessAndElementHazelcastImporterTest.java @@ -8,6 +8,7 @@ import io.zeebe.monitor.repository.ElementInstanceRepository; import io.zeebe.monitor.repository.ZeebeRepositoryTest; import io.zeebe.monitor.zeebe.ZeebeNotificationService; +import io.zeebe.monitor.zeebe.protobuf.importers.ProcessAndElementProtobufImporter; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.mock.mockito.MockBean; @@ -15,13 +16,13 @@ import org.springframework.test.context.ContextConfiguration; @ContextConfiguration( - classes = {ProcessAndElementHazelcastImporter.class, + classes = {ProcessAndElementProtobufImporter.class, ZeebeNotificationService.class} ) public class ProcessAndElementHazelcastImporterTest extends ZeebeRepositoryTest { @Autowired - ProcessAndElementHazelcastImporter processAndElementImporter; + ProcessAndElementProtobufImporter processAndElementImporter; @Autowired ElementInstanceRepository elementInstanceRepository; diff --git a/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporterTest.java b/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporterTest.java index 674ec203..100e9fac 100644 --- a/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporterTest.java +++ b/src/test/java/io/zeebe/monitor/zeebe/hazelcast/importers/VariableHazelcastImporterTest.java @@ -6,17 +6,18 @@ import io.zeebe.monitor.entity.VariableEntity; import io.zeebe.monitor.repository.VariableRepository; import io.zeebe.monitor.repository.ZeebeRepositoryTest; +import io.zeebe.monitor.zeebe.protobuf.importers.VariableProtobufImporter; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; @ContextConfiguration( - classes = {VariableHazelcastImporter.class} + classes = {VariableProtobufImporter.class} ) public class VariableHazelcastImporterTest extends ZeebeRepositoryTest { @Autowired - VariableHazelcastImporter variableImporter; + VariableProtobufImporter variableImporter; @Autowired VariableRepository variableRepository;