Skip to content

Commit

Permalink
JBTM-3929 Participant failover support
Browse files Browse the repository at this point in the history
  • Loading branch information
mmusgrov committed Oct 1, 2024
1 parent 751867c commit 3acf8b9
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ private Response joinLRA(URI lraId, long timeLimit, String linkHeader, StringBui
}

StringBuilder recoveryUrl = new StringBuilder();
int status = lraService.joinLRA(recoveryUrl, lraId, timeLimit, null, linkHeader, recoveryUrlBase, userData);
int status = lraService.joinLRA(recoveryUrl, lraId, timeLimit, null, linkHeader, recoveryUrlBase, userData, version);

try {
return Response.status(status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@

@Tag(name = "LRA Recovery")
public class RecoveryCoordinator {
@Context
private UriInfo context;

private final LRAService lraService;

public RecoveryCoordinator() {
Expand All @@ -67,9 +64,11 @@ public String getCompensator(
@Parameter(name = "RecCoordId",
description = "An identifier that was returned by the coordinator when a participant joined the LRA",
required = true)
@PathParam("RecCoordId") String rcvCoordId) throws NotFoundException {
@PathParam("RecCoordId") String rcvCoordId,
@Context UriInfo uriInfo) throws NotFoundException {

String compensatorUrl = lraService.getParticipant(rcvCoordId);
String context = uriInfo.getRequestUri().toASCIIString();
String compensatorUrl = lraService.getParticipant(context);

if (compensatorUrl == null) {
String errorMsg = LRALogger.i18nLogger.warn_cannotFoundCompensatorUrl(rcvCoordId, lraId);
Expand Down Expand Up @@ -102,8 +101,10 @@ public String replaceCompensator(
description = "An identifier that was returned by the coordinator when a participant joined the LRA",
required = true)
@PathParam("RecCoordId") String rcvCoordId,
@Context UriInfo uriInfo,
String newCompensatorUrl) throws NotFoundException {
String compensatorUrl = lraService.getParticipant(rcvCoordId);
String context = uriInfo.getRequestUri().toASCIIString();
String compensatorUrl = lraService.getParticipant(context);

if (compensatorUrl != null) {
URI lra;
Expand All @@ -117,9 +118,9 @@ public String replaceCompensator(
Response.status(INTERNAL_SERVER_ERROR.getStatusCode()).entity(errMsg).build());
}

lraService.updateRecoveryURI(lra, newCompensatorUrl, rcvCoordId, true);
lraService.updateRecoveryURI(lra, newCompensatorUrl, context, true);

return context.getRequestUri().toASCIIString();
return context;
}

String errorMsg = LRALogger.i18nLogger.warn_cannotFoundCompensatorUrl(rcvCoordId, lraId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,21 +1014,35 @@ public String getParticipantURI() {
return participantPath;
}

// the participant is asking to be called back on different URLs
void updateCallbacks(String linkStr) {
Exception e = parseLink(linkStr);

if (e != null) {
String errorMsg = LRALogger.i18nLogger.warn_invalid_compensator(e.getMessage(), linkStr);

throw new WebApplicationException(errorMsg, e,
Response.status(BAD_REQUEST).entity(errorMsg).build());
}
}

void setRecoveryURI(String recoveryURI) {
try {
this.recoveryURI = new URI(recoveryURI);
} catch (URISyntaxException e) {
String errorMsg = LRALogger.i18nLogger.error_invalidRecoveryUrlToJoinLRAURI(recoveryURI, lraId);

LRALogger.logger.info(errorMsg);
if (LRALogger.logger.isDebugEnabled()) {
LRALogger.logger.debugf(errorMsg);
}

throw new WebApplicationException(errorMsg, e,
Response.status(BAD_REQUEST).entity(errorMsg).build());
}
}

void setRecoveryURI(String recoveryUrlBase, String txId, String coordinatorId) {
setRecoveryURI(recoveryUrlBase + txId + '/' + coordinatorId);
void setRecoveryURI(String recoveryUrlBase, String txId, String participantId) {
setRecoveryURI(String.format("%s/%s/%s", recoveryUrlBase, txId, participantId));
}

public String getCompensator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.arjuna.ats.arjuna.coordinator.RecordListIterator;
import com.arjuna.ats.arjuna.coordinator.RecordType;
import io.narayana.lra.Current;
import io.narayana.lra.LRAConstants;
import io.narayana.lra.LRAData;
import io.narayana.lra.logging.LRALogger;
import com.arjuna.ats.arjuna.state.InputObjectState;
Expand Down Expand Up @@ -725,7 +726,8 @@ protected LRAStatus toLRAStatus(int atomicActionStatus) {
}

public LRAParticipantRecord enlistParticipant(URI coordinatorUrl, String participantUrl, String recoveryUrlBase,
long timeLimit, String compensatorData) throws UnsupportedEncodingException {
long timeLimit, String compensatorData, String version)
throws UnsupportedEncodingException {
ReentrantLock lock = tryLockTransaction();
if (lock == null) {
LRALogger.i18nLogger.warn_enlistment();
Expand All @@ -739,7 +741,7 @@ public LRAParticipantRecord enlistParticipant(URI coordinatorUrl, String partici
return participant; // must have already been enlisted
}
participant = doEnlistParticipant(coordinatorUrl, participantUrl, recoveryUrlBase, timeLimit,
compensatorData);
compensatorData, version);
if (participant != null) {
// need to remember that there is a new participant
deactivate(); // if it fails the superclass will have logged a warning
Expand All @@ -755,13 +757,33 @@ public LRAParticipantRecord enlistParticipant(URI coordinatorUrl, String partici
}

private LRAParticipantRecord doEnlistParticipant(URI coordinatorUrl, String participantUrl, String recoveryUrlBase,
long timeLimit, String compensatorData) {
long timeLimit, String compensatorData, String version) {
LRAParticipantRecord p = new LRAParticipantRecord(this, lraService, participantUrl, compensatorData);
String pid = p.get_uid().fileStringForm();

String txId = URLEncoder.encode(coordinatorUrl.toASCIIString(), StandardCharsets.UTF_8);
/*
* versions are specific to the participant so only update the one used by this participant (ie different
* participants are allowed to be on different versions).
*
* From API version 1.2 onwards, the recovery URI is constructed from the RecoveryCoordinator path followed
* by segments for the Uid of the LRA and the Uid of the participant. If the passed in version is null
* then assume the latest. In previous versions the recovery URI was broken.
*/
if (version != null && (version.equals(LRAConstants.API_VERSION_1_0) || version.equals(LRAConstants.API_VERSION_1_1))) {
// use the old broken method
String txId = URLEncoder.encode(coordinatorUrl.toASCIIString(), StandardCharsets.UTF_8);

p.setRecoveryURI(recoveryUrlBase, txId, pid);
if (LRALogger.logger.isDebugEnabled()) {
LRALogger.logger.debugf(
"LongRunningAction enlist: using old style recovery URL (txId=%s participantId=%s)",
coordinatorUrl, txId);
}

p.setRecoveryURI(recoveryUrlBase, txId, pid);
} else {
// use the shiny new working method
p.setRecoveryURI(recoveryUrlBase, this.get_uid().fileStringForm(), pid);
}

if (add(p) != AddOutcome.AR_REJECTED) {
setTimeLimit(timeLimit);
Expand Down Expand Up @@ -1101,13 +1123,13 @@ private void abortLRA() {
}
}

public void updateRecoveryURI(String compensatorUri, String recoveryUri) {
LRAParticipantRecord lraRecord = findLRAParticipant(compensatorUri, false);
public void updateRecoveryURI(String linkHeader, String recoveryUri) {
LRAParticipantRecord lraRecord = findLRAParticipant(recoveryUri, false);

if (lraRecord != null) {
try {
lraRecord.setRecoveryURI(recoveryUri);

lraRecord.updateCallbacks(linkHeader);

if (!deactivate()) {
if (LRALogger.logger.isInfoEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,15 @@ public int leave(URI lraId, String compensatorUrl) {
.entity(errorMsg).build());
}
}

public int joinLRA(StringBuilder recoveryUrl, URI lra, long timeLimit,
String compensatorUrl, String linkHeader, String recoveryUrlBase,
StringBuilder compensatorData) {
return joinLRA(recoveryUrl, lra,timeLimit, compensatorUrl, linkHeader, recoveryUrlBase, compensatorData, null);
}

public int joinLRA(StringBuilder recoveryUrl, URI lra, long timeLimit,
String compensatorUrl, String linkHeader, String recoveryUrlBase,
StringBuilder compensatorData, String version) {
if (lra == null) {
lraTrace(null, "Error missing LRA header in join request");
} else {
Expand Down Expand Up @@ -374,7 +379,7 @@ public int joinLRA(StringBuilder recoveryUrl, URI lra, long timeLimit,
if (compensatorData != null) {
participant = transaction.enlistParticipant(lra,
linkHeader != null ? linkHeader : compensatorUrl, recoveryUrlBase,
timeLimit, compensatorData.toString());
timeLimit, compensatorData.toString(), version);
// return any previously registered data
compensatorData.setLength(0);

Expand All @@ -384,7 +389,7 @@ public int joinLRA(StringBuilder recoveryUrl, URI lra, long timeLimit,
} else {
participant = transaction.enlistParticipant(lra,
linkHeader != null ? linkHeader : compensatorUrl, recoveryUrlBase,
timeLimit, null);
timeLimit, null, version);
}
} catch (UnsupportedEncodingException e) {
return Response.Status.PRECONDITION_FAILED.getStatusCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.narayana.lra.LRAConstants.COORDINATOR_PATH_NAME;
import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_CONTEXT_HEADER;
import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_RECOVERY_HEADER;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand All @@ -17,6 +18,8 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -26,7 +29,9 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import io.narayana.lra.LRAConstants;
import org.eclipse.microprofile.lra.annotation.LRAStatus;
import org.hamcrest.MatcherAssert;
import org.jboss.resteasy.plugins.server.undertow.UndertowJaxrsServer;
import org.jboss.resteasy.test.TestPortProvider;
import org.junit.After;
Expand Down Expand Up @@ -58,7 +63,8 @@
import jakarta.ws.rs.core.Response;

public class LRATest extends LRATestBase {

static final String LRA_API_VERSION_HEADER_NAME = "Narayana-LRA-API-version";
static final String RECOVERY_HEADER_NAME = "Long-Running-Action-Recovery";
private static LRAService service;

private NarayanaLRAClient lraClient;
Expand Down Expand Up @@ -124,6 +130,71 @@ public void after() {
server.stop();
}

@Test
public void joinWithVersionTest() {
URI lraId = lraClient.startLRA("joinLRAWithBody");
String version = LRAConstants.API_VERSION_1_2;
String encodedLraId = URLEncoder.encode(lraId.toString(), StandardCharsets.UTF_8); // must be valid

try (Response response = client.target(coordinatorPath)
.path(encodedLraId)
.request()
.header(LRA_API_VERSION_HEADER_NAME, version)
// the request body should correspond to a valid compensator or be empty
.put(Entity.text(""))) {
Assert.assertEquals("Expected joining LRA succeeded, PUT/200 is expected.",
Response.Status.OK.getStatusCode(), response.getStatus());
Assert.assertEquals("Expected API header to be returned with the version provided in request",
version, response.getHeaderString(LRA_API_VERSION_HEADER_NAME));
String recoveryHeaderUrlMessage = response.getHeaderString(RECOVERY_HEADER_NAME);
String recoveryUrlBody = response.readEntity(String.class);
URI recoveryUrlLocation = response.getLocation();
Assert.assertEquals("Expecting returned body and recovery header have got the same content",
recoveryUrlBody, recoveryHeaderUrlMessage);
Assert.assertEquals("Expecting returned body and location have got the same content",
recoveryUrlBody, recoveryUrlLocation.toString());
MatcherAssert.assertThat("Expected returned message contains the sub-path of LRA recovery URL",
recoveryUrlBody, containsString("lra-coordinator/recovery"));
// the new format just contains the Uid of the LRA
MatcherAssert.assertThat("Expected returned message contains the LRA id",
recoveryUrlBody, containsString(LRAConstants.getLRAUid(lraId)));
} finally {
lraClient.cancelLRA(lraId);
}
}

@Test
public void joinWithOldVersionTest() {
URI lraId = lraClient.startLRA("joinLRAWithBody");
String version = LRAConstants.API_VERSION_1_1;
String encodedLraId = URLEncoder.encode(lraId.toString(), StandardCharsets.UTF_8); // must be valid

try (Response response = client.target(coordinatorPath)
.path(encodedLraId)
.request()
.header(LRA_API_VERSION_HEADER_NAME, version)
// the request body should correspond to a valid compensator or be empty
.put(Entity.text(""))) {
Assert.assertEquals("Expected joining LRA succeeded, PUT/200 is expected.",
Response.Status.OK.getStatusCode(), response.getStatus());
Assert.assertEquals("Expected API header to be returned with the version provided in request",
version, response.getHeaderString(LRA_API_VERSION_HEADER_NAME));
String recoveryHeaderUrlMessage = response.getHeaderString(RECOVERY_HEADER_NAME);
String recoveryUrlBody = response.readEntity(String.class);
URI recoveryUrlLocation = response.getLocation();
Assert.assertEquals("Expecting returned body and recovery header have got the same content",
recoveryUrlBody, recoveryHeaderUrlMessage);
Assert.assertEquals("Expecting returned body and location have got the same content",
recoveryUrlBody, recoveryUrlLocation.toString());
MatcherAssert.assertThat("Expected returned message contains the sub-path of LRA recovery URL",
recoveryUrlBody, containsString("lra-coordinator/recovery"));
MatcherAssert.assertThat("Expected returned message contains the LRA id",
recoveryUrlBody, containsString(encodedLraId));
} finally {
lraClient.cancelLRA(lraId);
}
}

/**
* sanity check: test that a participant is notified when an LRA closes
*/
Expand Down Expand Up @@ -165,6 +236,74 @@ public void testComplete() throws URISyntaxException {
assertTrue("LRA should have closed", status == null || status == LRAStatus.Closed);
}

/*
* Participants can update their callbacks to facilitate recovery.
* Test that the compensate endpoint can be changed:
*/
@Test
public void testReplaceCompensator() throws URISyntaxException {
// verify that participants can change their callback endpoints
int fallbackCompensations = fallbackCompensateCount.get();
// call a participant method that starts an LRA and returns the lra and the recovery id in the response
String urls = client.target(TestPortProvider.generateURL("/base/test/start-with-recovery")).request().get(String.class);
String[] tokens = urls.split(",");
assertTrue("response is missing components for the lraId and/or recoveryId",
tokens.length >= 2);
// the service method returns the lra and recovery ids in a comma separated response:
String lraUrl = tokens[tokens.length - 2];
String recoveryUrl = tokens[tokens.length - 1];

// change the participant compensate endpoint (or change the resource completely to showcase migrating
// responsibility for the participant to a different microservice
String newCompensateCallback = TestPortProvider.generateURL("/base/test/fallback-compensate");
// define the new link header for the new compensate endpoint
String newCompensator = String.format("<%s>; rel=compensate", newCompensateCallback);

// check that performing a GET on the recovery url returns the participant callbacks:
try (Response r1 = client.target(recoveryUrl).request().get()) {
int res = r1.getStatus();
if (res != Response.Status.OK.getStatusCode()) {
// clean up and fail
fail("get recovery url failed: " + res);
}

String linkHeader = r1.readEntity(String.class);
// the link header should be a standard link header corresponding to the participant callbacks,
// just sanity check that the mandatory compensate rel type is present
String compensateRelationType = "rel=\"compensate\"";

MatcherAssert.assertThat("Compensator link header is missing the compensate rel type",
linkHeader, containsString(compensateRelationType));
}

// use the recovery url to ask the coordinator to compensate on a different endpoint
try (Response r1 = client.target(recoveryUrl).request().put(Entity.text(newCompensator))) {
int res = r1.getStatus();
if (res != Response.Status.OK.getStatusCode()) {
// clean up and fail
try (Response r = client.target(String.format("%s/cancel", lraUrl)).request().put(null)) {
if (r.getStatus() != Response.Status.OK.getStatusCode()) {
fail("move and cancel failed");
}
}
fail("move failed");
}
}

// cancel the LRA
try (Response r2 = client.target(String.format("%s/cancel", lraUrl)).request().put(null)) {
int res = r2.getStatus();
if (res != Response.Status.OK.getStatusCode()) {
fail("unable to cleanup: " + res);
}
}

// verify that the participant was called on the new endpoint and that the LRA cancelled
assertEquals(fallbackCompensations + 1, fallbackCompensateCount.get());
LRAStatus status = getStatus(new URI(lraUrl));
assertTrue("LRA should have cancelled", status == null || status == LRAStatus.Cancelled);
}

/**
* Run a loop of LRAs so that a debugger can watch memory
* @throws URISyntaxException
Expand Down
Loading

0 comments on commit 3acf8b9

Please sign in to comment.