Skip to content

Commit

Permalink
Fix concurrency issues in TransactionScope #29157
Browse files Browse the repository at this point in the history
  • Loading branch information
imperatorx committed Nov 22, 2022
1 parent 38acaed commit 700950c
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@
import static io.quarkus.narayana.jta.QuarkusTransaction.beginOptions;
import static io.quarkus.narayana.jta.QuarkusTransaction.runOptions;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.enterprise.context.ContextNotActiveException;
import javax.enterprise.context.control.ActivateRequestContext;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.TransactionScoped;

import org.eclipse.microprofile.context.ThreadContext;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
Expand All @@ -28,9 +35,17 @@

public class QuarkusTransactionTest {

private static AtomicInteger counter = new AtomicInteger();

@Inject
TransactionManager transactionManager;

@Inject
ThreadContext threadContext;

@Inject
TransactionScopedTestBean testBean;

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));
Expand Down Expand Up @@ -224,6 +239,31 @@ public void testCallJoinExisting() throws SystemException {
});
}

@Test
public void testConcurrentTransactionScopedBeanCreation() {
// 1. A Transaction is activated in a parent thread.
QuarkusTransaction.run(() -> {
ExecutorService executor = Executors.newCachedThreadPool();
try {
// 2. The parent thread starts 2 child threads, and propagates the transaction.
// 3. The child threads access a @TransactionScoped bean concurrently,
// which has resource intensive producer simulated by sleep.
var f1 = executor.submit(threadContext.contextualRunnable(() -> testBean.doWork()));
var f2 = executor.submit(threadContext.contextualRunnable(() -> testBean.doWork()));

f1.get();
f2.get();
} catch (Throwable e) {
throw new AssertionError("Should not have thrown", e);
} finally {
executor.shutdownNow();
}
});

// 4. The race condition is handled correctly, the bean is only created once.
Assertions.assertEquals(1, counter.get());
}

TestSync register() {
TestSync t = new TestSync();
try {
Expand All @@ -249,4 +289,25 @@ public void afterCompletion(int status) {
}
}

static class TransactionScopedTestBean {
public void doWork() {

}
}

@Singleton
static class TransactionScopedTestBeanCreator {

@Produces
@TransactionScoped
TransactionScopedTestBean transactionScopedTestBean() {
try {
Thread.sleep(100);
} catch (Exception e) {

}
counter.incrementAndGet();
return new TransactionScopedTestBean();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -34,6 +36,8 @@ public class TransactionContext implements InjectableContext {
// marker object to be put as a key for SynchronizationRegistry to gather all beans created in the scope
private static final Object TRANSACTION_CONTEXT_MARKER = new Object();

private final Lock transactionLock = new ReentrantLock();

private final LazyValue<TransactionSynchronizationRegistry> transactionSynchronizationRegistry = new LazyValue<>(
new Supplier<TransactionSynchronizationRegistry>() {
@Override
Expand Down Expand Up @@ -107,26 +111,45 @@ public <T> T get(Contextual<T> contextual, CreationalContext<T> creationalContex
throw new IllegalArgumentException("Contextual parameter must not be null");
}

TransactionSynchronizationRegistry registryInstance = transactionSynchronizationRegistry.get();
TransactionContextState contextState;
contextState = (TransactionContextState) transactionSynchronizationRegistry.get()
.getResource(TRANSACTION_CONTEXT_MARKER);

if (contextState == null) {
contextState = new TransactionContextState(getCurrentTransaction());
transactionSynchronizationRegistry.get().putResource(TRANSACTION_CONTEXT_MARKER, contextState);
// Prevent concurrent contextState creation from multiple threads sharing the same transaction,
// since TransactionSynchronizationRegistry has no atomic compute if absent mechanism.
transactionLock.lock();

try {
contextState = (TransactionContextState) registryInstance.getResource(TRANSACTION_CONTEXT_MARKER);

if (contextState == null) {
contextState = new TransactionContextState(getCurrentTransaction());
registryInstance.putResource(TRANSACTION_CONTEXT_MARKER, contextState);
}

} finally {
transactionLock.unlock();
}

ContextInstanceHandle<T> instanceHandle = contextState.get(contextual);
if (instanceHandle != null) {
return instanceHandle.get();
} else if (creationalContext != null) {
T createdInstance = contextual.create(creationalContext);
instanceHandle = new ContextInstanceHandleImpl<>((InjectableBean<T>) contextual, createdInstance,
creationalContext);

contextState.put(contextual, instanceHandle);
Lock beanLock = contextState.getLock();
beanLock.lock();
try {
instanceHandle = contextState.get(contextual);
if (instanceHandle != null) {
return instanceHandle.get();
}

return createdInstance;
T createdInstance = contextual.create(creationalContext);
instanceHandle = new ContextInstanceHandleImpl<>((InjectableBean<T>) contextual, createdInstance,
creationalContext);
contextState.put(contextual, instanceHandle);
return createdInstance;
} finally {
beanLock.unlock();
}
} else {
return null;
}
Expand Down Expand Up @@ -175,6 +198,8 @@ private Transaction getCurrentTransaction() {
*/
private static class TransactionContextState implements ContextState, Synchronization {

private final Lock lock = new ReentrantLock();

private final ConcurrentMap<Contextual<?>, ContextInstanceHandle<?>> mapBeanToInstanceHandle = new ConcurrentHashMap<>();

TransactionContextState(Transaction transaction) {
Expand Down Expand Up @@ -246,5 +271,14 @@ public void beforeCompletion() {
public void afterCompletion(int status) {
this.destroy();
}

/**
* Gets the lock associated with this ContextState for synchronization purposes
*
* @return the lock for this ContextState
*/
public Lock getLock() {
return lock;
}
}
}

0 comments on commit 700950c

Please sign in to comment.