From 31592c7f6a3669b18bcb476cd540a9a22f86abce Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Tue, 12 Sep 2017 11:47:35 +1000 Subject: [PATCH] EJBCLIENT-266 Only retry a single request at a time --- .../java/org/jboss/ejb/_private/Logs.java | 4 + .../ejb/protocol/remote/EJBClientChannel.java | 14 +- .../protocol/remote/RemoteEJBReceiver.java | 10 +- .../protocol/remote/RetryExecutorWrapper.java | 62 ++++++ .../ClusteredInvocationFailOverTestCase.java | 202 ++++++++++++++++++ 5 files changed, 283 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/jboss/ejb/protocol/remote/RetryExecutorWrapper.java create mode 100644 src/test/java/org/jboss/ejb/client/test/ClusteredInvocationFailOverTestCase.java diff --git a/src/main/java/org/jboss/ejb/_private/Logs.java b/src/main/java/org/jboss/ejb/_private/Logs.java index 70c08b393..f6bbf64de 100644 --- a/src/main/java/org/jboss/ejb/_private/Logs.java +++ b/src/main/java/org/jboss/ejb/_private/Logs.java @@ -408,6 +408,10 @@ public interface Logs extends BasicLogger { @Message(id = 507, value = "Internal server error occurred while processing a transaction") SystemException internalSystemErrorWithTx(@Cause Throwable t); + @LogMessage(level = ERROR) + @Message(id = 508, value = "Failed to execute Runnable %s") + void taskFailed(Runnable runnable, @Cause Throwable t); + // Remote messages; no ID for brevity but should be translated @Message(value = "No such EJB: %s") diff --git a/src/main/java/org/jboss/ejb/protocol/remote/EJBClientChannel.java b/src/main/java/org/jboss/ejb/protocol/remote/EJBClientChannel.java index 5a0ccc7f0..33d044870 100644 --- a/src/main/java/org/jboss/ejb/protocol/remote/EJBClientChannel.java +++ b/src/main/java/org/jboss/ejb/protocol/remote/EJBClientChannel.java @@ -36,6 +36,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -120,10 +121,13 @@ class EJBClientChannel { private final AtomicInteger finishedParts = new AtomicInteger(0); private final AtomicReference> futureResultRef; - EJBClientChannel(final Channel channel, final int version, final DiscoveredNodeRegistry discoveredNodeRegistry, final FutureResult futureResult) { + private final RetryExecutorWrapper retryExecutorWrapper; + + EJBClientChannel(final Channel channel, final int version, final DiscoveredNodeRegistry discoveredNodeRegistry, final FutureResult futureResult, RetryExecutorWrapper retryExecutorWrapper) { this.channel = channel; this.version = version; this.discoveredNodeRegistry = discoveredNodeRegistry; + this.retryExecutorWrapper = retryExecutorWrapper; marshallerFactory = Marshalling.getProvidedMarshallerFactory("river"); MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setClassResolver(ProtocolClassResolver.INSTANCE); @@ -618,7 +622,7 @@ private static void writeRawIdentifier(final EJBLocator statelessLocator, out.writeUTF(statelessLocator.getBeanName()); } - static IoFuture construct(final Channel channel, final DiscoveredNodeRegistry discoveredNodeRegistry) { + static IoFuture construct(final Channel channel, final DiscoveredNodeRegistry discoveredNodeRegistry, RetryExecutorWrapper retryExecutorWrapper) { FutureResult futureResult = new FutureResult<>(); // now perform opening negotiation: receive server greeting channel.receiveMessage(new Channel.Receiver() { @@ -644,7 +648,7 @@ public void handleMessage(final Channel channel, final MessageInputStream messag out.writeUTF("river"); } // almost done; wait for initial module available report - final EJBClientChannel ejbClientChannel = new EJBClientChannel(channel, version, discoveredNodeRegistry, futureResult); + final EJBClientChannel ejbClientChannel = new EJBClientChannel(channel, version, discoveredNodeRegistry, futureResult, retryExecutorWrapper); channel.receiveMessage(new Channel.Receiver() { public void handleError(final Channel channel, final IOException error) { safeClose(channel); @@ -1213,8 +1217,8 @@ public void discardResult() { } } - private XnioWorker getRetryExecutor() { - return getChannel().getConnection().getEndpoint().getXnioWorker(); + private Executor getRetryExecutor() { + return retryExecutorWrapper.getExecutor(getChannel().getConnection().getEndpoint().getXnioWorker()); } static class ResponseMessageInputStream extends MessageInputStream implements ByteInput { diff --git a/src/main/java/org/jboss/ejb/protocol/remote/RemoteEJBReceiver.java b/src/main/java/org/jboss/ejb/protocol/remote/RemoteEJBReceiver.java index ec239dd50..2a62d9e79 100644 --- a/src/main/java/org/jboss/ejb/protocol/remote/RemoteEJBReceiver.java +++ b/src/main/java/org/jboss/ejb/protocol/remote/RemoteEJBReceiver.java @@ -62,11 +62,13 @@ class RemoteEJBReceiver extends EJBReceiver { final ClientServiceHandle serviceHandle; + private final RetryExecutorWrapper retryExecutorWrapper = new RetryExecutorWrapper(); + RemoteEJBReceiver(final RemoteTransportProvider remoteTransportProvider, final EJBReceiverContext receiverContext, final RemotingEJBDiscoveryProvider discoveredNodeRegistry) { this.remoteTransportProvider = remoteTransportProvider; this.receiverContext = receiverContext; this.discoveredNodeRegistry = discoveredNodeRegistry; - serviceHandle = new ClientServiceHandle<>("jboss.ejb", channel -> EJBClientChannel.construct(channel, this.discoveredNodeRegistry)); + serviceHandle = new ClientServiceHandle<>("jboss.ejb", channel -> EJBClientChannel.construct(channel, this.discoveredNodeRegistry, retryExecutorWrapper)); } final IoFuture.HandlingNotifier notifier = new IoFuture.HandlingNotifier() { @@ -78,11 +80,11 @@ public void handleDone(final ConnectionPeerIdentity peerIdentity, final EJBRecei ejbClientChannel = ioFuture.getInterruptibly(); } catch (IOException e) { // should generally not be possible but we should handle it cleanly regardless - attachment1.requestFailed(new RequestSendFailedException(e + "@" + peerIdentity.getConnection().getPeerURI(), false), peerIdentity.getConnection().getEndpoint().getXnioWorker()); + attachment1.requestFailed(new RequestSendFailedException(e + "@" + peerIdentity.getConnection().getPeerURI(), false), retryExecutorWrapper.getExecutor(peerIdentity.getConnection().getEndpoint().getXnioWorker())); return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - attachment1.requestFailed(new RequestSendFailedException(e + "@" + peerIdentity.getConnection().getPeerURI(), false), peerIdentity.getConnection().getEndpoint().getXnioWorker()); + attachment1.requestFailed(new RequestSendFailedException(e + "@" + peerIdentity.getConnection().getPeerURI(), false), retryExecutorWrapper.getExecutor(peerIdentity.getConnection().getEndpoint().getXnioWorker())); return; } attachment1.getClientInvocationContext().putAttachment(EJBCC_KEY, ejbClientChannel); @@ -96,7 +98,7 @@ public void handleCancelled(final EJBReceiverInvocationContext attachment) { } public void handleFailed(final IOException exception, final EJBReceiverInvocationContext attachment) { - attachment.requestFailed(new RequestSendFailedException(exception, false), Endpoint.getCurrent().getXnioWorker()); + attachment.requestFailed(new RequestSendFailedException(exception, false), retryExecutorWrapper.getExecutor(Endpoint.getCurrent().getXnioWorker())); } }; diff --git a/src/main/java/org/jboss/ejb/protocol/remote/RetryExecutorWrapper.java b/src/main/java/org/jboss/ejb/protocol/remote/RetryExecutorWrapper.java new file mode 100644 index 000000000..add93e4b1 --- /dev/null +++ b/src/main/java/org/jboss/ejb/protocol/remote/RetryExecutorWrapper.java @@ -0,0 +1,62 @@ +package org.jboss.ejb.protocol.remote; + +import org.jboss.ejb._private.Logs; + +import java.util.concurrent.Executor; + +/** + * @author Stuart Douglas + */ +public class RetryExecutorWrapper { + + private final Object lock = new Object(); + private Task last = null; + + public Executor getExecutor(Executor executor) { + return runnable -> { + synchronized (lock) { + Task task = new Task(runnable, executor); + if (last != null) { + last.next = task; + last = task; + } else { + last = task; + executor.execute(task); + } + } + + }; + + } + + + private class Task implements Runnable { + + private final Runnable runnable; + private final Executor delegate; + private Task next; + + private Task(Runnable runnable, Executor delegate) { + this.runnable = runnable; + this.delegate = delegate; + } + + @Override + public void run() { + try { + runnable.run(); + } catch (Throwable t) { + Logs.MAIN.taskFailed(runnable, t); + } finally { + synchronized (lock) { + if (last == this) { + last = null; + } + if (next != null) { + next.delegate.execute(next); + } + } + } + } + } +} diff --git a/src/test/java/org/jboss/ejb/client/test/ClusteredInvocationFailOverTestCase.java b/src/test/java/org/jboss/ejb/client/test/ClusteredInvocationFailOverTestCase.java new file mode 100644 index 000000000..7d7d82a12 --- /dev/null +++ b/src/test/java/org/jboss/ejb/client/test/ClusteredInvocationFailOverTestCase.java @@ -0,0 +1,202 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2017 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jboss.ejb.client.test; + +import org.jboss.ejb.client.ClusterAffinity; +import org.jboss.ejb.client.EJBClient; +import org.jboss.ejb.client.EJBClientCluster; +import org.jboss.ejb.client.EJBClientConnection; +import org.jboss.ejb.client.EJBClientContext; +import org.jboss.ejb.client.StatefulEJBLocator; +import org.jboss.ejb.client.StatelessEJBLocator; +import org.jboss.ejb.client.legacy.JBossEJBProperties; +import org.jboss.ejb.client.test.common.DummyServer; +import org.jboss.ejb.client.test.common.Echo; +import org.jboss.ejb.client.test.common.EchoBean; +import org.jboss.ejb.server.ClusterTopologyListener.ClusterInfo; +import org.jboss.ejb.server.ClusterTopologyListener.NodeInfo; +import org.jboss.logging.Logger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tests basic invocation of a bean deployed on a single server node. + * + * @author Richard Achmatowicz + */ +public class ClusteredInvocationFailOverTestCase { + + public static AtomicInteger SENT = new AtomicInteger(); + + private static final Logger logger = Logger.getLogger(ClusteredInvocationFailOverTestCase.class); + private static final String PROPERTIES_FILE = "clustered-jboss-ejb-client.properties"; + + // servers + private static final String SERVER1_NAME = "node1"; + private static final String SERVER2_NAME = "node2"; + private static final int THREADS = 40; + + private DummyServer[] servers = new DummyServer[2]; + private static String[] serverNames = {SERVER1_NAME, SERVER2_NAME}; + private boolean[] serversStarted = new boolean[2] ; + + // module + private static final String APP_NAME = "my-foo-app"; + private static final String MODULE_NAME = "my-bar-module"; + private static final String DISTINCT_NAME = ""; + + // cluster + // note: node names and server names should match! + private static final String CLUSTER_NAME = "ejb"; + private static final String NODE1_NAME = "node1"; + private static final String NODE2_NAME = "node2"; + + private static final NodeInfo NODE1 = DummyServer.getNodeInfo(NODE1_NAME, "localhost",6999,"0.0.0.0",0); + private static final NodeInfo NODE2 = DummyServer.getNodeInfo(NODE2_NAME, "localhost",7099,"0.0.0.0",0); + private static final ClusterInfo CLUSTER = DummyServer.getClusterInfo(CLUSTER_NAME, NODE1, NODE2); + + private static ExecutorService executorService; + private volatile boolean runInvocations = true; + + /** + * Do any general setup here + * @throws Exception + */ + @BeforeClass + public static void beforeClass() throws Exception { + // trigger the static init of the correct properties file - this also depends on running in forkMode=always + JBossEJBProperties ejbProperties = JBossEJBProperties.fromClassPath(SimpleInvocationTestCase.class.getClassLoader(), PROPERTIES_FILE); + JBossEJBProperties.getContextManager().setGlobalDefault(ejbProperties); + + executorService = Executors.newFixedThreadPool(THREADS); + } + + /** + * Do any test specific setup here + */ + @Before + public void beforeTest() throws Exception { + + //startServer(0); + startServer(1); + } + + + /** + * Test a basic invocation on clustered SLSB + */ + @Test + public void testClusteredSLSBInvocation() throws Exception { + List> retList = new ArrayList<>(); + + for(int i = 0; i < THREADS; ++i) { + retList.add(executorService.submit((Callable) () -> { + while (runInvocations) { + final StatelessEJBLocator statelessEJBLocator = new StatelessEJBLocator(Echo.class, APP_NAME, MODULE_NAME, Echo.class.getSimpleName(), DISTINCT_NAME); + final Echo proxy = EJBClient.createProxy(statelessEJBLocator); + + EJBClient.setStrongAffinity(proxy, new ClusterAffinity("ejb")); + Assert.assertNotNull("Received a null proxy", proxy); + logger.info("Created proxy for Echo: " + proxy.toString()); + + logger.info("Invoking on proxy..."); + // invoke on the proxy (use a ClusterAffinity for now) + final String message = "hello!"; + SENT.incrementAndGet(); + final String echo = proxy.echo(message); + Assert.assertEquals("Got an unexpected echo", echo, message); + } + return "ok"; + })); + } + + Thread.sleep(500); + stopServer(0); + //startServer(0); + //Thread.sleep(500); + //stopServer(1); + + Thread.sleep(500); + runInvocations = false; + for(Future i : retList) { + i.get(); + } + + } + + private void stopServer(int server) { + if (serversStarted[server]) { + try { + servers[server].unregister(APP_NAME, MODULE_NAME, DISTINCT_NAME, Echo.class.getName()); + servers[server].removeCluster(CLUSTER_NAME); + logger.info("Unregistered module from " + serverNames[0]); + this.servers[server].stop(); + logger.info("Stopped server " + serverNames[server]); + } catch (Throwable t) { + logger.info("Could not stop server", t); + } finally { + serversStarted[server] = false; + } + } + } + + private void startServer(int server) throws Exception { + servers[server] = new DummyServer("localhost", 6999 + (server * 100), serverNames[server]); + servers[server].start(); + serversStarted[server] = true; + logger.info("Started server " + serverNames[server]); + + servers[server].register(APP_NAME, MODULE_NAME, DISTINCT_NAME, Echo.class.getSimpleName(), new EchoBean()); + logger.info("Registered module on server " + servers[server]); + + servers[server].addCluster(CLUSTER); + logger.info("Added node to cluster " + CLUSTER_NAME + ": server " + servers[server]); + + } + + /** + * Do any test-specific tear down here. + */ + @After + public void afterTest() { + stopServer(0); + stopServer(1); + } + + /** + * Do any general tear down here. + */ + @AfterClass + public static void afterClass() { + executorService.shutdownNow(); + } + +}