Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrency bug in TransactionScoped beans initialization #29159

Merged
merged 1 commit into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@
import static io.quarkus.narayana.jta.QuarkusTransaction.beginOptions;
import static io.quarkus.narayana.jta.QuarkusTransaction.runOptions;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.enterprise.context.ContextNotActiveException;
import javax.enterprise.context.control.ActivateRequestContext;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.TransactionScoped;

import org.eclipse.microprofile.context.ThreadContext;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
Expand All @@ -28,9 +35,17 @@

public class QuarkusTransactionTest {

private static AtomicInteger counter = new AtomicInteger();

@Inject
TransactionManager transactionManager;

@Inject
ThreadContext threadContext;

@Inject
TransactionScopedTestBean testBean;

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));
Expand Down Expand Up @@ -224,6 +239,31 @@ public void testCallJoinExisting() throws SystemException {
});
}

@Test
public void testConcurrentTransactionScopedBeanCreation() {
// 1. A Transaction is activated in a parent thread.
QuarkusTransaction.run(() -> {
ExecutorService executor = Executors.newCachedThreadPool();
try {
// 2. The parent thread starts 2 child threads, and propagates the transaction.
// 3. The child threads access a @TransactionScoped bean concurrently,
// which has resource intensive producer simulated by sleep.
var f1 = executor.submit(threadContext.contextualRunnable(() -> testBean.doWork()));
var f2 = executor.submit(threadContext.contextualRunnable(() -> testBean.doWork()));

f1.get();
f2.get();
} catch (Throwable e) {
throw new AssertionError("Should not have thrown", e);
} finally {
executor.shutdownNow();
}
});

// 4. The race condition is handled correctly, the bean is only created once.
Assertions.assertEquals(1, counter.get());
}

TestSync register() {
TestSync t = new TestSync();
try {
Expand All @@ -249,4 +289,25 @@ public void afterCompletion(int status) {
}
}

static class TransactionScopedTestBean {
public void doWork() {

}
}

@Singleton
static class TransactionScopedTestBeanCreator {

@Produces
@TransactionScoped
TransactionScopedTestBean transactionScopedTestBean() {
try {
Thread.sleep(100);
} catch (Exception e) {

}
counter.incrementAndGet();
return new TransactionScopedTestBean();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -34,6 +36,8 @@ public class TransactionContext implements InjectableContext {
// marker object to be put as a key for SynchronizationRegistry to gather all beans created in the scope
private static final Object TRANSACTION_CONTEXT_MARKER = new Object();

private final Lock transactionLock = new ReentrantLock();

private final LazyValue<TransactionSynchronizationRegistry> transactionSynchronizationRegistry = new LazyValue<>(
new Supplier<TransactionSynchronizationRegistry>() {
@Override
Expand Down Expand Up @@ -107,26 +111,45 @@ public <T> T get(Contextual<T> contextual, CreationalContext<T> creationalContex
throw new IllegalArgumentException("Contextual parameter must not be null");
}

TransactionSynchronizationRegistry registryInstance = transactionSynchronizationRegistry.get();
TransactionContextState contextState;
contextState = (TransactionContextState) transactionSynchronizationRegistry.get()
.getResource(TRANSACTION_CONTEXT_MARKER);

if (contextState == null) {
contextState = new TransactionContextState(getCurrentTransaction());
transactionSynchronizationRegistry.get().putResource(TRANSACTION_CONTEXT_MARKER, contextState);
// Prevent concurrent contextState creation from multiple threads sharing the same transaction,
// since TransactionSynchronizationRegistry has no atomic compute if absent mechanism.
transactionLock.lock();

try {
contextState = (TransactionContextState) registryInstance.getResource(TRANSACTION_CONTEXT_MARKER);

if (contextState == null) {
contextState = new TransactionContextState(getCurrentTransaction());
registryInstance.putResource(TRANSACTION_CONTEXT_MARKER, contextState);
}

} finally {
transactionLock.unlock();
}

ContextInstanceHandle<T> instanceHandle = contextState.get(contextual);
if (instanceHandle != null) {
return instanceHandle.get();
} else if (creationalContext != null) {
T createdInstance = contextual.create(creationalContext);
instanceHandle = new ContextInstanceHandleImpl<>((InjectableBean<T>) contextual, createdInstance,
creationalContext);

contextState.put(contextual, instanceHandle);
Lock beanLock = contextState.getLock();
beanLock.lock();
try {
instanceHandle = contextState.get(contextual);
if (instanceHandle != null) {
return instanceHandle.get();
}

return createdInstance;
T createdInstance = contextual.create(creationalContext);
instanceHandle = new ContextInstanceHandleImpl<>((InjectableBean<T>) contextual, createdInstance,
creationalContext);
contextState.put(contextual, instanceHandle);
return createdInstance;
} finally {
beanLock.unlock();
}
} else {
return null;
}
Expand Down Expand Up @@ -175,6 +198,8 @@ private Transaction getCurrentTransaction() {
*/
private static class TransactionContextState implements ContextState, Synchronization {

private final Lock lock = new ReentrantLock();

private final ConcurrentMap<Contextual<?>, ContextInstanceHandle<?>> mapBeanToInstanceHandle = new ConcurrentHashMap<>();

TransactionContextState(Transaction transaction) {
Expand Down Expand Up @@ -246,5 +271,14 @@ public void beforeCompletion() {
public void afterCompletion(int status) {
this.destroy();
}

/**
* Gets the lock associated with this ContextState for synchronization purposes
*
* @return the lock for this ContextState
*/
public Lock getLock() {
return lock;
}
}
}