diff --git a/pom.xml b/pom.xml
index 2d3490b3b..818477f38 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,12 +44,12 @@
1.5.2.Final
2.0.0.Beta2
1.4.3.Final
- 5.0.0.Beta14
+ 5.0.0.Beta15
1.0.0.Final
1.0.0.Final
3.4.2.Final
1.7
- 1.2.0.Beta4
+ 1.2.0.Beta5
1.0.0.Beta8
1.0.0.Beta5
1.1.0.Beta17
diff --git a/protocol.txt b/protocol.txt
index 9789fa048..dd07c7669 100644
--- a/protocol.txt
+++ b/protocol.txt
@@ -420,7 +420,7 @@ If a session open request is sent for a bean which is not a stateful bean, then
│││mapping count │
││├───────────────┤
│││┌──────────────┴─┐ - for each mapping count
- ││││Client Netmask │ - packed integer, LSB 0 = ipv6, 1 = ipv4; bits 4-n are the netmask bits (only bits 1-8 are used)
+ ││││Client Netmask │ - packed integer, LSB 0 = ipv6, 1 = ipv4; bits 1-n are the netmask bits (only bits 1-8 are used)
│││├────────────────┤
││││Client Source IP│ - IP bytes
│││├────────────────┤
diff --git a/src/main/java/org/jboss/ejb/protocol/remote/NetworkUtil.java b/src/main/java/org/jboss/ejb/_private/NetworkUtil.java
similarity index 72%
rename from src/main/java/org/jboss/ejb/protocol/remote/NetworkUtil.java
rename to src/main/java/org/jboss/ejb/_private/NetworkUtil.java
index a2b73f9b6..43d84c2f1 100644
--- a/src/main/java/org/jboss/ejb/protocol/remote/NetworkUtil.java
+++ b/src/main/java/org/jboss/ejb/_private/NetworkUtil.java
@@ -1,26 +1,22 @@
/*
* JBoss, Home of Professional Open Source.
- * Copyright 2012, Red Hat, Inc., and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
+ * Copyright 2017 Red Hat, Inc., and individual contributors
+ * as indicated by the @author tags.
*
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.jboss.ejb.protocol.remote;
+package org.jboss.ejb._private;
import java.net.Inet4Address;
import java.net.Inet6Address;
@@ -29,7 +25,7 @@
/**
* @author Jaikiran Pai
*/
-class NetworkUtil {
+public class NetworkUtil {
private static int getInt(byte[] b, int offs) {
return (b[offs] & 0xff) << 24 | (b[offs + 1] & 0xff) << 16 | (b[offs + 2] & 0xff) << 8 | b[offs + 3] & 0xff;
@@ -85,9 +81,9 @@ public static boolean belongsToNetwork(final InetAddress address, final InetAddr
}
}
- static String formatPossibleIpv6Address(String address) {
+ public static String formatPossibleIpv6Address(String address) {
if (address == null) {
- return address;
+ return null;
}
if (!address.contains(":")) {
return address;
diff --git a/src/main/java/org/jboss/ejb/client/EJBClientContext.java b/src/main/java/org/jboss/ejb/client/EJBClientContext.java
index 8691373df..e153eaf66 100644
--- a/src/main/java/org/jboss/ejb/client/EJBClientContext.java
+++ b/src/main/java/org/jboss/ejb/client/EJBClientContext.java
@@ -24,6 +24,11 @@
import static java.security.AccessController.doPrivileged;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.URI;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -33,12 +38,15 @@
import java.util.function.Supplier;
import org.jboss.ejb._private.Logs;
+import org.jboss.ejb._private.NetworkUtil;
import org.wildfly.common.Assert;
import org.wildfly.common.context.ContextManager;
import org.wildfly.common.context.Contextual;
+import org.wildfly.discovery.AttributeValue;
import org.wildfly.discovery.Discovery;
import org.wildfly.discovery.FilterSpec;
import org.wildfly.discovery.ServiceType;
+import org.wildfly.discovery.ServiceURL;
import org.wildfly.discovery.ServicesQueue;
/**
@@ -78,6 +86,10 @@ public final class EJBClientContext extends Attachable implements Contextual, T> R discoverFirst(L locator, final LocatedAction values = serviceURL.getAttributeValues(FILTER_ATTR_SOURCE_IP);
+ boolean matches = values.isEmpty();
+ if (matches) {
+ // we got one!
+ break;
+ } else {
+ SocketAddress sourceAddress = receiver.getSourceAddress(uri);
+ InetAddress inetAddress;
+ if (sourceAddress instanceof InetSocketAddress) {
+ inetAddress = ((InetSocketAddress) sourceAddress).getAddress();
+ } else {
+ inetAddress = null;
+ }
+ int bestNetmask;
+ for (AttributeValue value : values) try {
+ if (! value.isString()) {
+ continue;
+ }
+ final String string = value.toString();
+ if (string.codePointAt(0) != '[') {
+ continue;
+ }
+ int closeBrace = string.indexOf(']', 1);
+ if (closeBrace == -1) {
+ continue;
+ }
+ final InetAddress matchAddress = InetAddress.getByName(string.substring(1, closeBrace));
+ if (string.codePointAt(closeBrace + 1) != '/') {
+ continue;
+ }
+ final int mask = Integer.parseInt(string.substring(closeBrace + 2));
+
+ // now do the test
+ if (inetAddress == null) {
+ if (mask == 0) {
+ // it's zero, so we can just break out now because we have the only match we're gonna get
+ break;
+ }
+ // else fall out because we have no source address to test
+ } else if (NetworkUtil.belongsToNetwork(inetAddress, matchAddress, mask)) {
+ // matched!
+ break;
+ }
+ } catch (RuntimeException ignored) {
+ // it's not a valid entry, so ignore it
+ }
+
+ // try again
}
}
}
+ final URI uri = serviceURL.getLocationURI();
+ final EJBReceiver receiver = getTransportProvider(uri.getScheme());
+ if (receiver != null) {
+ return locatedAction.execute(receiver, locator, Affinity.forUri(uri));
+ } else {
+ throw Logs.MAIN.noEJBReceiverAvailable(locator);
+ }
}
EJBClientInterceptor[] getInterceptors() {
return interceptors;
}
+
+ private static int getInt(byte[] b, int offs) {
+ return (b[offs] & 0xff) << 24 | (b[offs + 1] & 0xff) << 16 | (b[offs + 2] & 0xff) << 8 | b[offs + 3] & 0xff;
+ }
+
+ private static long getLong(byte[] b, int offs) {
+ return (getInt(b, offs) & 0xFFFFFFFFL) << 32 | getInt(b, offs + 4) & 0xFFFFFFFFL;
+ }
+
+ private static int nwsl(int arg, int places) {
+ return places <= 0 ? arg : places >= 32 ? 0 : arg << places;
+ }
+
+ private static long nwsl(long arg, int places) {
+ return places <= 0 ? arg : places >= 64 ? 0L : arg << places;
+ }
}
diff --git a/src/main/java/org/jboss/ejb/client/EJBReceiver.java b/src/main/java/org/jboss/ejb/client/EJBReceiver.java
index cd18b0384..38ca19215 100644
--- a/src/main/java/org/jboss/ejb/client/EJBReceiver.java
+++ b/src/main/java/org/jboss/ejb/client/EJBReceiver.java
@@ -22,6 +22,9 @@
package org.jboss.ejb.client;
+import java.net.SocketAddress;
+import java.net.URI;
+
/**
* A receiver for EJB invocations. Receivers can be associated with one or more client contexts. This interface is
* implemented by providers for EJB invocation services.
@@ -71,4 +74,14 @@ protected boolean cancelInvocation(EJBReceiverInvocationContext receiverContext,
* session bean
*/
protected abstract StatefulEJBLocator createSession(final StatelessEJBLocator statelessLocator) throws Exception;
+
+ /**
+ * Query the expected or actual source IP address configured for the given target URI.
+ *
+ * @param uri the supported URI of the peer (not {@code null})
+ * @return the socket address, or {@code null} if none is known
+ */
+ protected SocketAddress getSourceAddress(final URI uri) {
+ return null;
+ }
}
diff --git a/src/main/java/org/jboss/ejb/client/legacy/CommonLegacyConfiguration.java b/src/main/java/org/jboss/ejb/client/legacy/CommonLegacyConfiguration.java
index ec7d30807..db5c7fda4 100644
--- a/src/main/java/org/jboss/ejb/client/legacy/CommonLegacyConfiguration.java
+++ b/src/main/java/org/jboss/ejb/client/legacy/CommonLegacyConfiguration.java
@@ -21,6 +21,7 @@
import java.net.URI;
import java.net.URISyntaxException;
+import org.jboss.ejb._private.NetworkUtil;
import org.xnio.OptionMap;
import org.xnio.Options;
@@ -47,7 +48,7 @@ static URI getUri(final JBossEJBProperties.ConnectionConfiguration connectionCon
protocol = "remote+http";
}
try {
- return new URI(protocol, null, host, port, null, null, null);
+ return new URI(protocol, null, NetworkUtil.formatPossibleIpv6Address(host), port, null, null, null);
} catch (URISyntaxException e) {
return null;
}
diff --git a/src/main/java/org/jboss/ejb/client/legacy/DiscoveryLegacyConfiguration.java b/src/main/java/org/jboss/ejb/client/legacy/DiscoveryLegacyConfiguration.java
index 03846f3e6..8aeb8880f 100644
--- a/src/main/java/org/jboss/ejb/client/legacy/DiscoveryLegacyConfiguration.java
+++ b/src/main/java/org/jboss/ejb/client/legacy/DiscoveryLegacyConfiguration.java
@@ -24,7 +24,9 @@
import java.util.function.Consumer;
import org.jboss.ejb._private.Logs;
+import org.jboss.ejb.client.ClusterNodeSelector;
import org.kohsuke.MetaInfServices;
+import org.wildfly.common.function.ExceptionSupplier;
import org.wildfly.discovery.ServiceURL;
import org.wildfly.discovery.impl.StaticDiscoveryProvider;
import org.wildfly.discovery.spi.DiscoveryProvider;
@@ -49,7 +51,13 @@ public void configure(final Consumer discoveryProviderConsume
for (Map.Entry entry : ejbProperties.getClusterConfigurations().entrySet()) {
final String name = entry.getKey();
final JBossEJBProperties.ClusterConfiguration configuration = entry.getValue();
+ final ExceptionSupplier clusterNodeSelectorSupplier = configuration.getClusterNodeSelectorSupplier();
+ final long maximumAllowedConnectedNodes = configuration.getMaximumAllowedConnectedNodes();
+ for (JBossEJBProperties.ClusterNodeConfiguration nodeConfiguration : configuration.getNodeConfigurations()) {
+ final String nodeName = nodeConfiguration.getNodeName();
+
+ }
// todo: construct URI and map cluster:name to it
}
diff --git a/src/main/java/org/jboss/ejb/client/legacy/LegacyPropertiesConfiguration.java b/src/main/java/org/jboss/ejb/client/legacy/LegacyPropertiesConfiguration.java
index 38292aef2..8ceec5775 100644
--- a/src/main/java/org/jboss/ejb/client/legacy/LegacyPropertiesConfiguration.java
+++ b/src/main/java/org/jboss/ejb/client/legacy/LegacyPropertiesConfiguration.java
@@ -53,7 +53,6 @@ public static void configure(final EJBClientContext.Builder builder) {
if (port == -1) {
continue;
}
- final String protocol;
final OptionMap connectionOptions = connectionConfiguration.getConnectionOptions();
final URI uri = CommonLegacyConfiguration.getUri(connectionConfiguration, connectionOptions);
if (uri == null) {
diff --git a/src/main/java/org/jboss/ejb/client/legacy/RemotingLegacyConfiguration.java b/src/main/java/org/jboss/ejb/client/legacy/RemotingLegacyConfiguration.java
index 4f5986baf..6295ff8b0 100644
--- a/src/main/java/org/jboss/ejb/client/legacy/RemotingLegacyConfiguration.java
+++ b/src/main/java/org/jboss/ejb/client/legacy/RemotingLegacyConfiguration.java
@@ -23,7 +23,6 @@
import java.util.List;
import org.jboss.ejb._private.Logs;
-import org.jboss.remoting3.ConnectionBuilder;
import org.jboss.remoting3.Endpoint;
import org.jboss.remoting3.EndpointBuilder;
import org.jboss.remoting3.spi.EndpointConfigurator;
@@ -63,6 +62,12 @@ public Endpoint getConfiguredEndpoint() {
// we ignore the connection provider options
+ final Endpoint endpoint;
+ try {
+ endpoint = endpointBuilder.build();
+ } catch (IOException e) {
+ throw Logs.MAIN.failedToConstructEndpoint(e);
+ }
final List connectionList = properties.getConnectionList();
for (JBossEJBProperties.ConnectionConfiguration connectionConfiguration : connectionList) {
final OptionMap connectionOptions = connectionConfiguration.getConnectionOptions();
@@ -71,17 +76,10 @@ public Endpoint getConfiguredEndpoint() {
if (uri == null) {
continue;
}
-
- final ConnectionBuilder connectionBuilder = endpointBuilder.addConnection(uri);
- connectionBuilder.addAllOptions(connectionOptions);
- connectionBuilder.setImmediate(connectionConfiguration.isConnectEagerly());
- connectionBuilder.setAbstractType("ejb");
- connectionBuilder.setAbstractTypeAuthority("jboss");
- }
- try {
- return endpointBuilder.build();
- } catch (IOException e) {
- throw Logs.MAIN.failedToConstructEndpoint(e);
+ if (connectionConfiguration.isConnectEagerly()) {
+ endpoint.getConnection(uri, "ejb", "jboss");
+ }
}
+ return endpoint;
}
}
diff --git a/src/main/java/org/jboss/ejb/protocol/remote/EJBClientChannel.java b/src/main/java/org/jboss/ejb/protocol/remote/EJBClientChannel.java
index 41b22cd37..431a092c7 100644
--- a/src/main/java/org/jboss/ejb/protocol/remote/EJBClientChannel.java
+++ b/src/main/java/org/jboss/ejb/protocol/remote/EJBClientChannel.java
@@ -24,6 +24,8 @@
import static java.lang.Math.min;
import static java.security.AccessController.doPrivileged;
+import static org.wildfly.common.math.HashMath.multiHashOrdered;
+import static org.xnio.Bits.allAreSet;
import static org.xnio.IoUtils.safeClose;
import java.io.DataOutput;
@@ -31,9 +33,13 @@
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
+import java.net.InetAddress;
import java.net.URI;
+import java.net.URISyntaxException;
import java.security.PrivilegedAction;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -53,7 +59,7 @@
import javax.transaction.Transaction;
import javax.transaction.xa.Xid;
-import org.jboss.ejb._private.Logs;
+import org.jboss.ejb._private.*;
import org.jboss.ejb.client.Affinity;
import org.jboss.ejb.client.AttachmentKey;
import org.jboss.ejb.client.AttachmentKeys;
@@ -120,10 +126,13 @@ class EJBClientChannel {
private final ServiceRegistry serviceRegistry;
private final MarshallingConfiguration configuration;
+ private final ServiceRegistration nodeRegistration;
private final ConcurrentMap registrationsMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap>> clusterRegistrationsMap = new ConcurrentHashMap<>();
private final IntIndexMap userTxnIds = new IntIndexHashMap(UserTransactionID::getId);
private final RemoteTransactionContext transactionContext;
+ private final AtomicInteger finishedParts = new AtomicInteger(0);
private final AtomicReference> futureResultRef;
private final LocalRegistryAndDiscoveryProvider discoveryProvider = new LocalRegistryAndDiscoveryProvider();
@@ -147,6 +156,36 @@ class EJBClientChannel {
this.configuration = configuration;
invocationTracker = new InvocationTracker(this.channel, channel.getOption(RemotingOptions.MAX_OUTBOUND_MESSAGES).intValue(), EJBClientChannel::mask);
futureResultRef = new AtomicReference<>(futureResult);
+ final ServiceURL.Builder builder = new ServiceURL.Builder();
+ builder.setUri(channel.getConnection().getPeerURI());
+ builder.setAbstractType("ejb").setAbstractTypeAuthority("jboss");
+ builder.addAttribute(EJBClientContext.FILTER_ATTR_NODE, AttributeValue.fromString(channel.getConnection().getRemoteEndpointName()));
+ nodeRegistration = serviceRegistry.registerService(builder.create());
+ channel.addCloseHandler((c, e) -> {
+ nodeRegistration.close();
+ Iterator> i1 = registrationsMap.entrySet().iterator();
+ while (i1.hasNext()) {
+ final Map.Entry entry = i1.next();
+ i1.remove();
+ entry.getValue().close();
+ }
+ final Iterator>>> i2 = clusterRegistrationsMap.entrySet().iterator();
+ while (i2.hasNext()) {
+ final Map.Entry>> entry1 = i2.next();
+ i2.remove();
+ final Iterator>> i3 = entry1.getValue().entrySet().iterator();
+ while (i3.hasNext()) {
+ final Map.Entry> entry2 = i3.next();
+ i3.remove();
+ Iterator i4 = entry2.getValue().values().iterator();
+ while (i4.hasNext()) {
+ final ServiceRegistration registration = i4.next();
+ i4.remove();
+ registration.close();
+ }
+ }
+ }
+ });
}
static int mask(int original) {
@@ -179,6 +218,34 @@ public int hashCode() {
}
}
+ static class ClusterDiscKey {
+ private final String clusterName;
+ private final String nodeName;
+ private final byte[] ipBytes;
+ private final int maskBits;
+ private final int hashCode;
+
+ ClusterDiscKey(final String clusterName, final String nodeName, final byte[] ipBytes, final int maskBits) {
+ this.clusterName = clusterName;
+ this.nodeName = nodeName;
+ this.ipBytes = ipBytes;
+ this.maskBits = maskBits;
+ hashCode = multiHashOrdered(multiHashOrdered(multiHashOrdered(maskBits, Arrays.hashCode(ipBytes)), nodeName.hashCode()), clusterName.hashCode());
+ }
+
+ public boolean equals(final Object obj) {
+ return obj instanceof ClusterDiscKey && equals((ClusterDiscKey) obj);
+ }
+
+ private boolean equals(final ClusterDiscKey other) {
+ return other == this || clusterName.equals(other.clusterName) && nodeName.equals(other.nodeName) && Arrays.equals(ipBytes, other.ipBytes) && maskBits == other.maskBits;
+ }
+
+ public int hashCode() {
+ return hashCode;
+ }
+ }
+
private void processMessage(final MessageInputStream message) {
boolean leaveOpen = false;
try {
@@ -233,20 +300,15 @@ private void processMessage(final MessageInputStream message) {
old.close();
}
}
- final FutureResult futureResult = futureResultRef.get();
- if (futureResult != null && futureResultRef.compareAndSet(futureResult, null)) {
- // done!
- futureResult.setResult(this);
- }
+ finishPart(0b01);
break;
}
case Protocol.MODULE_UNAVAILABLE: {
int count = StreamUtils.readPackedSignedInt32(message);
final ConcurrentMap registrationsMap = this.registrationsMap;
for (int i = 0; i < count; i ++) {
- String appName = message.readUTF();
+ final String appName = message.readUTF();
final String moduleName = message.readUTF();
- if (appName.isEmpty()) appName = moduleName;
final String distinctName = message.readUTF();
final DiscKey key = new DiscKey(appName, moduleName, distinctName);
final ServiceRegistration old = registrationsMap.remove(key);
@@ -256,20 +318,97 @@ private void processMessage(final MessageInputStream message) {
}
break;
}
+ case Protocol.CLUSTER_TOPOLOGY_ADDITION:
case Protocol.CLUSTER_TOPOLOGY_COMPLETE: {
- // TODO read message
+ int clusterCount = StreamUtils.readPackedSignedInt32(message);
+ final Connection connection = channel.getConnection();
+ final URI peerURI = connection.getPeerURI();
+ final String scheme = peerURI.getScheme();
+ for (int i = 0; i < clusterCount; i ++) {
+ final String clusterName = message.readUTF();
+ final AttributeValue clusterValue = AttributeValue.fromString(clusterName);
+ int memberCount = StreamUtils.readPackedSignedInt32(message);
+ for (int j = 0; j < memberCount; j ++) {
+ final String nodeName = message.readUTF();
+ final AttributeValue nodeValue = AttributeValue.fromString(nodeName);
+
+ int mappingCount = StreamUtils.readPackedSignedInt32(message);
+ for (int k = 0; k < mappingCount; k ++) {
+ int b = message.readUnsignedByte();
+ final boolean ip6 = allAreSet(b, 1);
+ int netmaskBits = b >>> 1;
+ final byte[] sourceIpBytes = new byte[ip6 ? 16 : 4];
+ message.readFully(sourceIpBytes);
+ final String destHost = message.readUTF();
+ final int destPort = message.readUnsignedShort();
+
+ final ServiceURL.Builder builder = new ServiceURL.Builder();
+ builder.setAbstractType("ejb");
+ builder.setAbstractTypeAuthority("jboss");
+ builder.addAttribute(EJBClientContext.FILTER_ATTR_NODE, nodeValue);
+ builder.addAttribute(EJBClientContext.FILTER_ATTR_CLUSTER, clusterValue);
+ if (netmaskBits != 0) {
+ // do not match all
+ builder.addAttribute(EJBClientContext.FILTER_ATTR_SOURCE_IP, AttributeValue.fromString("[" + InetAddress.getByAddress(sourceIpBytes).getHostAddress() + "]/" + netmaskBits));
+ }
+
+ try {
+ builder.setUri(new URI(
+ scheme,
+ null,
+ NetworkUtil.formatPossibleIpv6Address(destHost),
+ destPort,
+ null,
+ null,
+ null
+ ));
+ } catch (URISyntaxException e) {
+ Logs.REMOTING.trace("Ignoring cluster node because the URI failed to be built", e);
+ continue;
+ }
+ final ServiceRegistration registration = serviceRegistry.registerService(builder.create());
+ final ClusterDiscKey key = new ClusterDiscKey(clusterName, nodeName, sourceIpBytes, netmaskBits);
+ final ServiceRegistration old = clusterRegistrationsMap.computeIfAbsent(clusterName, x -> new ConcurrentHashMap<>()).computeIfAbsent(nodeName, x -> new ConcurrentHashMap<>()).put(key, registration);
+ if (old != null) {
+ old.close();
+ }
+ }
+ }
+ }
+ finishPart(0b10);
break;
}
case Protocol.CLUSTER_TOPOLOGY_REMOVAL: {
- // TODO read message
- break;
- }
- case Protocol.CLUSTER_TOPOLOGY_ADDITION: {
- // TODO read message
+ int clusterCount = StreamUtils.readPackedSignedInt32(message);
+ for (int i = 0; i < clusterCount; i ++) {
+ String clusterName = message.readUTF();
+ final ConcurrentMap> subMap = clusterRegistrationsMap.remove(clusterName);
+ if (subMap != null) {
+ for (ConcurrentMap subSubMap : subMap.values()) {
+ for (ServiceRegistration registration : subSubMap.values()) {
+ registration.close();
+ }
+ }
+ }
+ }
break;
}
case Protocol.CLUSTER_TOPOLOGY_NODE_REMOVAL: {
- // TODO read message
+ int clusterCount = StreamUtils.readPackedSignedInt32(message);
+ for (int i = 0; i < clusterCount; i ++) {
+ String clusterName = message.readUTF();
+ final ConcurrentMap> subMap = clusterRegistrationsMap.get(clusterName);
+ int memberCount = StreamUtils.readPackedSignedInt32(message);
+ for (int j = 0; j < memberCount; j ++) {
+ String nodeName = message.readUTF();
+ if (subMap != null) {
+ final ConcurrentMap subSubMap = subMap.remove(nodeName);
+ if (subSubMap != null) for (ServiceRegistration registration : subSubMap.values()) {
+ registration.close();
+ }
+ }
+ }
+ }
break;
}
default: {
@@ -799,6 +938,24 @@ DiscoveryProvider getDiscoveryProvider() {
return discoveryProvider;
}
+ void finishPart(int bit) {
+ int oldVal, newVal;
+ do {
+ oldVal = finishedParts.get();
+ if (allAreSet(oldVal, bit)) {
+ return;
+ }
+ newVal = oldVal | bit;
+ } while (! finishedParts.compareAndSet(oldVal, newVal));
+ if (newVal == 0b11) {
+ final FutureResult futureResult = futureResultRef.get();
+ if (futureResult != null && futureResultRef.compareAndSet(futureResult, null)) {
+ // done!
+ futureResult.setResult(this);
+ }
+ }
+ }
+
final class MethodInvocation extends Invocation {
private final EJBReceiverInvocationContext receiverInvocationContext;
private final AtomicInteger refCounter = new AtomicInteger(1);
diff --git a/src/test/java/org/jboss/ejb/protocol/remote/NetworkUtilTestCase.java b/src/test/java/org/jboss/ejb/protocol/remote/NetworkUtilTestCase.java
index 64868d677..11f4b12a8 100644
--- a/src/test/java/org/jboss/ejb/protocol/remote/NetworkUtilTestCase.java
+++ b/src/test/java/org/jboss/ejb/protocol/remote/NetworkUtilTestCase.java
@@ -27,6 +27,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
+import org.jboss.ejb._private.NetworkUtil;
import org.junit.Test;
/**