Redesign in Kafka tests (#1847)
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
jbescos authored May 25, 2020
1 parent 6e588d4 commit db6dcae
Showing 5 changed files with 122 additions and 88 deletions.
@@ -94,7 +94,7 @@ public PublisherBuilder<? extends Message<?>> getPublisherBuilder(org.eclipse.mi
                 .config(MpConfig.toHelidonConfig(config))
                 .scheduler(scheduler)
                 .build();
-
+        LOGGER.fine(() -> String.format("Resource %s added", publisher));
         resources.add(publisher);
         return ReactiveStreams.fromPublisher(publisher);
     }
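The new log line uses the Supplier overload of java.util.logging.Logger.fine, so the String.format call only runs when FINE is actually enabled. A minimal standalone sketch of the difference (class name hypothetical):

import java.util.logging.Logger;

public class LazyLogDemo {
    private static final Logger LOGGER = Logger.getLogger(LazyLogDemo.class.getName());

    public static void main(String[] args) {
        Object publisher = new Object();
        // Eager: the message is always formatted, even when FINE is disabled.
        LOGGER.fine(String.format("Resource %s added", publisher));
        // Lazy: the Supplier is only evaluated when FINE is loggable.
        LOGGER.fine(() -> String.format("Resource %s added", publisher));
    }
}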
@@ -131,7 +131,7 @@ public void stop() {
             }
         }
         if (failed.isEmpty()) {
-            LOGGER.fine("KafkaConnectorFactory terminated successfuly");
+            LOGGER.fine("KafkaConnector terminated successfully");
         } else {
             // Inform about the errors
             failed.forEach(e -> LOGGER.log(Level.SEVERE, "An error happened closing resource", e));
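For context, the surrounding stop() logic follows a close-everything-then-report pattern: every resource is closed, failures are collected, and either one success line or one SEVERE entry per failure is logged. A sketch of that pattern, assuming the resources are AutoCloseable:

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CloseAllSketch {
    private static final Logger LOGGER = Logger.getLogger(CloseAllSketch.class.getName());

    static void closeAll(List<AutoCloseable> resources) {
        List<Exception> failed = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                failed.add(e); // keep closing the remaining resources
            }
        }
        if (failed.isEmpty()) {
            LOGGER.fine("KafkaConnector terminated successfully");
        } else {
            failed.forEach(e -> LOGGER.log(Level.SEVERE, "An error happened closing resource", e));
        }
    }
}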
@@ -110,7 +110,7 @@ private KafkaPublisher(ScheduledExecutorService scheduler, Supplier<Consumer<K,
      * This execution runs in one thread that is triggered by the scheduler.
      */
     private void start() {
-        LOGGER.fine(() -> "KafkaPublisher starts to consume from Kafka");
+        LOGGER.fine(() -> String.format("%s Start to consume", topics));
         try {
             kafkaConsumer = consumerSupplier.get();
             kafkaConsumer.subscribe(topics, partitionsAssignedLatch);
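kafkaConsumer.subscribe(topics, partitionsAssignedLatch) passes a ConsumerRebalanceListener as its second argument, and the waitForPartitionAssigment method further down suggests the listener doubles as a latch that opens once partitions are assigned. A hypothetical reconstruction of such a listener, under that assumption:

import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: a latch that opens on the first partition assignment.
class PartitionsAssignedLatch extends CountDownLatch implements ConsumerRebalanceListener {

    PartitionsAssignedLatch() {
        super(1); // one assignment is enough to unblock waiters
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        countDown();
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do in this sketch
    }
}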
@@ -126,16 +126,19 @@ private void start() {
                 try {
                     kafkaConsumer.poll(Duration.ofMillis(pollTimeout)).forEach(backPressureBuffer::add);
                     if (!backPressureBuffer.isEmpty()) {
-                        LOGGER.fine(() -> String.format("Poll from consumer: %s", backPressureBuffer));
+                        LOGGER.fine(() -> String.format("%s Poll: %s", topics, backPressureBuffer));
                     }
                 } catch (WakeupException e) {
-                    LOGGER.fine(() -> "It was requested to stop polling from channel");
+                    LOGGER.fine(() -> String.format("%s It was requested to stop polling from channel", topics));
                 }
             } else {
                 long totalToEmit = requests.get();
                 // Avoid index out of bounds exceptions
                 long eventsToEmit = Math.min(totalToEmit, backPressureBuffer.size());
-                LOGGER.fine(() -> String.format("%s messages to emit", eventsToEmit));
+                if (eventsToEmit > 0) {
+                    LOGGER.fine(() -> String.format("%s %s messages to emit. %s in buffer and %s requested",
+                            topics, eventsToEmit, backPressureBuffer.size(), totalToEmit));
+                }
                 for (long i = 0; i < eventsToEmit; i++) {
                     ConsumerRecord<K, V> cr = backPressureBuffer.poll();
                     CompletableFuture<Void> kafkaCommit = new CompletableFuture<>();
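The emit branch is the back-pressure core of the publisher: it never emits more than min(requested, buffered) records, so the subscriber's demand is honored and the buffer is never over-drained. A self-contained sketch of just that arithmetic (the names mirror the diff, but the scenario is invented):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

public class BackPressureSketch {
    public static void main(String[] args) {
        Queue<String> backPressureBuffer = new ArrayDeque<>();
        backPressureBuffer.add("record-1");
        backPressureBuffer.add("record-2");
        backPressureBuffer.add("record-3");
        AtomicLong requests = new AtomicLong(2); // subscriber asked for 2

        long totalToEmit = requests.get();
        // Emit at most what was requested and at most what is buffered
        long eventsToEmit = Math.min(totalToEmit, backPressureBuffer.size());
        for (long i = 0; i < eventsToEmit; i++) {
            System.out.println("emit " + backPressureBuffer.poll());
            requests.decrementAndGet();
        }
        System.out.println("left in buffer: " + backPressureBuffer.size()); // 1
    }
}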
@@ -162,7 +165,7 @@ private void start() {
                 processACK();
             }
         } catch (Exception e) {
-            LOGGER.log(Level.SEVERE, "KafkaPublisher failed", e);
+            LOGGER.log(Level.SEVERE, "KafkaPublisher " + topics + " failed", e);
            emitter.fail(e);
        } finally {
            taskLock.unlock();
@@ -206,12 +209,12 @@ private void processACK() {
             }
             if (highest != null) {
                 OffsetAndMetadata offset = new OffsetAndMetadata(highest.getPayload().offset() + 1);
-                LOGGER.fine(() -> String.format("Will commit %s %s", entry.getKey(), offset));
+                LOGGER.fine(() -> String.format("%s Will commit %s %s", topics, entry.getKey(), offset));
                 offsets.put(entry.getKey(), offset);
             }
         }
         if (!messagesToCommit.isEmpty()) {
-            LOGGER.fine(() -> String.format("Offsets %s", offsets));
+            LOGGER.fine(() -> String.format("%s Offsets %s", topics, offsets));
             try {
                 kafkaConsumer.commitSync(offsets);
                 messagesToCommit.stream().forEach(message -> message.kafkaCommit().complete(null));
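The offset() + 1 in processACK is deliberate: a committed Kafka offset is the position the consumer resumes from, not the last record handled, so the code commits one past the highest acknowledged record per partition. A minimal sketch of that commit, with the partition and offset values assumed:

import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitSketch {
    static void commitHighest(KafkaConsumer<?, ?> kafkaConsumer, TopicPartition partition, long highestAckedOffset) {
        // +1: resume after the last acknowledged record, not on it
        Map<TopicPartition, OffsetAndMetadata> offsets =
                Map.of(partition, new OffsetAndMetadata(highestAckedOffset + 1));
        kafkaConsumer.commitSync(offsets);
    }
}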
@@ -228,14 +231,15 @@ private void processACK() {
      * It must be invoked after {@link ScheduledExecutorService} is shutdown.
      */
     public void stop() {
+        LOGGER.fine(() -> String.format("%s Requested to stop", topics));
         if (kafkaConsumer != null) {
             // Stops polling
             kafkaConsumer.wakeup();
             // Wait until the current task finishes in case it is still running
             try {
                 taskLock.lock();
                 cleanResourcesIfTerminated(true);
-                LOGGER.fine(() -> String.format("Buffered events that were not processed %s", backPressureBuffer));
+                LOGGER.fine(() -> String.format("%s Buffered events that were not processed %s", topics, backPressureBuffer));
                 emitter.complete();
             } catch (RuntimeException e) {
                 emitter.fail(e);
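wakeup() is the standard way to break another thread out of a blocking poll(): the polling thread receives a WakeupException, which the redesigned code now logs with the topic list rather than treating as an error. A sketch of the overall pattern, independent of the Helidon code:

import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupSketch {
    static void pollLoop(KafkaConsumer<Long, String> kafkaConsumer) {
        try {
            while (true) {
                // Blocks up to 50 ms; wakeup() from another thread makes
                // this call throw WakeupException immediately.
                kafkaConsumer.poll(Duration.ofMillis(50));
            }
        } catch (WakeupException e) {
            // Expected during shutdown; not an error.
        } finally {
            kafkaConsumer.close();
        }
    }

    static void stop(KafkaConsumer<Long, String> kafkaConsumer) {
        kafkaConsumer.wakeup(); // interrupts a poll() in progress
    }
}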
@@ -255,11 +259,11 @@ public void stop() {
     private void cleanResourcesIfTerminated(boolean isTerminated) {
         if (!stopped && isTerminated) {
             stopped = true;
-            LOGGER.fine(() -> "Pending ACKs: " + pendingCommits.size());
+            LOGGER.fine(() -> String.format("%s Pending ACKs: %s", topics, pendingCommits.size()));
             // Terminate waiting ACKs
             pendingCommits.values().stream().flatMap(List::stream)
                     .forEach(message -> message.kafkaCommit()
-                            .completeExceptionally(new TimeoutException("Aborted because KafkaPublisher is terminated")));
+                            .completeExceptionally(new TimeoutException(topics + " Aborted because KafkaPublisher is terminated")));
             kafkaConsumer.close();
         }
     }
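Each in-flight message carries a CompletableFuture for its Kafka commit; on termination those futures are failed with a TimeoutException so that no caller blocks forever on an acknowledgment that will never arrive. A standalone sketch of that cleanup step:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

public class AbortPendingSketch {
    public static void main(String[] args) {
        List<CompletableFuture<Void>> pendingCommits =
                List.of(new CompletableFuture<>(), new CompletableFuture<>());

        // Fail every commit future that can no longer be fulfilled
        pendingCommits.forEach(future -> future.completeExceptionally(
                new TimeoutException("Aborted because KafkaPublisher is terminated")));

        pendingCommits.forEach(future ->
                System.out.println("failed=" + future.isCompletedExceptionally())); // true, true
    }
}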
@@ -292,6 +296,10 @@ void waitForPartitionAssigment(long timeout, TimeUnit unit) throws InterruptedEx
         }
     }

+    List<String> topics() {
+        return topics;
+    }
+
     /**
      * A builder for KafkaPublisher.
      *
@@ -180,7 +180,7 @@ public SubscriberBuilder<Message<ConsumerRecord<Long, String>>, Void> channel5()
                 .to(new Subscriber<Message<ConsumerRecord<Long, String>>>() {
                     @Override
                     public void onSubscribe(Subscription subscription) {
-                        LOGGER.fine(() -> "onSubscribe()");
+                        LOGGER.fine(() -> "channel5 onSubscribe()");
                         subscription.request(3);
                     }
                     @Override
@@ -229,27 +229,35 @@ public CompletionStage<String> channel6(Message<ConsumerRecord<Long, String>> ms
     @ApplicationScoped
     public static class Channel8 extends AbstractSampleBean {
         static final String NO_ACK = "noAck";
+        static final int LIMIT = 10;

         private final AtomicInteger partitionIdOfNoAck = new AtomicInteger(-1);
         private final AtomicInteger uncommitted = new AtomicInteger();
+        // Limit is for one scenario where Kafka rebalances and sends the same data again in a different partition
+        private final AtomicInteger limit = new AtomicInteger();

         @Incoming("test-channel-8")
         @Acknowledgment(Acknowledgment.Strategy.MANUAL)
         public CompletionStage<String> channel6(Message<ConsumerRecord<Long, String>> msg) {
-            LOGGER.fine(() -> String.format("Received %s", msg.getPayload().value()));
             ConsumerRecord<Long, String> record = msg.unwrap(ConsumerRecord.class);
             consumed().add(record.value());
             // Certain messages are not ACK. We can check later that they will be sent again.
-            if (!NO_ACK.equals(msg.getPayload().value())) {
-                LOGGER.fine(() -> "ACK sent");
-                msg.ack().whenComplete((a, b) -> countDown("channel8()"));
-            } else {
+            if (NO_ACK.equals(msg.getPayload().value())) {
+                LOGGER.fine(() -> String.format("NO_ACK. Received %s", msg.getPayload().value()));
                 partitionIdOfNoAck.set(record.partition());
-                LOGGER.fine(() -> "ACK is not sent");
-            }
-            if (record.partition() == partitionIdOfNoAck.get()) {
-                LOGGER.fine(() -> record.value() + " will not be committed to Kafka");
                 uncommitted.getAndIncrement();
-                countDown("channel8()");
-            }
+                countDown("no_ack channel8()");
+            } else {
+                if (limit.getAndIncrement() == LIMIT) {
+                    throw new IllegalStateException("Avoid the Kafka rebalance fix");
+                } else if (record.partition() == partitionIdOfNoAck.get()) {
+                    LOGGER.fine(() -> String.format("NO_ACK. Received %s", msg.getPayload().value()));
+                    uncommitted.getAndIncrement();
+                    countDown("no_ack channel8()");
+                } else {
+                    LOGGER.fine(() -> String.format("ACK. Received %s", msg.getPayload().value()));
+                    msg.ack().whenComplete((a, b) -> countDown("ack channel8()"));
+                }
+            }
             return CompletableFuture.completedFuture(null);
         }
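The Channel8 test exercises MANUAL acknowledgment: messages whose payload equals NO_ACK are never acked, so their offsets are never committed and Kafka redelivers them after a rebalance; LIMIT caps how often the uncommitted branch may run so a broken rebalance cannot loop the test forever. The call shape, reduced to a minimal MicroProfile Reactive Messaging sketch (channel name hypothetical):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

public class ManualAckSketch {
    @Incoming("some-channel") // hypothetical channel name
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public CompletionStage<String> consume(Message<String> msg) {
        if (!"noAck".equals(msg.getPayload())) {
            msg.ack(); // only explicitly acked records get their offsets committed
        }
        return CompletableFuture.completedFuture(null);
    }
}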