Skip to content

Commit

Permalink
Merge pull request #329 from n1hility/ejbclient-285
Browse files Browse the repository at this point in the history
Fixes for EJBCLIENT-285 and EJBCLIENT-284
  • Loading branch information
n1hility authored Oct 16, 2017
2 parents 6a88b98 + e342815 commit d2c6432
Show file tree
Hide file tree
Showing 18 changed files with 734 additions and 40 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
<version.org.wildfly.common>1.2.0.Final</version.org.wildfly.common>
<version.org.wildfly.naming.client>1.0.4.Final</version.org.wildfly.naming.client>
<version.org.wildfly.discovery>1.0.0.Final</version.org.wildfly.discovery>
<version.org.wildfly.security.elytron>1.1.0.Final</version.org.wildfly.security.elytron>
<version.org.wildfly.security.elytron>1.1.5.Final</version.org.wildfly.security.elytron>
<version.org.wildfly.transaction-client>1.0.2.Final</version.org.wildfly.transaction-client>

<version.org.jboss.bridger>1.4.Final</version.org.jboss.bridger>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/jboss/ejb/_private/Logs.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ public interface Logs extends BasicLogger {
@Message(id = 80, value = "Request not sent")
IllegalStateException requestNotSent();

@Message(id = 81, value = "Failed to instantiate cluster node selector class \"%s\"")
IllegalArgumentException cannotInstantiateClustertNodeSelector(String name, @Cause ReflectiveOperationException e);

// Proxy API errors

@Message(id = 100, value = "Object '%s' is not a valid proxy object")
Expand Down Expand Up @@ -416,6 +419,9 @@ public interface Logs extends BasicLogger {
@Message(id = 509, value = "Unexpected exception processing EJB request")
void unexpectedException(@Cause Throwable t);

@Message(id = 510, value = "Failed to configure SSL context")
IOException failedToConfigureSslContext(@Cause Throwable cause);

// Remote messages; no ID for brevity but should be translated

@Message(value = "No such EJB: %s")
Expand Down
17 changes: 15 additions & 2 deletions src/main/java/org/jboss/ejb/client/AbstractInvocationContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.LinkedHashMap;
import java.util.Map;

import javax.transaction.Transaction;

import org.wildfly.common.Assert;
import org.wildfly.transaction.client.AbstractTransaction;

Expand All @@ -41,6 +39,21 @@ public abstract class AbstractInvocationContext extends Attachable {
private Affinity weakAffinity = Affinity.NONE;
private URI destination;
private Affinity targetAffinity;
private String initialCluster;

/**
* Gets the initial cluster assignment by discovery, if any
*
* @return the initial cluster if assigned
*/
public String getInitialCluster() {
return initialCluster;
}

void setInitialCluster(String initialCluster) {
this.initialCluster = initialCluster;
}

private Map<String, Object> contextData;
private AbstractTransaction transaction;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

import javax.ejb.NoSuchEJBException;
Expand Down Expand Up @@ -336,31 +338,36 @@ private List<Throwable> doFirstMatchDiscovery(AbstractInvocationContext context,
if (fallbackFilterSpec != null) {
assert context.getLocator().getAffinity() instanceof ClusterAffinity;
Logs.INVOCATION.tracef("Performed first-match discovery, no match, falling back to cluster discovery");
final List<Throwable> problems2 = doClusterDiscovery(context, fallbackFilterSpec);
if (problems2.isEmpty()) {
return problems;
} else if (problems.isEmpty()) {
return problems2;
} else {
final ArrayList<Throwable> problems3 = new ArrayList<>(problems.size() + problems2.size());
problems3.addAll(problems);
problems3.addAll(problems2);
return problems3;
}
return merge(problems, doClusterDiscovery(context, fallbackFilterSpec));
} else {
// no match!
Logs.INVOCATION.tracef("Performed first-match discovery, no match");
}
return problems;
}

private static List<Throwable> merge(List<Throwable> problems, List<Throwable> problems2) {
if (problems2.isEmpty()) {
return problems;
} else if (problems.isEmpty()) {
return problems2;
} else {
final ArrayList<Throwable> problems3 = new ArrayList<>(problems.size() + problems2.size());
problems3.addAll(problems);
problems3.addAll(problems2);
return problems3;
}
}

private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final FilterSpec filterSpec, final EJBLocator<?> locator) {
Logs.INVOCATION.tracef("Performing any discovery(locator = %s, weak affinity = %s, filter spec = %s)", context.getLocator(), context.getWeakAffinity(), filterSpec);
final List<Throwable> problems;
// blacklist
final Set<URI> blacklist = context.getAttachment(BL_KEY);
final Map<URI, String> nodes = new HashMap<>();
final Map<String, URI> uris = new HashMap<>();
final Map<URI, List<String>> clusterAssociations = new HashMap<>();

int nodeless = 0;
try (final ServicesQueue queue = discover(filterSpec)) {
ServiceURL serviceURL;
Expand All @@ -382,7 +389,24 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
nodeless++;
}
}
context.setDestination(location);

// Handle multiple cluster specifications per entry, and also multiple entries with
// cluster specifications that refer to the same URI. Currently multi-membership is
// represented in the latter form, however, handle the first form as well, just in
// case this changes in the future.
final List<AttributeValue> clusters = serviceURL.getAttributeValues(FILTER_ATTR_CLUSTER);
if (clusters != null) {
for (AttributeValue cluster : clusters) {
List<String> list = clusterAssociations.putIfAbsent(location, Collections.singletonList(cluster.toString()));
if (list != null) {
if (!(list instanceof ArrayList)) {
list = new ArrayList<>(list);
clusterAssociations.put(location, list);
}
list.add(cluster.toString());
}
}
}
}
}
problems = queue.getProblems();
Expand All @@ -405,8 +429,7 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
Logs.INVOCATION.tracef("Performed first-match discovery(target affinity(node) = %s, destination = %s)", nodeName, location);
} else if (nodeless == 0) {
// use the deployment node selector
// todo: configure on client context
DeploymentNodeSelector selector = DeploymentNodeSelector.RANDOM;
DeploymentNodeSelector selector = context.getClientContext().getDeploymentNodeSelector();
nodeName = selector.selectNode(nodes.values().toArray(NO_STRINGS), locator.getAppName(), locator.getModuleName(), locator.getDistinctName());
if (nodeName == null) {
throw Logs.INVOCATION.selectorReturnedNull(selector);
Expand All @@ -430,11 +453,31 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
Logs.INVOCATION.tracef("Performed first-match discovery, nodes > 1, URI selector used(target affinity(node) = %s, destination = %s)", nodeName, location);
}

// TODO DeploymentNodeSelector should be enhanced to handle URIs that are members of more than one cluster

// Clients typically do not have an auth policy for nodes which are dynamically discovered
// from cluster topology info. Anytime such a node is selected, we must register the
// associated cluster with the invocation, so that an effective auth config can be
// determined. Randomly pick a cluster if there is more than one.
selectCluster(context, clusterAssociations, location);
context.setDestination(location);
if (nodeName != null) context.setTargetAffinity(new NodeAffinity(nodeName));
return problems;
}

private void selectCluster(AbstractInvocationContext context, Map<URI, List<String>> clusterAssociations, URI location) {
List<String> associations = clusterAssociations.get(location);
String cluster = null;
if (associations != null) {
cluster = (associations.size() == 1) ? associations.get(0) :
associations.get(ThreadLocalRandom.current().nextInt(associations.size()));

}
if (cluster != null) {
context.setInitialCluster(cluster);
}
}

private List<Throwable> doClusterDiscovery(AbstractInvocationContext context, final FilterSpec filterSpec) {
Logs.INVOCATION.tracef("Performing cluster discovery(locator = %s, weak affinity = %s, filter spec = %s)", context.getLocator(), context.getWeakAffinity(), filterSpec);
Map<String, URI> nodes = new HashMap<>();
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/jboss/ejb/client/EJBClientContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ ClusterNodeSelector getClusterNodeSelector() {
return clusterNodeSelector;
}

DeploymentNodeSelector getDeploymentNodeSelector() {
return deploymentNodeSelector;
}

static final class ClassInterceptor {
private final String className;
private final EJBClientInterceptorInformation interceptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.jboss.ejb._private.Logs;
import org.jboss.ejb.client.ClusterNodeSelector;
import org.jboss.ejb.client.DeploymentNodeSelector;
import org.jboss.ejb.client.EJBClientConnection;
import org.jboss.ejb.client.EJBClientContext;
Expand Down Expand Up @@ -72,6 +74,22 @@ public static void configure(final EJBClientContext.Builder builder) {
builder.setDeploymentNodeSelector(deploymentNodeSelector);
}

Map<String, JBossEJBProperties.ClusterConfiguration> clusters = properties.getClusterConfigurations();
if (clusters != null) {
for (JBossEJBProperties.ClusterConfiguration cluster : clusters.values()) {
ExceptionSupplier<ClusterNodeSelector, ReflectiveOperationException> selectorSupplier = cluster.getClusterNodeSelectorSupplier();
if (selectorSupplier != null) {
try {
builder.setClusterNodeSelector(selectorSupplier.get());
} catch (ReflectiveOperationException e) {
throw Logs.MAIN.cannotInstantiateClustertNodeSelector(cluster.getClusterNodeSelectorClassName(), e);
}
// We only support one selector currently
break;
}
}
}

if (properties.getInvocationTimeout() != -1L) {
builder.setInvocationTimeout(properties.getInvocationTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.jboss.ejb.protocol.remote;

import java.net.URI;
import java.util.List;

/**
Expand All @@ -29,7 +30,7 @@ interface DiscoveredNodeRegistry {

List<NodeInformation> getAllNodeInformation();

void addNode(String clusterName, String nodeName);
void addNode(String clusterName, String nodeName, URI registeredBy);

void removeNode(String clusterName, String nodeName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private void processMessage(final MessageInputStream message) {
int memberCount = StreamUtils.readPackedSignedInt32(message);
for (int j = 0; j < memberCount; j ++) {
final String nodeName = message.readUTF();
discoveredNodeRegistry.addNode(clusterName, nodeName);
discoveredNodeRegistry.addNode(clusterName, nodeName, channel.getConnection().getPeerURI());
final NodeInformation nodeInformation = discoveredNodeRegistry.getNodeInformation(nodeName);
Logs.INVOCATION.debugf("Received CLUSTER_TOPOLOGY(%x) message, registering cluster %s to node %s", msg, clusterName, nodeName);

Expand Down
17 changes: 14 additions & 3 deletions src/main/java/org/jboss/ejb/protocol/remote/RemoteEJBReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@

import javax.ejb.CreateException;

import org.jboss.ejb.client.AbstractInvocationContext;
import org.jboss.ejb.client.Affinity;
import org.jboss.ejb.client.AttachmentKey;
import org.jboss.ejb.client.ClusterAffinity;
import org.jboss.ejb.client.EJBReceiver;
import org.jboss.ejb.client.EJBReceiverContext;
import org.jboss.ejb.client.EJBReceiverInvocationContext;
Expand Down Expand Up @@ -122,7 +125,7 @@ EJBClientChannel getClientChannel(final Connection connection) throws IOExceptio

protected void processInvocation(final EJBReceiverInvocationContext receiverContext) throws Exception {
final AuthenticationContext authenticationContext = receiverContext.getAuthenticationContext();
final IoFuture<ConnectionPeerIdentity> futureConnection = getConnection(receiverContext.getClientInvocationContext().getDestination(), authenticationContext);
final IoFuture<ConnectionPeerIdentity> futureConnection = getConnection(receiverContext.getClientInvocationContext(), receiverContext.getClientInvocationContext().getDestination(), authenticationContext);
// this actually causes the invocation to move forward
futureConnection.addNotifier(notifier, receiverContext);
}
Expand All @@ -140,7 +143,7 @@ protected SessionID createSession(final EJBReceiverSessionCreationContext contex
final StatelessEJBLocator<?> statelessLocator = context.getClientInvocationContext().getLocator().asStateless();
final AuthenticationContext authenticationContext = context.getAuthenticationContext();
try {
IoFuture<ConnectionPeerIdentity> futureConnection = getConnection(context.getClientInvocationContext().getDestination(), authenticationContext);
IoFuture<ConnectionPeerIdentity> futureConnection = getConnection(context.getClientInvocationContext(), context.getClientInvocationContext().getDestination(), authenticationContext);
final ConnectionPeerIdentity identity = futureConnection.getInterruptibly();
final EJBClientChannel ejbClientChannel = getClientChannel(identity.getConnection());
final StatefulEJBLocator<?> result = ejbClientChannel.openSession(statelessLocator, identity, context.getClientInvocationContext());
Expand Down Expand Up @@ -170,7 +173,15 @@ protected boolean isConnected(final URI uri) {
}
}

private IoFuture<ConnectionPeerIdentity> getConnection(final URI target, @NotNull AuthenticationContext authenticationContext) throws Exception {
private IoFuture<ConnectionPeerIdentity> getConnection(final AbstractInvocationContext context, final URI target, @NotNull AuthenticationContext authenticationContext) throws Exception {
Affinity affinity = context.getLocator().getAffinity();
String cluster = (affinity instanceof ClusterAffinity) ? ((ClusterAffinity) affinity).getClusterName() : context.getInitialCluster();

if (cluster != null) {
return doPrivileged((PrivilegedAction<IoFuture<ConnectionPeerIdentity>>) () ->
discoveredNodeRegistry.getConnectedIdentityUsingClusterEffective(Endpoint.getCurrent(), target, "ejb", "jboss", authenticationContext, cluster));
}

return doPrivileged((PrivilegedAction<IoFuture<ConnectionPeerIdentity>>) () -> Endpoint.getCurrent().getConnectedIdentity(target, "ejb", "jboss", authenticationContext));
}
}
Loading

0 comments on commit d2c6432

Please sign in to comment.