Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MP Reactive Messaging impl #1287

Merged
merged 26 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
abdb9c8
MP Reactive Messaging impl
danielkec Jan 10, 2020
253efc1
Arquillian manually managed deploy fix after native compatibility cdi…
danielkec Jan 16, 2020
5186a41
Routing assembly at proper time
danielkec Feb 25, 2020
5a2e648
Generic vararg warning cleanup
danielkec Feb 25, 2020
4e441de
Fix test container tear-down
danielkec Feb 26, 2020
a62998c
Error collecting
danielkec Feb 27, 2020
405ed38
Flatten packages
danielkec Mar 3, 2020
aa2e74e
Fix Arquillian tests pollution
danielkec Mar 3, 2020
dccd26b
Register feature
danielkec Mar 3, 2020
ecd1e07
Rename AbstractMethod
danielkec Mar 3, 2020
852c38c
Clean up raw generics
danielkec Mar 3, 2020
c6c4c77
Config merging cleanup
danielkec Mar 4, 2020
ceb722f
Final fields cleanup
danielkec Mar 4, 2020
00d8a4e
Proper acknowledgement
danielkec Mar 4, 2020
1883360
Param assigment
danielkec Mar 4, 2020
3e8aefe
Add module-info
danielkec Mar 4, 2020
2dbfb84
Get rid of intermediate blocking by queuing completables and keeping …
danielkec Mar 4, 2020
db5ef08
Fix racing tests
danielkec Mar 5, 2020
be87033
CompletableQueue has maximum size
danielkec Mar 5, 2020
9c183f8
Move MP api dependency to dependencies/pom.xml
danielkec Mar 5, 2020
f6a60ea
Wait for whole stream in async tests
danielkec Mar 5, 2020
011fe39
Reactive Streams module-info
danielkec Mar 5, 2020
46992ab
Clean up tck poms
danielkec Mar 5, 2020
e1c9111
Werker build fix
danielkec Mar 5, 2020
26830e4
Copyright fix
danielkec Mar 5, 2020
8e38830
Unify exceptions
danielkec Mar 6, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -626,11 +626,18 @@

<!-- Reactive Streams Operators -->
<dependency>
<groupId>io.helidon.microprofile</groupId>
<groupId>io.helidon.microprofile.reactive-streams</groupId>
<artifactId>helidon-microprofile-reactive-streams</artifactId>
<version>${helidon.version}</version>
</dependency>

<!-- Reactive Messaging -->
<dependency>
<groupId>io.helidon.microprofile.messaging</groupId>
<artifactId>helidon-microprofile-messaging</artifactId>
<version>${helidon.version}</version>
</dependency>

<!-- integrations -->
<dependency>
<groupId>io.helidon.serviceconfiguration</groupId>
Expand Down
10 changes: 9 additions & 1 deletion common/common/src/main/java/io/helidon/common/Errors.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
* Errors utility used to file processing messages (e.g. validation, provider, resource building errors, hint).
Expand Down Expand Up @@ -156,6 +157,13 @@ public boolean log(Logger logger) {
return true;
}

@Override
public String toString() {
return this.stream()
.map(ErrorMessage::toString)
.collect(Collectors.joining("\n"));
}

/**
* Check if these messages are a valid result.
*
Expand Down
18 changes: 18 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@
<version.lib.microprofile-fault-tolerance-api>2.0.2</version.lib.microprofile-fault-tolerance-api>
<version.lib.microprofile-tracing>1.3.1</version.lib.microprofile-tracing>
<version.lib.microprofile-rest-client>1.3.3</version.lib.microprofile-rest-client>
<version.lib.microprofile-reactive-messaging-api>1.0</version.lib.microprofile-reactive-messaging-api>
<version.lib.microprofile-reactive-streams-operators-api>1.0.1</version.lib.microprofile-reactive-streams-operators-api>
<version.lib.microprofile-reactive-streams-operators-core>1.0.1</version.lib.microprofile-reactive-streams-operators-core>
<version.lib.mockito>2.23.4</version.lib.mockito>
<version.lib.mongodb.reactivestreams>1.11.0</version.lib.mongodb.reactivestreams>
<version.lib.mysql-connector-java>8.0.11</version.lib.mysql-connector-java>
Expand Down Expand Up @@ -485,6 +488,21 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
<artifactId>microprofile-reactive-streams-operators-api</artifactId>
<version>${version.lib.microprofile-reactive-streams-operators-api}</version>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
<artifactId>microprofile-reactive-streams-operators-core</artifactId>
<version>${version.lib.microprofile-reactive-streams-operators-core}</version>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
<version>${version.lib.microprofile-reactive-messaging-api}</version>
</dependency>
danielkec marked this conversation as resolved.
Show resolved Hide resolved
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
Expand Down
73 changes: 73 additions & 0 deletions microprofile/messaging/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2020 Oracle and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.helidon.microprofile</groupId>
<artifactId>helidon-microprofile-project</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>

<groupId>io.helidon.microprofile.messaging</groupId>
<artifactId>helidon-microprofile-messaging</artifactId>
<name>Helidon MicroProfile Reactive Messaging</name>
<description>
Helidon MicroProfile Reactive Messaging
</description>

<dependencies>
<dependency>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
<exclusions>
<exclusion>
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
<artifactId>microprofile-reactive-streams-operators-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
<artifactId>microprofile-reactive-streams-operators-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.config</groupId>
<artifactId>helidon-microprofile-config</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.server</groupId>
<artifactId>helidon-microprofile-server</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.reactive-streams</groupId>
<artifactId>helidon-microprofile-reactive-streams</artifactId>
</dependency>
<dependency>
<groupId>javax.interceptor</groupId>
<artifactId>javax.interceptor-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.bundles</groupId>
<artifactId>internal-test-libs</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.microprofile.messaging;

import java.lang.reflect.Method;
import java.util.Optional;

import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;

import io.helidon.common.Errors;
import io.helidon.config.Config;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;

abstract class AbstractMessagingMethod {

private String incomingChannelName;
private String outgoingChannelName;

private Bean<?> bean;
private Object beanInstance;
private MethodSignatureType type;
private final Method method;
private final Errors.Collector errors;
private Acknowledgment.Strategy ackStrategy;


AbstractMessagingMethod(Method method, Errors.Collector errors) {
this.method = method;
this.errors = errors;
Optional<MethodSignatureType> signatureType = MethodSignatureResolver
.create(method)
.resolve();
if (signatureType.isPresent()) {
this.type = signatureType.get();
resolveAckStrategy();
} else {
errors.fatal("Unsupported method signature " + method);
}
}

void validate() {
Optional.ofNullable(method.getAnnotation(Acknowledgment.class))
.map(Acknowledgment::value)
.filter(s -> !type.getSupportedAckStrategies().contains(s))
.ifPresent(strategy -> {
errors.fatal(String.format("Acknowledgment strategy %s is not supported for method signature: %s",
strategy, type));
});
}

void init(BeanManager beanManager, Config config) {
this.beanInstance = ChannelRouter.lookup(bean, beanManager);
}

Method getMethod() {
return method;
}

Errors.Collector errors() {
return errors;
}

Object getBeanInstance() {
return beanInstance;
}

void setDeclaringBean(Bean<?> bean) {
this.bean = bean;
}

Class<?> getDeclaringType() {
return method.getDeclaringClass();
}

String getIncomingChannelName() {
return incomingChannelName;
}

String getOutgoingChannelName() {
return outgoingChannelName;
}

void setIncomingChannelName(String incomingChannelName) {
this.incomingChannelName = incomingChannelName;
}

void setOutgoingChannelName(String outgoingChannelName) {
this.outgoingChannelName = outgoingChannelName;
}

MethodSignatureType getType() {
return type;
}

void setType(MethodSignatureType type) {
this.type = type;
}

Acknowledgment.Strategy getAckStrategy() {
return ackStrategy;
}

private void resolveAckStrategy() {
ackStrategy =
Optional.ofNullable(method.getAnnotation(Acknowledgment.class))
.map(Acknowledgment::value)
.orElse(type.getDefaultAckType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.microprofile.messaging;

import java.util.HashMap;
import java.util.Map;

import io.helidon.config.Config;
import io.helidon.config.ConfigSources;

/**
* Detached configuration of a single connector.
*/
class AdHocConfigBuilder {
private final Map<String, String> configuration = new HashMap<>();

private AdHocConfigBuilder() {
}

static AdHocConfigBuilder from(Config config) {
AdHocConfigBuilder result = new AdHocConfigBuilder();
result.putAll(config);
return result;
}

AdHocConfigBuilder put(String key, String value) {
configuration.put(key, value);
return this;
}

AdHocConfigBuilder putAll(Config configToPut) {
configuration.putAll(configToPut.detach().asMap().orElse(Map.of()));
return this;
}

org.eclipse.microprofile.config.Config build() {
Config newConfig = Config.builder(ConfigSources.create(configuration))
.disableEnvironmentVariablesSource()
.disableSystemPropertiesSource()
.disableFilterServices()
.disableSourceServices()
.disableParserServices()
.build();
return (org.eclipse.microprofile.config.Config) newConfig;
}
}
Loading