Skip to content

Commit

Permalink
feat: implement send of events in kafka client
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreaGiulianelli committed Apr 23, 2023
1 parent bc3d5d0 commit 046ee50
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions src/env/infrastructure/events/KafkaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import application.controller.manager.EventManager;
import application.controller.manager.EventSender;
import application.presenter.event.model.Event;
import application.presenter.event.model.automation.proposal.MedicalTechnologyAutomationProposalEvent;
import application.presenter.event.serialization.EventDeserializer;
import application.presenter.event.serialization.EventDeserializerImpl;
import application.presenter.event.serialization.EventSerializerImpl;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -21,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;

/**
Expand All @@ -31,6 +34,7 @@ public class KafkaClient implements EventManager, EventSender {
private static final String SCHEMA_REGISTRY_URL_VARIABLE = "SCHEMA_REGISTRY_URL";
private static final String ROOM_EVENT_TOPIC = "room-events";
private static final String MEDICAL_TECHNOLOGY_EVENT_TOPIC = "process-events";
private static final String AUTOMATION_PROPOSAL_EVENT_TOPIC = "automation-proposal-events";
private static final long POLLING_TIME = 100L;

private static KafkaClient instance;
Expand Down Expand Up @@ -68,11 +72,15 @@ protected KafkaClient() {

@Override
public final void notify(final Event<?> eventToSend) {
this.kafkaProducer.send(new ProducerRecord<>(
getTopicFromEventKey(eventToSend.getKey()),
eventToSend.getKey(),
eventToSend)
);
this.getTopicFromEventKey(eventToSend.getKey()).ifPresent(topic ->
new EventSerializerImpl().eventToString(eventToSend).ifPresent(serializedEvent ->
this.kafkaProducer.send(new ProducerRecord<>(
topic,
eventToSend.getKey(),
serializedEvent
)
)
));
}

@Override
Expand All @@ -95,8 +103,11 @@ private Map<String, Object> loadConfiguration(final String boostrapServerUrl, fi
);
}

private String getTopicFromEventKey(final String eventKey) {
// todo: switch with the key of the event
return eventKey;
private Optional<String> getTopicFromEventKey(final String eventKey) {
if (MedicalTechnologyAutomationProposalEvent.MEDICAL_TECHNOLOGY_AUTOMATION_PROPOSAL_EVENT_KEY.equals(eventKey)) {
return Optional.of(AUTOMATION_PROPOSAL_EVENT_TOPIC);
} else {
return Optional.empty();
}
}
}

0 comments on commit 046ee50

Please sign in to comment.