Skip to content

Commit

Permalink
chore(#1281): reproducer for parallel access into kafka consumer
Browse files Browse the repository at this point in the history
added a reproducer for #1281.
indeed the test which is part of the `KafkaEndpointJavaIT` throws:

```shell
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: Thread-1, id: 124) otherThread(id: 125)
```
  • Loading branch information
bbortt committed Jan 9, 2025
1 parent eae8054 commit ec41afa
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.citrusframework.internal;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.SOURCE;

/**
* Annotation to reference a GitHub issue number associated with a test class or method.
* It serves as a linking mechanism between reproducers (tests proving certain behavior) and their corresponding GitHub issues.
* <p>
* Example usage:
* <pre>
* {@code @GitHubIssue(1234)
* public class MyTest {
* // Class implementation
* }
*
* {@code @GitHubIssue(5678)
* public void testMethod() {
* // Method implementation
* }}
* </pre>
*/
@Retention(SOURCE)
@Target({METHOD, TYPE})
public @interface GitHubIssue {

/**
* The GitHub issue number to reference.
*
* @return the issue number in <a href="https://github.com/citrusframework/citrus/issues">the GitHub repository</a>
*/
int value();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import static java.lang.String.format;
import static java.lang.Thread.currentThread;
import static java.time.Instant.now;
import static java.util.Collections.singletonList;
import static java.util.Objects.isNull;
Expand Down Expand Up @@ -140,13 +140,13 @@ public Message receive(String selector, TestContext testContext, long timeout) {
private List<ConsumerRecord<Object, Object>> findMessageWithTimeout(String topic, long timeout) {
logger.trace("Applied timeout is {} ms", timeout);

ExecutorService executorService = newSingleThreadExecutor();
var executorService = newSingleThreadExecutor();
final Future<List<ConsumerRecord<Object, Object>>> handler = executorService.submit(() -> findMessagesSatisfyingMatcher(topic));

try {
return handler.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
currentThread().interrupt();
throw new CitrusRuntimeException("Thread was interrupted while waiting for Kafka message", e);
} catch (ExecutionException e) {
throw new CitrusRuntimeException(format("Failed to receive message on Kafka topic '%s'", topic), e);
Expand All @@ -157,8 +157,8 @@ private List<ConsumerRecord<Object, Object>> findMessageWithTimeout(String topic

throw new MessageTimeoutException(timeout, topic, e);
} finally {
executorService.shutdownNow();
consumer.unsubscribe();
executorService.shutdownNow();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.citrusframework.annotations.CitrusTest;
import org.citrusframework.exceptions.CitrusRuntimeException;
import org.citrusframework.exceptions.TestCaseFailedException;
import org.citrusframework.internal.GitHubIssue;
import org.citrusframework.kafka.endpoint.KafkaEndpoint;
import org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector;
import org.citrusframework.kafka.message.KafkaMessage;
Expand All @@ -33,6 +34,7 @@
import static org.citrusframework.actions.ReceiveMessageAction.Builder.receive;
import static org.citrusframework.actions.SendMessageAction.Builder.send;
import static org.citrusframework.actions.SleepAction.Builder.sleep;
import static org.citrusframework.container.Parallel.Builder.parallel;
import static org.citrusframework.kafka.endpoint.KafkaMessageFilter.kafkaMessageFilter;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.ENDS_WITH;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.STARTS_WITH;
Expand Down Expand Up @@ -274,4 +276,35 @@ public void findKafkaEvent_headerEquals_java_DSL() {
);
}

@CitrusTest
@GitHubIssue(1281)
public void parallel_access_thread_safety() {
var body = "parallel_access_thread_safety";

var key = "Name";

var brother1 = "Elladan";
var brother2 = "Elrohir";

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, brother1))
);

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, brother2))
);


then(
parallel()
.actions(
kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, brother1)
.body(body),
kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, brother2)
.body(body)
)
);
}
}

0 comments on commit ec41afa

Please sign in to comment.