Skip to content

Commit

Permalink
Merge pull request #314 from stuartwdouglas/serial-retry
Browse files Browse the repository at this point in the history
EJBCLIENT-266 Only retry a single request at a time
  • Loading branch information
dmlloyd authored Sep 13, 2017
2 parents 7c9c275 + 31592c7 commit 2e93e74
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 9 deletions.
4 changes: 4 additions & 0 deletions src/main/java/org/jboss/ejb/_private/Logs.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ public interface Logs extends BasicLogger {
@Message(id = 507, value = "Internal server error occurred while processing a transaction")
SystemException internalSystemErrorWithTx(@Cause Throwable t);

@LogMessage(level = ERROR)
@Message(id = 508, value = "Failed to execute Runnable %s")
void taskFailed(Runnable runnable, @Cause Throwable t);

// Remote messages; no ID for brevity but should be translated

@Message(value = "No such EJB: %s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -121,10 +122,13 @@ class EJBClientChannel {
private final AtomicInteger finishedParts = new AtomicInteger(0);
private final AtomicReference<FutureResult<EJBClientChannel>> futureResultRef;

EJBClientChannel(final Channel channel, final int version, final DiscoveredNodeRegistry discoveredNodeRegistry, final FutureResult<EJBClientChannel> futureResult) {
private final RetryExecutorWrapper retryExecutorWrapper;

EJBClientChannel(final Channel channel, final int version, final DiscoveredNodeRegistry discoveredNodeRegistry, final FutureResult<EJBClientChannel> futureResult, RetryExecutorWrapper retryExecutorWrapper) {
this.channel = channel;
this.version = version;
this.discoveredNodeRegistry = discoveredNodeRegistry;
this.retryExecutorWrapper = retryExecutorWrapper;
marshallerFactory = Marshalling.getProvidedMarshallerFactory("river");
MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setClassResolver(ProtocolClassResolver.INSTANCE);
Expand Down Expand Up @@ -619,7 +623,7 @@ private static <T> void writeRawIdentifier(final EJBLocator<T> statelessLocator,
out.writeUTF(statelessLocator.getBeanName());
}

static IoFuture<EJBClientChannel> construct(final Channel channel, final DiscoveredNodeRegistry discoveredNodeRegistry) {
static IoFuture<EJBClientChannel> construct(final Channel channel, final DiscoveredNodeRegistry discoveredNodeRegistry, RetryExecutorWrapper retryExecutorWrapper) {
FutureResult<EJBClientChannel> futureResult = new FutureResult<>();
// now perform opening negotiation: receive server greeting
channel.receiveMessage(new Channel.Receiver() {
Expand All @@ -645,7 +649,7 @@ public void handleMessage(final Channel channel, final MessageInputStream messag
out.writeUTF("river");
}
// almost done; wait for initial module available report
final EJBClientChannel ejbClientChannel = new EJBClientChannel(channel, version, discoveredNodeRegistry, futureResult);
final EJBClientChannel ejbClientChannel = new EJBClientChannel(channel, version, discoveredNodeRegistry, futureResult, retryExecutorWrapper);
channel.receiveMessage(new Channel.Receiver() {
public void handleError(final Channel channel, final IOException error) {
futureResult.setException(error);
Expand Down Expand Up @@ -1216,8 +1220,8 @@ public void discardResult() {
}
}

private XnioWorker getRetryExecutor() {
return getChannel().getConnection().getEndpoint().getXnioWorker();
private Executor getRetryExecutor() {
return retryExecutorWrapper.getExecutor(getChannel().getConnection().getEndpoint().getXnioWorker());
}

static class ResponseMessageInputStream extends MessageInputStream implements ByteInput {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ class RemoteEJBReceiver extends EJBReceiver {

final ClientServiceHandle<EJBClientChannel> serviceHandle;

private final RetryExecutorWrapper retryExecutorWrapper = new RetryExecutorWrapper();

RemoteEJBReceiver(final RemoteTransportProvider remoteTransportProvider, final EJBReceiverContext receiverContext, final RemotingEJBDiscoveryProvider discoveredNodeRegistry) {
this.remoteTransportProvider = remoteTransportProvider;
this.receiverContext = receiverContext;
this.discoveredNodeRegistry = discoveredNodeRegistry;
serviceHandle = new ClientServiceHandle<>("jboss.ejb", channel -> EJBClientChannel.construct(channel, this.discoveredNodeRegistry));
serviceHandle = new ClientServiceHandle<>("jboss.ejb", channel -> EJBClientChannel.construct(channel, this.discoveredNodeRegistry, retryExecutorWrapper));
}

final IoFuture.HandlingNotifier<ConnectionPeerIdentity, EJBReceiverInvocationContext> notifier = new IoFuture.HandlingNotifier<ConnectionPeerIdentity, EJBReceiverInvocationContext>() {
Expand All @@ -78,11 +80,11 @@ public void handleDone(final ConnectionPeerIdentity peerIdentity, final EJBRecei
ejbClientChannel = ioFuture.getInterruptibly();
} catch (IOException e) {
// should generally not be possible but we should handle it cleanly regardless
attachment1.requestFailed(new RequestSendFailedException(e + "@" + peerIdentity.getConnection().getPeerURI(), false), peerIdentity.getConnection().getEndpoint().getXnioWorker());
attachment1.requestFailed(new RequestSendFailedException(e + "@" + peerIdentity.getConnection().getPeerURI(), false), retryExecutorWrapper.getExecutor(peerIdentity.getConnection().getEndpoint().getXnioWorker()));
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
attachment1.requestFailed(new RequestSendFailedException(e + "@" + peerIdentity.getConnection().getPeerURI(), false), peerIdentity.getConnection().getEndpoint().getXnioWorker());
attachment1.requestFailed(new RequestSendFailedException(e + "@" + peerIdentity.getConnection().getPeerURI(), false), retryExecutorWrapper.getExecutor(peerIdentity.getConnection().getEndpoint().getXnioWorker()));
return;
}
attachment1.getClientInvocationContext().putAttachment(EJBCC_KEY, ejbClientChannel);
Expand All @@ -96,7 +98,7 @@ public void handleCancelled(final EJBReceiverInvocationContext attachment) {
}

public void handleFailed(final IOException exception, final EJBReceiverInvocationContext attachment) {
attachment.requestFailed(new RequestSendFailedException(exception, false), Endpoint.getCurrent().getXnioWorker());
attachment.requestFailed(new RequestSendFailedException(exception, false), retryExecutorWrapper.getExecutor(Endpoint.getCurrent().getXnioWorker()));
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.jboss.ejb.protocol.remote;

import org.jboss.ejb._private.Logs;

import java.util.concurrent.Executor;

/**
* @author Stuart Douglas
*/
public class RetryExecutorWrapper {

private final Object lock = new Object();
private Task last = null;

public Executor getExecutor(Executor executor) {
return runnable -> {
synchronized (lock) {
Task task = new Task(runnable, executor);
if (last != null) {
last.next = task;
last = task;
} else {
last = task;
executor.execute(task);
}
}

};

}


private class Task implements Runnable {

private final Runnable runnable;
private final Executor delegate;
private Task next;

private Task(Runnable runnable, Executor delegate) {
this.runnable = runnable;
this.delegate = delegate;
}

@Override
public void run() {
try {
runnable.run();
} catch (Throwable t) {
Logs.MAIN.taskFailed(runnable, t);
} finally {
synchronized (lock) {
if (last == this) {
last = null;
}
if (next != null) {
next.delegate.execute(next);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2017 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jboss.ejb.client.test;

import org.jboss.ejb.client.ClusterAffinity;
import org.jboss.ejb.client.EJBClient;
import org.jboss.ejb.client.EJBClientCluster;
import org.jboss.ejb.client.EJBClientConnection;
import org.jboss.ejb.client.EJBClientContext;
import org.jboss.ejb.client.StatefulEJBLocator;
import org.jboss.ejb.client.StatelessEJBLocator;
import org.jboss.ejb.client.legacy.JBossEJBProperties;
import org.jboss.ejb.client.test.common.DummyServer;
import org.jboss.ejb.client.test.common.Echo;
import org.jboss.ejb.client.test.common.EchoBean;
import org.jboss.ejb.server.ClusterTopologyListener.ClusterInfo;
import org.jboss.ejb.server.ClusterTopologyListener.NodeInfo;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Tests basic invocation of a bean deployed on a single server node.
*
* @author <a href="mailto:rachmato@redhat.com">Richard Achmatowicz</a>
*/
public class ClusteredInvocationFailOverTestCase {

public static AtomicInteger SENT = new AtomicInteger();

private static final Logger logger = Logger.getLogger(ClusteredInvocationFailOverTestCase.class);
private static final String PROPERTIES_FILE = "clustered-jboss-ejb-client.properties";

// servers
private static final String SERVER1_NAME = "node1";
private static final String SERVER2_NAME = "node2";
private static final int THREADS = 40;

private DummyServer[] servers = new DummyServer[2];
private static String[] serverNames = {SERVER1_NAME, SERVER2_NAME};
private boolean[] serversStarted = new boolean[2] ;

// module
private static final String APP_NAME = "my-foo-app";
private static final String MODULE_NAME = "my-bar-module";
private static final String DISTINCT_NAME = "";

// cluster
// note: node names and server names should match!
private static final String CLUSTER_NAME = "ejb";
private static final String NODE1_NAME = "node1";
private static final String NODE2_NAME = "node2";

private static final NodeInfo NODE1 = DummyServer.getNodeInfo(NODE1_NAME, "localhost",6999,"0.0.0.0",0);
private static final NodeInfo NODE2 = DummyServer.getNodeInfo(NODE2_NAME, "localhost",7099,"0.0.0.0",0);
private static final ClusterInfo CLUSTER = DummyServer.getClusterInfo(CLUSTER_NAME, NODE1, NODE2);

private static ExecutorService executorService;
private volatile boolean runInvocations = true;

/**
* Do any general setup here
* @throws Exception
*/
@BeforeClass
public static void beforeClass() throws Exception {
// trigger the static init of the correct properties file - this also depends on running in forkMode=always
JBossEJBProperties ejbProperties = JBossEJBProperties.fromClassPath(SimpleInvocationTestCase.class.getClassLoader(), PROPERTIES_FILE);
JBossEJBProperties.getContextManager().setGlobalDefault(ejbProperties);

executorService = Executors.newFixedThreadPool(THREADS);
}

/**
* Do any test specific setup here
*/
@Before
public void beforeTest() throws Exception {

//startServer(0);
startServer(1);
}


/**
* Test a basic invocation on clustered SLSB
*/
@Test
public void testClusteredSLSBInvocation() throws Exception {
List<Future<?>> retList = new ArrayList<>();

for(int i = 0; i < THREADS; ++i) {
retList.add(executorService.submit((Callable<Object>) () -> {
while (runInvocations) {
final StatelessEJBLocator<Echo> statelessEJBLocator = new StatelessEJBLocator<Echo>(Echo.class, APP_NAME, MODULE_NAME, Echo.class.getSimpleName(), DISTINCT_NAME);
final Echo proxy = EJBClient.createProxy(statelessEJBLocator);

EJBClient.setStrongAffinity(proxy, new ClusterAffinity("ejb"));
Assert.assertNotNull("Received a null proxy", proxy);
logger.info("Created proxy for Echo: " + proxy.toString());

logger.info("Invoking on proxy...");
// invoke on the proxy (use a ClusterAffinity for now)
final String message = "hello!";
SENT.incrementAndGet();
final String echo = proxy.echo(message);
Assert.assertEquals("Got an unexpected echo", echo, message);
}
return "ok";
}));
}

Thread.sleep(500);
stopServer(0);
//startServer(0);
//Thread.sleep(500);
//stopServer(1);

Thread.sleep(500);
runInvocations = false;
for(Future<?> i : retList) {
i.get();
}

}

private void stopServer(int server) {
if (serversStarted[server]) {
try {
servers[server].unregister(APP_NAME, MODULE_NAME, DISTINCT_NAME, Echo.class.getName());
servers[server].removeCluster(CLUSTER_NAME);
logger.info("Unregistered module from " + serverNames[0]);
this.servers[server].stop();
logger.info("Stopped server " + serverNames[server]);
} catch (Throwable t) {
logger.info("Could not stop server", t);
} finally {
serversStarted[server] = false;
}
}
}

private void startServer(int server) throws Exception {
servers[server] = new DummyServer("localhost", 6999 + (server * 100), serverNames[server]);
servers[server].start();
serversStarted[server] = true;
logger.info("Started server " + serverNames[server]);

servers[server].register(APP_NAME, MODULE_NAME, DISTINCT_NAME, Echo.class.getSimpleName(), new EchoBean());
logger.info("Registered module on server " + servers[server]);

servers[server].addCluster(CLUSTER);
logger.info("Added node to cluster " + CLUSTER_NAME + ": server " + servers[server]);

}

/**
* Do any test-specific tear down here.
*/
@After
public void afterTest() {
stopServer(0);
stopServer(1);
}

/**
* Do any general tear down here.
*/
@AfterClass
public static void afterClass() {
executorService.shutdownNow();
}

}

0 comments on commit 2e93e74

Please sign in to comment.