From 700950cea39da0b787944731f38e0ff8053fb2ff Mon Sep 17 00:00:00 2001 From: imperatorx Date: Tue, 22 Nov 2022 21:08:17 +0100 Subject: [PATCH] Fix concurrency issues in TransactionScope #29157 --- .../quarkus/QuarkusTransactionTest.java | 61 +++++++++++++++++++ .../runtime/context/TransactionContext.java | 56 +++++++++++++---- 2 files changed, 106 insertions(+), 11 deletions(-) diff --git a/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java b/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java index 83b77c91ea8a0..b25237f4b2baf 100644 --- a/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java +++ b/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java @@ -3,17 +3,24 @@ import static io.quarkus.narayana.jta.QuarkusTransaction.beginOptions; import static io.quarkus.narayana.jta.QuarkusTransaction.runOptions; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.enterprise.context.ContextNotActiveException; import javax.enterprise.context.control.ActivateRequestContext; +import javax.enterprise.inject.Produces; import javax.inject.Inject; +import javax.inject.Singleton; import javax.transaction.RollbackException; import javax.transaction.Status; import javax.transaction.Synchronization; import javax.transaction.SystemException; import javax.transaction.TransactionManager; +import javax.transaction.TransactionScoped; +import org.eclipse.microprofile.context.ThreadContext; import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.junit.jupiter.api.Assertions; @@ -28,9 +35,17 @@ public class QuarkusTransactionTest { + private static AtomicInteger counter = new AtomicInteger(); + @Inject TransactionManager transactionManager; + @Inject + ThreadContext threadContext; + + @Inject + TransactionScopedTestBean testBean; + @RegisterExtension static final QuarkusUnitTest config = new QuarkusUnitTest() .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)); @@ -224,6 +239,31 @@ public void testCallJoinExisting() throws SystemException { }); } + @Test + public void testConcurrentTransactionScopedBeanCreation() { + // 1. A Transaction is activated in a parent thread. + QuarkusTransaction.run(() -> { + ExecutorService executor = Executors.newCachedThreadPool(); + try { + // 2. The parent thread starts 2 child threads, and propagates the transaction. + // 3. The child threads access a @TransactionScoped bean concurrently, + // which has resource intensive producer simulated by sleep. + var f1 = executor.submit(threadContext.contextualRunnable(() -> testBean.doWork())); + var f2 = executor.submit(threadContext.contextualRunnable(() -> testBean.doWork())); + + f1.get(); + f2.get(); + } catch (Throwable e) { + throw new AssertionError("Should not have thrown", e); + } finally { + executor.shutdownNow(); + } + }); + + // 4. The race condition is handled correctly, the bean is only created once. + Assertions.assertEquals(1, counter.get()); + } + TestSync register() { TestSync t = new TestSync(); try { @@ -249,4 +289,25 @@ public void afterCompletion(int status) { } } + static class TransactionScopedTestBean { + public void doWork() { + + } + } + + @Singleton + static class TransactionScopedTestBeanCreator { + + @Produces + @TransactionScoped + TransactionScopedTestBean transactionScopedTestBean() { + try { + Thread.sleep(100); + } catch (Exception e) { + + } + counter.incrementAndGet(); + return new TransactionScopedTestBean(); + } + } } diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/context/TransactionContext.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/context/TransactionContext.java index 9553ead1858ee..f8fe96f29ffd3 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/context/TransactionContext.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/context/TransactionContext.java @@ -4,6 +4,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -34,6 +36,8 @@ public class TransactionContext implements InjectableContext { // marker object to be put as a key for SynchronizationRegistry to gather all beans created in the scope private static final Object TRANSACTION_CONTEXT_MARKER = new Object(); + private final Lock transactionLock = new ReentrantLock(); + private final LazyValue transactionSynchronizationRegistry = new LazyValue<>( new Supplier() { @Override @@ -107,26 +111,45 @@ public T get(Contextual contextual, CreationalContext creationalContex throw new IllegalArgumentException("Contextual parameter must not be null"); } + TransactionSynchronizationRegistry registryInstance = transactionSynchronizationRegistry.get(); TransactionContextState contextState; - contextState = (TransactionContextState) transactionSynchronizationRegistry.get() - .getResource(TRANSACTION_CONTEXT_MARKER); - if (contextState == null) { - contextState = new TransactionContextState(getCurrentTransaction()); - transactionSynchronizationRegistry.get().putResource(TRANSACTION_CONTEXT_MARKER, contextState); + // Prevent concurrent contextState creation from multiple threads sharing the same transaction, + // since TransactionSynchronizationRegistry has no atomic compute if absent mechanism. + transactionLock.lock(); + + try { + contextState = (TransactionContextState) registryInstance.getResource(TRANSACTION_CONTEXT_MARKER); + + if (contextState == null) { + contextState = new TransactionContextState(getCurrentTransaction()); + registryInstance.putResource(TRANSACTION_CONTEXT_MARKER, contextState); + } + + } finally { + transactionLock.unlock(); } ContextInstanceHandle instanceHandle = contextState.get(contextual); if (instanceHandle != null) { return instanceHandle.get(); } else if (creationalContext != null) { - T createdInstance = contextual.create(creationalContext); - instanceHandle = new ContextInstanceHandleImpl<>((InjectableBean) contextual, createdInstance, - creationalContext); - - contextState.put(contextual, instanceHandle); + Lock beanLock = contextState.getLock(); + beanLock.lock(); + try { + instanceHandle = contextState.get(contextual); + if (instanceHandle != null) { + return instanceHandle.get(); + } - return createdInstance; + T createdInstance = contextual.create(creationalContext); + instanceHandle = new ContextInstanceHandleImpl<>((InjectableBean) contextual, createdInstance, + creationalContext); + contextState.put(contextual, instanceHandle); + return createdInstance; + } finally { + beanLock.unlock(); + } } else { return null; } @@ -175,6 +198,8 @@ private Transaction getCurrentTransaction() { */ private static class TransactionContextState implements ContextState, Synchronization { + private final Lock lock = new ReentrantLock(); + private final ConcurrentMap, ContextInstanceHandle> mapBeanToInstanceHandle = new ConcurrentHashMap<>(); TransactionContextState(Transaction transaction) { @@ -246,5 +271,14 @@ public void beforeCompletion() { public void afterCompletion(int status) { this.destroy(); } + + /** + * Gets the lock associated with this ContextState for synchronization purposes + * + * @return the lock for this ContextState + */ + public Lock getLock() { + return lock; + } } }