From 919058b801ff7d2afec8e1d91cd2e62ee74c6228 Mon Sep 17 00:00:00 2001 From: John Niang Date: Tue, 16 Jan 2024 23:07:10 +0800 Subject: [PATCH] Ensure isolation level is applied to subsequent transactions after setting --- .../asyncer/r2dbc/mysql/MySqlConnection.java | 17 ++++++++++--- .../message/client/TextQueryMessage.java | 18 +++++++++++++ .../mysql/ConnectionIntegrationTest.java | 25 +++++++++++++++++++ .../r2dbc/mysql/MySqlConnectionTest.java | 20 +++++++++++++++ 4 files changed, 76 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index b24f7edb3..4c1f2cc07 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -129,7 +129,7 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS private final MySqlConnectionMetadata metadata; - private final IsolationLevel sessionLevel; + private volatile IsolationLevel sessionLevel; private final QueryCache queryCache; @@ -303,13 +303,22 @@ public IsolationLevel getTransactionIsolationLevel() { return currentLevel; } + /** + * Gets session transaction isolation level(Only for testing). + * + * @return session transaction isolation level. + */ + IsolationLevel getSessionTransactionIsolationLevel() { + return sessionLevel; + } + @Override public Mono setTransactionIsolationLevel(IsolationLevel isolationLevel) { requireNonNull(isolationLevel, "isolationLevel must not be null"); - // Set next transaction isolation level. - return QueryFlow.executeVoid(client, "SET TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql()) - .doOnSuccess(ignored -> setIsolationLevel(isolationLevel)); + // Set subsequent transaction isolation level. + return QueryFlow.executeVoid(client, "SET SESSION TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql()) + .doOnSuccess(ignored -> this.sessionLevel = isolationLevel); } @Override diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/TextQueryMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/TextQueryMessage.java index e1646aa90..382969f00 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/TextQueryMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/TextQueryMessage.java @@ -19,6 +19,7 @@ import io.asyncer.r2dbc.mysql.ConnectionContext; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import java.util.Objects; import reactor.core.publisher.Mono; import java.nio.charset.Charset; @@ -67,6 +68,23 @@ public Mono encode(ByteBufAllocator allocator, ConnectionContext contex }); } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TextQueryMessage that = (TextQueryMessage) o; + return Objects.equals(sql, that.sql); + } + + @Override + public int hashCode() { + return Objects.hash(sql); + } + @Override public String toString() { return "TextQueryMessage{sql=REDACTED}"; diff --git a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java index 075604031..c5e2c12cd 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java @@ -139,6 +139,31 @@ void transactionDefinitionIsolationLevel() { })); } + @Test + void shouldApplyIsolationLevelForSubsequentTransactions() { + complete(connection -> { + Mono setTransactionIsolationLevel = + connection.setTransactionIsolationLevel(READ_COMMITTED); + + Mono beginTransaction = + connection.beginTransaction(MySqlTransactionDefinition.builder() + .isolationLevel(SERIALIZABLE) + .build()); + + Mono rollbackTransaction = connection.rollbackTransaction(); + + return setTransactionIsolationLevel + .then(beginTransaction) + .doOnSuccess(ignored -> + assertThat(connection.getTransactionIsolationLevel()).isEqualTo(SERIALIZABLE) + ) + .then(rollbackTransaction) + .doOnSuccess(ignored -> + assertThat(connection.getTransactionIsolationLevel()).isEqualTo(READ_COMMITTED) + ); + }); + } + @Test void transactionDefinition() { // The WITH CONSISTENT SNAPSHOT phrase can only be used with the REPEATABLE READ isolation level. diff --git a/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionTest.java b/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionTest.java index b82f1154d..b201db0df 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionTest.java @@ -19,13 +19,21 @@ import io.asyncer.r2dbc.mysql.cache.Caches; import io.asyncer.r2dbc.mysql.client.Client; import io.asyncer.r2dbc.mysql.codec.Codecs; +import io.asyncer.r2dbc.mysql.message.client.ClientMessage; +import io.asyncer.r2dbc.mysql.message.client.TextQueryMessage; import io.r2dbc.spi.IsolationLevel; import org.assertj.core.api.ThrowableTypeAssert; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Unit tests for {@link MySqlConnection}. @@ -125,6 +133,18 @@ void badSetTransactionIsolationLevel() { assertThatIllegalArgumentException().isThrownBy(() -> noPrepare.setTransactionIsolationLevel(null)); } + @Test + void shouldSetTransactionIsolationLevelSuccessfully() { + ClientMessage message = new TextQueryMessage("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE"); + when(client.exchange(eq(message), any())).thenReturn(Flux.empty()); + + noPrepare.setTransactionIsolationLevel(IsolationLevel.SERIALIZABLE) + .as(StepVerifier::create) + .verifyComplete(); + + assertThat(noPrepare.getSessionTransactionIsolationLevel()).isEqualTo(IsolationLevel.SERIALIZABLE); + } + @SuppressWarnings("ConstantConditions") @Test void badValidate() {