Moving 2 tests from KafkaMP to KafkaSE #1918

Merged · 2 commits · Jun 13, 2020
@@ -17,12 +17,43 @@

package io.helidon.messaging.connectors.kafka;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;

public abstract class AbstractKafkaTest {

private static final Logger LOGGER = Logger.getLogger(AbstractKafkaTest.class.getName());

static String KAFKA_SERVER;

@RegisterExtension
@@ -36,4 +67,67 @@ public abstract class AbstractKafkaTest {
static void prepareTopics() {
KAFKA_SERVER = kafkaResource.getKafkaConnectString();
}

static <T> void produceSync(String topic, Map<String, Object> config, List<T> testData) {
try (Producer<Object, T> producer = new KafkaProducer<>(config)) {
LOGGER.fine(() -> "Producing " + testData.size() + " events");
// Send all test messages in parallel (async send means delivery order is not guaranteed)
List<Future<RecordMetadata>> sent = testData.parallelStream()
.map(s -> producer.send(new ProducerRecord<>(topic, s))).collect(Collectors.toList());
sent.stream().forEach(future -> {
try {
future.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
fail("Some of next messages were not sent in time: " + testData, e);
}
});
}
}

static void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String> testData, String topic,
List<String> expected) {
produceAndCheck(kafkaConsumingBean, testData, topic, expected, expected.size());
}

static void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String> testData, String topic,
List<String> expected, long requested) {
kafkaConsumingBean.expectedRequests(requested);
Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", KAFKA_SERVER);
config.put("key.serializer", LongSerializer.class.getName());
config.put("value.serializer", StringSerializer.class.getName());

produceSync(topic, config, testData);
if (requested > 0) {
// Wait till records are delivered
boolean done = kafkaConsumingBean.await();
assertTrue(done, String.format("Timeout waiting for results.\nExpected: %s \nBut was: %s",
expected.toString(), kafkaConsumingBean.consumed().toString()));
}
Collections.sort(kafkaConsumingBean.consumed());
Collections.sort(expected);
if (!expected.isEmpty()) {
assertEquals(expected, kafkaConsumingBean.consumed());
}
}

static List<String> readTopic(String topic, int expected, String group) {
final long timeout = 30000;
List<String> events = new LinkedList<>();
Map<String, Object> config = new HashMap<>();
config.put("enable.auto.commit", Boolean.toString(true));
config.put("auto.offset.reset", "earliest");
config.put("bootstrap.servers", KAFKA_SERVER);
config.put("group.id", group);
config.put("key.deserializer", LongDeserializer.class.getName());
config.put("value.deserializer", StringDeserializer.class.getName());
try (Consumer<Object, String> consumer = new KafkaConsumer<>(config)) {
consumer.subscribe(Arrays.asList(topic));
long current = System.currentTimeMillis();
while (events.size() < expected && System.currentTimeMillis() - current < timeout) {
consumer.poll(Duration.ofSeconds(5)).forEach(c -> events.add(c.value()));
}
}
return events;
}
}
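
For orientation, here is a minimal sketch (not part of this PR) of how a test in the KafkaSE module might drive these helpers end to end; the test class, topic name, and consumer group are hypothetical and used for illustration only:

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

// Hypothetical usage sketch of the helpers above; names are illustrative only.
class KafkaRoundTripSketchTest extends AbstractKafkaTest {

    @Test
    void producedRecordsCanBeReadBack() {
        String topic = "sketch-topic";
        List<String> testData = Arrays.asList("a", "b", "c");

        // Producer config mirroring the one built in produceAndCheck().
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", KAFKA_SERVER);
        config.put("key.serializer", LongSerializer.class.getName());
        config.put("value.serializer", StringSerializer.class.getName());

        // Send the records and block until the broker acknowledges each one.
        produceSync(topic, config, testData);

        // Read the topic back with a fresh consumer group and compare counts.
        List<String> readBack = readTopic(topic, testData.size(), "sketch-group");
        assertEquals(testData.size(), readBack.size());
    }
}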
@@ -205,13 +205,10 @@ public void onComplete() {
}
}

@ApplicationScoped
public static class Channel6 extends AbstractSampleBean {
static final String NO_ACK = "noAck";

@Incoming("test-channel-6")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<String> channel6(KafkaMessage<Long, String> msg) {
public CompletionStage<String> onMsg(KafkaMessage<Long, String> msg) {
LOGGER.fine(() -> String.format("Received %s", msg.getPayload()));
consumed().add(msg.getPayload());
// Certain messages are not ACK. We can check later that they will be sent again.
@@ -221,12 +218,11 @@ public CompletionStage<String> channel6(KafkaMessage<Long, String> msg) {
} else {
LOGGER.fine(() -> "ACK is not sent");
}
countDown("channel6()");
countDown("channel6()");
return CompletableFuture.completedFuture(null);
}
}

@ApplicationScoped

public static class Channel8 extends AbstractSampleBean {
static final String NO_ACK = "noAck";
static final int LIMIT = 10;
@@ -236,9 +232,7 @@ public static class Channel8 extends AbstractSampleBean {
// Limit is for one scenario that Kafka rebalances and sends again same data in different partition
private final AtomicInteger limit = new AtomicInteger();

@Incoming("test-channel-8")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<String> channel6(KafkaMessage<Long, String> msg) {
public CompletionStage<String> onMsg(KafkaMessage<Long, String> msg) {
ConsumerRecord<Long, String> record = msg.unwrap(ConsumerRecord.class);
consumed().add(record.value());
// Certain messages are not ACK. We can check later that they will be sent again.
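
The Channel6/Channel8 beans above rely on an "expect N messages, count each one down, await the rest" pattern via expectedRequests(), countDown(), await(), and consumed(). A stand-alone sketch of that pattern, using only java.util.concurrent and not part of this PR (class and method bodies are an approximation, not the actual AbstractSampleBean code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: a latch tracks how many messages are still expected,
// while a thread-safe queue records every payload seen so far.
class ExpectedMessagesSketch {

    private volatile CountDownLatch latch = new CountDownLatch(0);
    private final ConcurrentLinkedQueue<String> consumed = new ConcurrentLinkedQueue<>();

    // Called by the test before producing: arm the latch with the expected count.
    void expectedRequests(long requested) {
        latch = new CountDownLatch((int) requested);
    }

    // Called for every received message: record it and count the latch down.
    void onMessage(String payload) {
        consumed.add(payload);
        latch.countDown();
    }

    // Called by the test after producing: wait until all expected messages arrived.
    boolean await() throws InterruptedException {
        return latch.await(30, TimeUnit.SECONDS);
    }

    List<String> consumed() {
        return new ArrayList<>(consumed);
    }
}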