Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HHH-14326 + HHH-14404 Connection closing fixes #3693

Closed
wants to merge 6 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.hibernate.query.spi.QueryImplementor;
import org.hibernate.query.spi.ScrollableResultsImplementor;
import org.hibernate.resource.jdbc.spi.JdbcSessionContext;
import org.hibernate.resource.jdbc.spi.PhysicalConnectionHandlingMode;
import org.hibernate.resource.jdbc.spi.StatementInspector;
import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorImpl;
import org.hibernate.resource.transaction.spi.TransactionCoordinator;
Expand Down Expand Up @@ -134,6 +135,7 @@ public abstract class AbstractSharedSessionContract implements SharedSessionCont

private FlushMode flushMode;
private boolean autoJoinTransactions;
private final PhysicalConnectionHandlingMode connectionHandlingMode;

private CacheMode cacheMode;

Expand Down Expand Up @@ -181,11 +183,9 @@ public AbstractSharedSessionContract(SessionFactoryImpl factory, SessionCreation
sessionEventsManager = new SessionEventListenerManagerImpl( customSessionEventListener.toArray( new SessionEventListener[0] ) );
}

final StatementInspector statementInspector = interpret( options.getStatementInspector() );
this.jdbcSessionContext = new JdbcSessionContextImpl( this, statementInspector, fastSessionServices );

this.entityNameResolver = new CoordinatingEntityNameResolver( factory, interceptor );

final StatementInspector statementInspector = interpret( options.getStatementInspector() );
if ( options instanceof SharedSessionCreationOptions && ( (SharedSessionCreationOptions) options ).isTransactionCoordinatorShared() ) {
if ( options.getConnection() != null ) {
throw new SessionException( "Cannot simultaneously share transaction context and specify connection" );
Expand All @@ -207,18 +207,27 @@ public AbstractSharedSessionContract(SessionFactoryImpl factory, SessionCreation
);
autoJoinTransactions = false;
}
if ( sharedOptions.getPhysicalConnectionHandlingMode() != this.jdbcCoordinator.getLogicalConnection().getConnectionHandlingMode() ) {
this.connectionHandlingMode = this.jdbcCoordinator.getLogicalConnection().getConnectionHandlingMode();
if ( sharedOptions.getPhysicalConnectionHandlingMode() != this.connectionHandlingMode ) {
log.debug(
"Session creation specified 'PhysicalConnectionHandlingMode which is invalid in conjunction " +
"with sharing JDBC connection between sessions; ignoring"
);
}

this.jdbcSessionContext = new JdbcSessionContextImpl( this, statementInspector,
connectionHandlingMode, fastSessionServices );

addSharedSessionTransactionObserver( transactionCoordinator );
}
else {
this.isTransactionCoordinatorShared = false;
this.autoJoinTransactions = options.shouldAutoJoinTransactions();
this.connectionHandlingMode = options.getPhysicalConnectionHandlingMode();
this.jdbcSessionContext = new JdbcSessionContextImpl( this, statementInspector,
connectionHandlingMode, fastSessionServices );
// This must happen *after* the JdbcSessionContext was initialized,
// because some of the calls below retrieve this context indirectly through Session getters.
this.jdbcCoordinator = new JdbcCoordinatorImpl( options.getConnection(), this, fastSessionServices.jdbcServices );
this.transactionCoordinator = fastSessionServices.transactionCoordinatorBuilder.buildTransactionCoordinator( jdbcCoordinator, this );
}
Expand Down Expand Up @@ -1236,7 +1245,8 @@ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFound
factory = SessionFactoryImpl.deserialize( ois );
fastSessionServices = factory.getFastSessionServices();
sessionEventsManager = new SessionEventListenerManagerImpl( fastSessionServices.defaultSessionEventListeners.buildBaseline() );
jdbcSessionContext = new JdbcSessionContextImpl( this, (StatementInspector) ois.readObject(), fastSessionServices );
jdbcSessionContext = new JdbcSessionContextImpl( this, (StatementInspector) ois.readObject(),
connectionHandlingMode, fastSessionServices );
jdbcCoordinator = JdbcCoordinatorImpl.deserialize( ois, this );

cacheTransactionSync = factory.getCache().getRegionFactory().createTransactionContext( this );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ public class JdbcSessionContextImpl implements JdbcSessionContext {
public JdbcSessionContextImpl(
SharedSessionContractImplementor session,
StatementInspector statementInspector,
PhysicalConnectionHandlingMode connectionHandlingMode,
FastSessionServices fastSessionServices) {
this.sessionFactory = session.getFactory();
this.statementInspector = statementInspector;
this.connectionHandlingMode = settings().getPhysicalConnectionHandlingMode();
this.connectionHandlingMode = connectionHandlingMode;
this.serviceRegistry = sessionFactory.getServiceRegistry();
this.jdbcObserver = new JdbcObserverImpl( session, fastSessionServices );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,7 @@ public StatementInspector getStatementInspector() {

@Override
public PhysicalConnectionHandlingMode getPhysicalConnectionHandlingMode() {
return null;
return sessionFactory.getSessionFactoryOptions().getPhysicalConnectionHandlingMode();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,33 +194,34 @@ public void manualReconnect(Connection suppliedConnection) {
}

private void releaseConnection() {
//Some managed containers might trigger this release concurrently:
//this is not how they should do things, still we make a local
//copy of the variable to prevent confusing errors due to a race conditions
//(to trigger a more clear error, if any).
final Connection localVariableConnection = this.physicalConnection;
if ( localVariableConnection == null ) {
return;
}

// We need to set the connection to null before we release resources,
// in order to prevent recursion into this method.
// Recursion can happen when we release resources and when batch statements are in progress:
// when releasing resources, we'll abort the batch statement,
// which will trigger "logicalConnection.afterStatement()",
// which in some configurations will release the connection.
this.physicalConnection = null;
try {
if ( ! localVariableConnection.isClosed() ) {
sqlExceptionHelper.logAndClearWarnings( localVariableConnection );
try {
getResourceRegistry().releaseResources();
if ( !localVariableConnection.isClosed() ) {
sqlExceptionHelper.logAndClearWarnings( localVariableConnection );
}
}
finally {
jdbcConnectionAccess.releaseConnection( localVariableConnection );
}
jdbcConnectionAccess.releaseConnection( localVariableConnection );
}
catch (SQLException e) {
throw sqlExceptionHelper.convert( e, "Unable to release JDBC Connection" );
}
finally {
observer.jdbcConnectionReleaseEnd();
boolean concurrentUsageDetected = ( this.physicalConnection == null );
this.physicalConnection = null;
getResourceRegistry().releaseResources();
if ( concurrentUsageDetected ) {
throw new HibernateException( "Detected concurrent management of connection resources." +
" This might indicate a multi-threaded use of Hibernate in combination with managed resources, which is not supported." );
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,98 @@

package org.hibernate.test.connections;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;

import org.hibernate.Session;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.dialect.H2Dialect;
import org.hibernate.engine.jdbc.connections.internal.UserSuppliedConnectionProviderImpl;
import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.jpa.test.BaseEntityManagerFunctionalTestCase;
import org.hibernate.resource.jdbc.spi.LogicalConnectionImplementor;
import org.hibernate.resource.jdbc.spi.PhysicalConnectionHandlingMode;

import org.hibernate.testing.RequiresDialect;
import org.hibernate.testing.TestForIssue;
import org.hibernate.testing.env.ConnectionProviderBuilder;
import org.hibernate.testing.jta.TestingJtaBootstrap;
import org.hibernate.testing.jta.TestingJtaPlatformImpl;
import org.hibernate.testing.transaction.TransactionUtil;
import org.hibernate.testing.junit4.CustomParameterized;
import org.hibernate.testing.transaction.TransactionUtil2;
import org.junit.Rule;
import org.junit.Test;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

/**
* @author Luis Barreiro
*/
@RequiresDialect( H2Dialect.class )
@RunWith(CustomParameterized.class)
public class BeforeCompletionReleaseTest extends BaseEntityManagerFunctionalTestCase {

@Parameterized.Parameters(name = "{0}")
public static List<Object[]> params() {
return Arrays.asList( new Object[][] {
{
"Setting connection handling mode from properties",
PhysicalConnectionHandlingMode.DELAYED_ACQUISITION_AND_RELEASE_BEFORE_TRANSACTION_COMPLETION,
null
},
{
"Setting connection handling mode through SessionBuilder",
PhysicalConnectionHandlingMode.DELAYED_ACQUISITION_AND_RELEASE_AFTER_STATEMENT,
PhysicalConnectionHandlingMode.DELAYED_ACQUISITION_AND_RELEASE_BEFORE_TRANSACTION_COMPLETION
}
} );
}

@Rule
public MockitoRule mockito = MockitoJUnit.rule().strictness( Strictness.STRICT_STUBS );

private final PhysicalConnectionHandlingMode connectionHandlingModeInProperties;
private final PhysicalConnectionHandlingMode connectionHandlingModeInSessionBuilder;

public BeforeCompletionReleaseTest(
String ignoredTestLabel, PhysicalConnectionHandlingMode connectionHandlingModeInProperties,
PhysicalConnectionHandlingMode connectionHandlingModeInSessionBuilder) {
this.connectionHandlingModeInProperties = connectionHandlingModeInProperties;
this.connectionHandlingModeInSessionBuilder = connectionHandlingModeInSessionBuilder;
}

@Override
protected Map getConfig() {
Map config = super.getConfig();
TestingJtaBootstrap.prepare( config );
config.put( AvailableSettings.CONNECTION_PROVIDER, new ConnectionProviderDecorator() );
config.put( AvailableSettings.CONNECTION_HANDLING, PhysicalConnectionHandlingMode.DELAYED_ACQUISITION_AND_RELEASE_BEFORE_TRANSACTION_COMPLETION );
if ( connectionHandlingModeInProperties != null ) {
config.put( AvailableSettings.CONNECTION_HANDLING, connectionHandlingModeInProperties );
}
return config;
}

Expand All @@ -56,12 +108,49 @@ protected Class<?>[] getAnnotatedClasses() {
}

@Test
public void testConnectionAcquisitionCount() {
TransactionUtil.doInJPA( this::entityManagerFactory, entityManager -> {
Thing thing = new Thing();
thing.setId( 1 );
entityManager.persist( thing );
});
@TestForIssue(jiraKey = {"HHH-13976", "HHH-14326"})
public void testResourcesReleasedThenConnectionClosedThenCommit() throws SQLException, XAException {
XAResource transactionSpy = mock( XAResource.class );
Connection[] connectionSpies = new Connection[1];
Statement statementMock = Mockito.mock( Statement.class );

try (SessionImplementor s = (SessionImplementor) openSession()) {
TransactionUtil2.inTransaction( s, session -> {
spyOnTransaction( transactionSpy );

Thing thing = new Thing();
thing.setId( 1 );
session.persist( thing );

LogicalConnectionImplementor logicalConnection = session.getJdbcCoordinator().getLogicalConnection();
logicalConnection.getResourceRegistry().register( statementMock, true );
connectionSpies[0] = logicalConnection.getPhysicalConnection();
} );
}

Connection connectionSpy = connectionSpies[0];

// Must close the resources, then the connection, then commit
InOrder inOrder = inOrder( statementMock, connectionSpy, transactionSpy );
inOrder.verify( statementMock ).close();
inOrder.verify( connectionSpy ).close();
inOrder.verify( transactionSpy ).commit( any(), anyBoolean() );
}
yrodiere marked this conversation as resolved.
Show resolved Hide resolved

private void spyOnTransaction(XAResource xaResource) {
try {
TestingJtaPlatformImpl.transactionManager().getTransaction().enlistResource( xaResource );
}
catch (RollbackException | SystemException e) {
throw new IllegalStateException( e );
}
}

private Session openSession() {
return connectionHandlingModeInSessionBuilder == null
? entityManagerFactory().openSession()
: entityManagerFactory().withOptions().connectionHandlingMode( connectionHandlingModeInSessionBuilder )
.openSession();
}

// --- //
Expand Down Expand Up @@ -94,63 +183,7 @@ public ConnectionProviderDecorator() {

@Override
public Connection getConnection() throws SQLException {
Connection connection = dataSource.getConnection();

try {
Transaction tx = TestingJtaPlatformImpl.transactionManager().getTransaction();
if ( tx != null) {
tx.enlistResource( new XAResource() {

@Override public void commit(Xid xid, boolean onePhase) {
try {
assertTrue( "Connection should be closed prior to commit", connection.isClosed() );
} catch ( SQLException e ) {
fail( "Unexpected SQLException: " + e.getMessage() );
}
}

@Override public void end(Xid xid, int flags) {
}

@Override public void forget(Xid xid) {
}

@Override public int getTransactionTimeout() {
return 0;
}

@Override public boolean isSameRM(XAResource xares) {
return false;
}

@Override public int prepare(Xid xid) {
return 0;
}

@Override public Xid[] recover(int flag) {
return new Xid[0];
}

@Override public void rollback(Xid xid) {
try {
assertTrue( "Connection should be closed prior to rollback", connection.isClosed() );
} catch ( SQLException e ) {
fail( "Unexpected SQLException: " + e.getMessage() );
}
}

@Override public boolean setTransactionTimeout(int seconds) {
return false;
}

@Override public void start(Xid xid, int flags) {
}
});
}
} catch ( SystemException | RollbackException e ) {
fail( e.getMessage() );
}
return connection;
return spy( dataSource.getConnection() );
}

@Override
Expand All @@ -159,3 +192,4 @@ public void closeConnection(Connection connection) throws SQLException {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
* The key of a JIRA issue tested.
* @return The jira issue key
*/
String jiraKey();
String[] jiraKey();
}