Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-18661: clean-ups #8911

Merged
merged 1 commit into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import org.apache.camel.cdi.Uri;
import org.apache.camel.cdi.bean.SimpleCamelRoute;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.CamelEvent.CamelContextInitializedEvent;
import org.apache.camel.spi.CamelEvent.CamelContextInitializingEvent;
import org.apache.camel.spi.CamelEvent.CamelContextStartedEvent;
import org.apache.camel.spi.CamelEvent.CamelContextStartingEvent;
import org.apache.camel.spi.CamelEvent.CamelContextStoppedEvent;
Expand All @@ -55,6 +56,7 @@
import static org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;

@RunWith(Arquillian.class)
public class CamelEventNotifierTest {
Expand All @@ -71,12 +73,12 @@ public class CamelEventNotifierTest {
@ApplicationScoped
private List<Class> firedEvents = new ArrayList<>();

private void onCamelContextStartingEvent(@Observes CamelEvent.CamelContextInitializingEvent event, List<Class> events) {
events.add(CamelEvent.CamelContextInitializingEvent.class);
private void onCamelContextStartingEvent(@Observes CamelContextInitializingEvent event, List<Class> events) {
events.add(CamelContextInitializingEvent.class);
}

private void onCamelContextStartingEvent(@Observes CamelEvent.CamelContextInitializedEvent event, List<Class> events) {
events.add(CamelEvent.CamelContextInitializedEvent.class);
private void onCamelContextStartingEvent(@Observes CamelContextInitializedEvent event, List<Class> events) {
events.add(CamelContextInitializedEvent.class);
}

private void onCamelContextStartingEvent(@Observes CamelContextStartingEvent event, List<Class> events) {
Expand Down Expand Up @@ -115,8 +117,8 @@ public static Archive<?> deployment() {
public void startedCamelContext(List<Class> events) {
assertThat("Events fired are incorrect!", events,
contains(
CamelEvent.CamelContextInitializingEvent.class,
CamelEvent.CamelContextInitializedEvent.class,
CamelContextInitializingEvent.class,
CamelContextInitializedEvent.class,
CamelContextStartingEvent.class,
CamelContextStartedEvent.class));
}
Expand All @@ -131,10 +133,11 @@ public void sendMessageToInbound(List<Class> events) throws InterruptedException

assertIsSatisfied(2L, TimeUnit.SECONDS, outbound);

assertThat("Events fired are incorrect!", events,
assertThat("Events count is incorrect!", events.size(), equalTo(12));
assertThat("Events types are incorrect!", events,
contains(
CamelEvent.CamelContextInitializingEvent.class,
CamelEvent.CamelContextInitializedEvent.class,
CamelContextInitializingEvent.class,
CamelContextInitializedEvent.class,
CamelContextStartingEvent.class,
CamelContextStartedEvent.class,
ExchangeSendingEvent.class,
Expand All @@ -152,10 +155,11 @@ public void sendMessageToInbound(List<Class> events) throws InterruptedException
public void stopCamelContext(CamelContext context, List<Class> events) {
context.stop();

assertThat("Events fired are incorrect!", events,
assertThat("Events count is incorrect!", events.size(), equalTo(14));
assertThat("Events types are incorrect!", events,
contains(
CamelEvent.CamelContextInitializingEvent.class,
CamelEvent.CamelContextInitializedEvent.class,
CamelContextInitializingEvent.class,
CamelContextInitializedEvent.class,
CamelContextStartingEvent.class,
CamelContextStartedEvent.class,
ExchangeSendingEvent.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

Expand All @@ -29,6 +28,7 @@
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
Expand All @@ -49,18 +49,28 @@
import static org.junit.jupiter.api.Assertions.assertFalse;

class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
private static final Executor DELAYED = CompletableFuture.delayedExecutor(10L, TimeUnit.MILLISECONDS, new ForkJoinPool(3));

CurrentSpanTest() {
super(new SpanTestData[0]);
}

@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
context.addComponent("asyncmock1", new AsyncMockComponent());
context.addComponent("asyncmock2", new AsyncMockComponent());
context.addComponent("asyncmock3", new AsyncMockComponent());
context.addComponent("syncmock", new SyncMockComponent());

return context;
}

@Test
void testSync() {
SpanTestData[] expectedSpans = {
new SpanTestData().setLabel("syncmock:result").setUri("syncmock://result").setOperation("syncmock")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("direct:bar").setUri("direct://bar").setOperation("bar").setKind(SpanKind.INTERNAL),
new SpanTestData().setLabel("direct:bar").setUri("direct://bar").setOperation("bar")
};

// sync pipeline
Expand All @@ -78,7 +88,7 @@ void testSyncToAsync() {
SpanTestData[] expectedSpans = {
new SpanTestData().setLabel("asyncmock1:result").setUri("asyncmock1://result").setOperation("asyncmock1")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("direct:foo").setUri("direct://foo").setOperation("foo").setKind(SpanKind.INTERNAL),
new SpanTestData().setLabel("direct:foo").setUri("direct://foo").setOperation("foo")
};

// sync to async pipeline
Expand All @@ -97,8 +107,7 @@ void testAsyncToSync() {
SpanTestData[] expectedSpans = {
new SpanTestData().setLabel("syncmock:result").setUri("syncmock://result").setOperation("syncmock")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("asyncmock1:start").setUri("asyncmock1://start").setOperation("asyncmock1")
.setKind(SpanKind.INTERNAL),
new SpanTestData().setLabel("asyncmock1:start").setUri("asyncmock1://start").setOperation("asyncmock1"),
new SpanTestData().setLabel("asyncmock1:start").setUri("asyncmock1://start").setOperation("asyncmock1")
.setKind(SpanKind.CLIENT),
};
Expand All @@ -116,8 +125,7 @@ void testAsyncToAsync() {
SpanTestData[] expectedSpans = {
new SpanTestData().setLabel("asyncmock2:result").setUri("asyncmock2://result").setOperation("asyncmock2")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("asyncmock2:start").setUri("asyncmock2://start").setOperation("asyncmock2")
.setKind(SpanKind.INTERNAL),
new SpanTestData().setLabel("asyncmock2:start").setUri("asyncmock2://start").setOperation("asyncmock2"),
new SpanTestData().setLabel("asyncmock2:start").setUri("asyncmock2://start").setOperation("asyncmock2")
.setKind(SpanKind.CLIENT),
};
Expand All @@ -140,7 +148,6 @@ void testMulticastAsync() {
new SpanTestData().setLabel("syncmock:result").setUri("syncmock://result").setOperation("syncmock")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("direct:start").setUri("direct://start").setOperation("start")
.setKind(SpanKind.INTERNAL)
};

// sync pipeline
Expand Down Expand Up @@ -168,11 +175,6 @@ protected RoutesBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
context.addComponent("asyncmock1", new AsyncMockComponent());
context.addComponent("asyncmock2", new AsyncMockComponent());
context.addComponent("asyncmock3", new AsyncMockComponent());
context.addComponent("syncmock", new SyncMockComponent());

// sync pipeline
from("direct:bar").to("syncmock:result");

Expand Down Expand Up @@ -221,8 +223,7 @@ public void configure() {
}
return newExchange;
})
.executorService(Executors.newFixedThreadPool(10))
.parallelProcessing()
.executorService(context.getExecutorServiceManager().newFixedThreadPool(this, "CurrentSpanTest", 10))
.streaming()
.delay(10)
.to("log:line", "asyncmock1:start")
Expand All @@ -233,7 +234,7 @@ public void configure() {
};
}

private class AsyncMockComponent extends MockComponent {
private static class AsyncMockComponent extends MockComponent {

@Override
protected Endpoint createEndpoint(String uri, String key, Map<String, Object> parameters) {
Expand All @@ -242,6 +243,9 @@ protected Endpoint createEndpoint(String uri, String key, Map<String, Object> pa
}

private static class AsyncMockEndpoint extends MockEndpoint {
private static final Executor DELAYED
= CompletableFuture.delayedExecutor(10L, TimeUnit.MILLISECONDS, new ForkJoinPool(3));

private Consumer consumer;
private final String key;

Expand Down Expand Up @@ -299,15 +303,15 @@ private Consumer getConsumer(long timeout) throws InterruptedException {
}
}

private class SyncMockComponent extends MockComponent {
private static class SyncMockComponent extends MockComponent {

@Override
protected Endpoint createEndpoint(String uri, String key, Map<String, Object> parameters) {
return new SyncMockEndpoint(this, uri, key);
}
}

private class SyncMockEndpoint extends MockEndpoint {
private static class SyncMockEndpoint extends MockEndpoint {
public SyncMockEndpoint(SyncMockComponent component, String uri, String key) {
super(uri, component);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void notify(CamelEvent event) throws Exception {
if (event instanceof CamelEvent.ExchangeSendingEvent) {
CamelEvent.ExchangeSendingEvent ese = (CamelEvent.ExchangeSendingEvent) event;
SpanDecorator sd = getSpanDecorator(ese.getEndpoint());
if (exclude(sd, ese.getExchange(), ese.getEndpoint())) {
if (shouldExclude(sd, ese.getExchange(), ese.getEndpoint())) {
return;
}

Expand All @@ -254,7 +254,7 @@ public void notify(CamelEvent event) throws Exception {
} else if (event instanceof CamelEvent.ExchangeSentEvent) {
CamelEvent.ExchangeSentEvent ese = (CamelEvent.ExchangeSentEvent) event;
SpanDecorator sd = getSpanDecorator(ese.getEndpoint());
if (exclude(sd, ese.getExchange(), ese.getEndpoint())) {
if (shouldExclude(sd, ese.getExchange(), ese.getEndpoint())) {
return;
}

Expand Down Expand Up @@ -283,7 +283,7 @@ public void notify(CamelEvent event) throws Exception {
}
}

private boolean exclude(SpanDecorator sd, Exchange exchange, Endpoint endpoint) {
private boolean shouldExclude(SpanDecorator sd, Exchange exchange, Endpoint endpoint) {
return sd instanceof AbstractInternalSpanDecorator || !sd.newSpan()
|| isExcluded(exchange, endpoint);
}
Expand Down