Skip to content

Commit

Permalink
Issue #3136 fixed handling of concurrent batch bundles when offloadin…
Browse files Browse the repository at this point in the history
…g is enabled (#3531)

* issue #3136 handle multiple transactions for batch bundles when offload is enabled

Signed-off-by: Robin Arnold <robin.arnold@ibm.com>

* issue #3136 added system test for concurrent batch bundle processing

Signed-off-by: Robin Arnold <robin.arnold@ibm.com>

* issue #3136 addressed review comments and translate transaction status code

Signed-off-by: Robin Arnold <robin.arnold@ibm.com>
  • Loading branch information
punktilious authored Mar 30, 2022
1 parent e4731bf commit 0e9d6b3
Show file tree
Hide file tree
Showing 10 changed files with 560 additions and 57 deletions.
11 changes: 9 additions & 2 deletions fhir-client/src/main/java/com/ibm/fhir/client/FHIRClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright IBM Corp. 2016, 2021
* (C) Copyright IBM Corp. 2016, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand All @@ -8,12 +8,13 @@

import java.security.KeyStore;

import jakarta.json.JsonObject;
import javax.ws.rs.client.WebTarget;

import com.ibm.fhir.model.resource.Bundle;
import com.ibm.fhir.model.resource.Resource;

import jakarta.json.JsonObject;

/**
* This interface provides a client API for invoking the FHIR Server's REST API.
*/
Expand Down Expand Up @@ -480,4 +481,10 @@ public interface FHIRClient {
* Allow the client consumer to be able to get and reuse the same TrustStore if necessary.
*/
KeyStore getTrustStore();

/**
* Get the value of the tenant name the client is currently configured to use
* @return
*/
String getTenantId();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright IBM Corp. 2016, 2021
* (C) Copyright IBM Corp. 2016, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -1129,4 +1129,9 @@ public HTTPReturnPreference getHttpReturnPref() {
public void setHttpReturnPref(HTTPReturnPreference returnPref) {
this.httpReturnPref = returnPref;
}

@Override
public String getTenantId() {
return this.tenantId == null ? "default" : this.tenantId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.ibm.fhir.persistence.jdbc.connection;

import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -47,25 +48,25 @@ public class FHIRUserTransactionAdapter implements FHIRPersistenceTransaction {
// support nesting by tracking the number of begin/end requests
private int startCount;

// A handler to be called after a transaction has been rolled back
private final Runnable rolledBackHandler;
// A handler to be called after a transaction has completed
private final Consumer<Boolean> afterTransactionHandler;

/**
* Public constructor
* @param tx
* @param syncRegistry
* @param cache
* @param transactionDataKey
* @param rolledBackHandler
* @param afterTransactionHandler
*/
public FHIRUserTransactionAdapter(UserTransaction tx, TransactionSynchronizationRegistry syncRegistry, FHIRPersistenceJDBCCache cache,
String transactionDataKey, Runnable rolledBackHandler) {
String transactionDataKey, Consumer<Boolean> afterTransactionHandler) {
this.userTransaction = tx;
this.syncRegistry = syncRegistry;
this.cache = cache;
this.transactionDataKey = transactionDataKey;
startedByThis = false;
this.rolledBackHandler = rolledBackHandler;
this.afterTransactionHandler = afterTransactionHandler;
}

/**
Expand All @@ -91,7 +92,7 @@ public void begin() throws FHIRPersistenceException {
// On starting a new transaction, we need to register a callback so that
// the cache is informed when the transaction commits it can promote thread-local
// ids to the shared caches.
syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.rolledBackHandler));
syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.afterTransactionHandler));

} catch (Exception x) {
log.log(Level.SEVERE, "failed to start transaction", x);
Expand All @@ -105,7 +106,7 @@ public void begin() throws FHIRPersistenceException {
// On starting a bulk transaction, we need to register a callback so that
// the cache is informed when the transaction commits it can promote thread-local
// ids to the shared caches.
syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.rolledBackHandler));
syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.afterTransactionHandler));

// transaction is already active, so this is a nested request
this.startCount++;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/*
* (C) Copyright IBM Corp. 2020, 2021
* (C) Copyright IBM Corp. 2020, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.persistence.jdbc.connection;

import java.util.function.Consumer;

import javax.transaction.TransactionSynchronizationRegistry;
import javax.transaction.UserTransaction;

Expand All @@ -26,7 +28,8 @@ public class FHIRUserTransactionFactory implements FHIRTransactionFactory {

private final String transactionDataKey;

private final Runnable rolledBackHandler;
// Called after the transaction completes (true == committed; false == rolled back)
private final Consumer<Boolean> afterTransactionHandler;

/**
* Public constructor
Expand All @@ -35,19 +38,19 @@ public class FHIRUserTransactionFactory implements FHIRTransactionFactory {
* @param syncReg
* @param cache
* @param transactionDataKey
* @param rolledBackHandler
* @param afterTransactionHandler a handler called after the transaction completes (true == committed; false == rolled back)
*/
public FHIRUserTransactionFactory(UserTransaction tx, TransactionSynchronizationRegistry syncReg, FHIRPersistenceJDBCCache cache, String transactionDataKey,
Runnable rolledBackHandler) {
Consumer<Boolean> afterTransactionHandler) {
this.userTransaction = tx;
this.syncRegistry = syncReg;
this.cache = cache;
this.transactionDataKey = transactionDataKey;
this.rolledBackHandler = rolledBackHandler;
this.afterTransactionHandler = afterTransactionHandler;
}

@Override
public FHIRPersistenceTransaction create() {
return new FHIRUserTransactionAdapter(userTransaction, syncRegistry, cache, transactionDataKey, rolledBackHandler);
return new FHIRUserTransactionAdapter(userTransaction, syncRegistry, cache, transactionDataKey, afterTransactionHandler);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
/*
* (C) Copyright IBM Corp. 2020
* (C) Copyright IBM Corp. 2020, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.persistence.jdbc.impl;

import java.util.function.Consumer;
import java.util.logging.Logger;

import javax.transaction.Status;
Expand Down Expand Up @@ -33,23 +34,23 @@ public class CacheTransactionSync implements Synchronization {

private final String transactionDataKey;

// A callback when we hit a rollback
private final Runnable rolledBackHandler;
// Called after the transaction completes (true == committed; false == rolled back)
private final Consumer<Boolean> afterTransactionHandler;

/**
* Public constructor
*
* @param txSyncRegistry
* @param cache
* @param transactionDataKey
* @param rolledBackHandler
* @param afterTransactionHandler a handler called after the transaction completes (true == committed; false == rolled back)
*/
public CacheTransactionSync(TransactionSynchronizationRegistry txSyncRegistry, FHIRPersistenceJDBCCache cache, String transactionDataKey,
Runnable rolledBackHandler) {
Consumer<Boolean> afterTransactionHandler) {
this.txSyncRegistry = txSyncRegistry;
this.cache = cache;
this.transactionDataKey = transactionDataKey;
this.rolledBackHandler = rolledBackHandler;
this.afterTransactionHandler = afterTransactionHandler;
}

@Override
Expand All @@ -71,14 +72,62 @@ public void beforeCompletion() {
public void afterCompletion(int status) {
if (status == Status.STATUS_COMMITTED) {
cache.transactionCommitted();
if (afterTransactionHandler != null) {
afterTransactionHandler.accept(Boolean.TRUE);
}
} else {
// probably a rollback, so throw away everything
logger.info("Transaction failed - afterCompletion(status = " + status + ")");
logger.info("Transaction failed - afterCompletion(status = " + translateStatus(status) + ")");
cache.transactionRolledBack();

if (rolledBackHandler != null) {
rolledBackHandler.run();
if (afterTransactionHandler != null) {
afterTransactionHandler.accept(Boolean.FALSE);
}
}
}

/**
* Translate the transaction Status value to a meaningful name
* @param status a value from {@link javax.transaction.Status}
* @return a string describing the transaction status
*/
public static String translateStatus(int status) {
String result;
switch (status) {
case Status.STATUS_ACTIVE:
result = "STATUS_ACTIVE";
break;
case Status.STATUS_MARKED_ROLLBACK:
result = "STATUS_MARKED_ROLLBACK";
break;
case Status.STATUS_PREPARED:
result = "STATUS_PREPARED";
break;
case Status.STATUS_COMMITTED:
result = "STATUS_COMMITTED";
break;
case Status.STATUS_ROLLEDBACK:
result = "STATUS_ROLLEDBACK";
break;
case Status.STATUS_UNKNOWN:
result = "STATUS_UNKNOWN";
break;
case Status.STATUS_NO_TRANSACTION:
result = "STATUS_NO_TRANSACTION";
break;
case Status.STATUS_PREPARING:
result = "STATUS_PREPARING";
break;
case Status.STATUS_COMMITTING:
result = "STATUS_COMMITTING";
break;
case Status.STATUS_ROLLING_BACK:
result = "STATUS_ROLLING_BACK";
break;
default:
result = "INVALID_" + status;
break;
}
return result;
}
}
Loading

0 comments on commit 0e9d6b3

Please sign in to comment.