Skip to content

Commit

Permalink
CAMEL-18661: clean-ups
Browse files Browse the repository at this point in the history
  • Loading branch information
bvahdat committed Dec 16, 2022
1 parent 002c9ca commit ae25327
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import org.apache.camel.cdi.Uri;
import org.apache.camel.cdi.bean.SimpleCamelRoute;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.CamelEvent.CamelContextInitializedEvent;
import org.apache.camel.spi.CamelEvent.CamelContextInitializingEvent;
import org.apache.camel.spi.CamelEvent.CamelContextStartedEvent;
import org.apache.camel.spi.CamelEvent.CamelContextStartingEvent;
import org.apache.camel.spi.CamelEvent.CamelContextStoppedEvent;
Expand All @@ -55,6 +56,7 @@
import static org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;

@RunWith(Arquillian.class)
public class CamelEventNotifierTest {
Expand All @@ -71,12 +73,12 @@ public class CamelEventNotifierTest {
@ApplicationScoped
private List<Class> firedEvents = new ArrayList<>();

private void onCamelContextStartingEvent(@Observes CamelEvent.CamelContextInitializingEvent event, List<Class> events) {
events.add(CamelEvent.CamelContextInitializingEvent.class);
private void onCamelContextStartingEvent(@Observes CamelContextInitializingEvent event, List<Class> events) {
events.add(CamelContextInitializingEvent.class);
}

private void onCamelContextStartingEvent(@Observes CamelEvent.CamelContextInitializedEvent event, List<Class> events) {
events.add(CamelEvent.CamelContextInitializedEvent.class);
private void onCamelContextStartingEvent(@Observes CamelContextInitializedEvent event, List<Class> events) {
events.add(CamelContextInitializedEvent.class);
}

private void onCamelContextStartingEvent(@Observes CamelContextStartingEvent event, List<Class> events) {
Expand Down Expand Up @@ -115,8 +117,8 @@ public static Archive<?> deployment() {
public void startedCamelContext(List<Class> events) {
assertThat("Events fired are incorrect!", events,
contains(
CamelEvent.CamelContextInitializingEvent.class,
CamelEvent.CamelContextInitializedEvent.class,
CamelContextInitializingEvent.class,
CamelContextInitializedEvent.class,
CamelContextStartingEvent.class,
CamelContextStartedEvent.class));
}
Expand All @@ -131,10 +133,11 @@ public void sendMessageToInbound(List<Class> events) throws InterruptedException

assertIsSatisfied(2L, TimeUnit.SECONDS, outbound);

assertThat("Events fired are incorrect!", events,
assertThat("Events count is incorrect!", events.size(), equalTo(12));
assertThat("Events types are incorrect!", events,
contains(
CamelEvent.CamelContextInitializingEvent.class,
CamelEvent.CamelContextInitializedEvent.class,
CamelContextInitializingEvent.class,
CamelContextInitializedEvent.class,
CamelContextStartingEvent.class,
CamelContextStartedEvent.class,
ExchangeSendingEvent.class,
Expand All @@ -152,10 +155,11 @@ public void sendMessageToInbound(List<Class> events) throws InterruptedException
public void stopCamelContext(CamelContext context, List<Class> events) {
context.stop();

assertThat("Events fired are incorrect!", events,
assertThat("Events count is incorrect!", events.size(), equalTo(14));
assertThat("Events types are incorrect!", events,
contains(
CamelEvent.CamelContextInitializingEvent.class,
CamelEvent.CamelContextInitializedEvent.class,
CamelContextInitializingEvent.class,
CamelContextInitializedEvent.class,
CamelContextStartingEvent.class,
CamelContextStartedEvent.class,
ExchangeSendingEvent.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

Expand All @@ -29,6 +28,7 @@
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
Expand All @@ -49,18 +49,28 @@
import static org.junit.jupiter.api.Assertions.assertFalse;

class CurrentSpanTest extends CamelOpenTelemetryTestSupport {
private static final Executor DELAYED = CompletableFuture.delayedExecutor(10L, TimeUnit.MILLISECONDS, new ForkJoinPool(3));

CurrentSpanTest() {
super(new SpanTestData[0]);
}

@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
context.addComponent("asyncmock1", new AsyncMockComponent());
context.addComponent("asyncmock2", new AsyncMockComponent());
context.addComponent("asyncmock3", new AsyncMockComponent());
context.addComponent("syncmock", new SyncMockComponent());

return context;
}

@Test
void testSync() {
SpanTestData[] expectedSpans = {
new SpanTestData().setLabel("syncmock:result").setUri("syncmock://result").setOperation("syncmock")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("direct:bar").setUri("direct://bar").setOperation("bar").setKind(SpanKind.INTERNAL),
new SpanTestData().setLabel("direct:bar").setUri("direct://bar").setOperation("bar")
};

// sync pipeline
Expand All @@ -78,7 +88,7 @@ void testSyncToAsync() {
SpanTestData[] expectedSpans = {
new SpanTestData().setLabel("asyncmock1:result").setUri("asyncmock1://result").setOperation("asyncmock1")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("direct:foo").setUri("direct://foo").setOperation("foo").setKind(SpanKind.INTERNAL),
new SpanTestData().setLabel("direct:foo").setUri("direct://foo").setOperation("foo")
};

// sync to async pipeline
Expand All @@ -97,8 +107,7 @@ void testAsyncToSync() {
SpanTestData[] expectedSpans = {
new SpanTestData().setLabel("syncmock:result").setUri("syncmock://result").setOperation("syncmock")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("asyncmock1:start").setUri("asyncmock1://start").setOperation("asyncmock1")
.setKind(SpanKind.INTERNAL),
new SpanTestData().setLabel("asyncmock1:start").setUri("asyncmock1://start").setOperation("asyncmock1"),
new SpanTestData().setLabel("asyncmock1:start").setUri("asyncmock1://start").setOperation("asyncmock1")
.setKind(SpanKind.CLIENT),
};
Expand All @@ -116,8 +125,7 @@ void testAsyncToAsync() {
SpanTestData[] expectedSpans = {
new SpanTestData().setLabel("asyncmock2:result").setUri("asyncmock2://result").setOperation("asyncmock2")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("asyncmock2:start").setUri("asyncmock2://start").setOperation("asyncmock2")
.setKind(SpanKind.INTERNAL),
new SpanTestData().setLabel("asyncmock2:start").setUri("asyncmock2://start").setOperation("asyncmock2"),
new SpanTestData().setLabel("asyncmock2:start").setUri("asyncmock2://start").setOperation("asyncmock2")
.setKind(SpanKind.CLIENT),
};
Expand All @@ -140,7 +148,6 @@ void testMulticastAsync() {
new SpanTestData().setLabel("syncmock:result").setUri("syncmock://result").setOperation("syncmock")
.setKind(SpanKind.CLIENT),
new SpanTestData().setLabel("direct:start").setUri("direct://start").setOperation("start")
.setKind(SpanKind.INTERNAL)
};

// sync pipeline
Expand Down Expand Up @@ -168,11 +175,6 @@ protected RoutesBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
context.addComponent("asyncmock1", new AsyncMockComponent());
context.addComponent("asyncmock2", new AsyncMockComponent());
context.addComponent("asyncmock3", new AsyncMockComponent());
context.addComponent("syncmock", new SyncMockComponent());

// sync pipeline
from("direct:bar").to("syncmock:result");

Expand Down Expand Up @@ -221,8 +223,7 @@ public void configure() {
}
return newExchange;
})
.executorService(Executors.newFixedThreadPool(10))
.parallelProcessing()
.executorService(context.getExecutorServiceManager().newFixedThreadPool(this, "CurrentSpanTest", 10))
.streaming()
.delay(10)
.to("log:line", "asyncmock1:start")
Expand All @@ -233,7 +234,7 @@ public void configure() {
};
}

private class AsyncMockComponent extends MockComponent {
private static class AsyncMockComponent extends MockComponent {

@Override
protected Endpoint createEndpoint(String uri, String key, Map<String, Object> parameters) {
Expand All @@ -242,6 +243,9 @@ protected Endpoint createEndpoint(String uri, String key, Map<String, Object> pa
}

private static class AsyncMockEndpoint extends MockEndpoint {
private static final Executor DELAYED
= CompletableFuture.delayedExecutor(10L, TimeUnit.MILLISECONDS, new ForkJoinPool(3));

private Consumer consumer;
private final String key;

Expand Down Expand Up @@ -299,15 +303,15 @@ private Consumer getConsumer(long timeout) throws InterruptedException {
}
}

private class SyncMockComponent extends MockComponent {
private static class SyncMockComponent extends MockComponent {

@Override
protected Endpoint createEndpoint(String uri, String key, Map<String, Object> parameters) {
return new SyncMockEndpoint(this, uri, key);
}
}

private class SyncMockEndpoint extends MockEndpoint {
private static class SyncMockEndpoint extends MockEndpoint {
public SyncMockEndpoint(SyncMockComponent component, String uri, String key) {
super(uri, component);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void notify(CamelEvent event) throws Exception {
if (event instanceof CamelEvent.ExchangeSendingEvent) {
CamelEvent.ExchangeSendingEvent ese = (CamelEvent.ExchangeSendingEvent) event;
SpanDecorator sd = getSpanDecorator(ese.getEndpoint());
if (exclude(sd, ese.getExchange(), ese.getEndpoint())) {
if (shouldExclude(sd, ese.getExchange(), ese.getEndpoint())) {
return;
}

Expand All @@ -254,7 +254,7 @@ public void notify(CamelEvent event) throws Exception {
} else if (event instanceof CamelEvent.ExchangeSentEvent) {
CamelEvent.ExchangeSentEvent ese = (CamelEvent.ExchangeSentEvent) event;
SpanDecorator sd = getSpanDecorator(ese.getEndpoint());
if (exclude(sd, ese.getExchange(), ese.getEndpoint())) {
if (shouldExclude(sd, ese.getExchange(), ese.getEndpoint())) {
return;
}

Expand Down Expand Up @@ -283,7 +283,7 @@ public void notify(CamelEvent event) throws Exception {
}
}

private boolean exclude(SpanDecorator sd, Exchange exchange, Endpoint endpoint) {
private boolean shouldExclude(SpanDecorator sd, Exchange exchange, Endpoint endpoint) {
return sd instanceof AbstractInternalSpanDecorator || !sd.newSpan()
|| isExcluded(exchange, endpoint);
}
Expand Down

0 comments on commit ae25327

Please sign in to comment.