Skip to content

Commit

Permalink
Ensure isolation level is applied to subsequent transactions after se…
Browse files Browse the repository at this point in the history
…tting
  • Loading branch information
JohnNiang committed Jan 16, 2024
1 parent 96c11dc commit 919058b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 4 deletions.
17 changes: 13 additions & 4 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS

private final MySqlConnectionMetadata metadata;

private final IsolationLevel sessionLevel;
private volatile IsolationLevel sessionLevel;

private final QueryCache queryCache;

Expand Down Expand Up @@ -303,13 +303,22 @@ public IsolationLevel getTransactionIsolationLevel() {
return currentLevel;
}

/**
* Gets session transaction isolation level(Only for testing).
*
* @return session transaction isolation level.
*/
IsolationLevel getSessionTransactionIsolationLevel() {
return sessionLevel;
}

@Override
public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
requireNonNull(isolationLevel, "isolationLevel must not be null");

// Set next transaction isolation level.
return QueryFlow.executeVoid(client, "SET TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql())
.doOnSuccess(ignored -> setIsolationLevel(isolationLevel));
// Set subsequent transaction isolation level.
return QueryFlow.executeVoid(client, "SET SESSION TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql())
.doOnSuccess(ignored -> this.sessionLevel = isolationLevel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.asyncer.r2dbc.mysql.ConnectionContext;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.Objects;
import reactor.core.publisher.Mono;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -67,6 +68,23 @@ public Mono<ByteBuf> encode(ByteBufAllocator allocator, ConnectionContext contex
});
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TextQueryMessage that = (TextQueryMessage) o;
return Objects.equals(sql, that.sql);
}

@Override
public int hashCode() {
return Objects.hash(sql);
}

@Override
public String toString() {
return "TextQueryMessage{sql=REDACTED}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,31 @@ void transactionDefinitionIsolationLevel() {
}));
}

@Test
void shouldApplyIsolationLevelForSubsequentTransactions() {
complete(connection -> {
Mono<Void> setTransactionIsolationLevel =
connection.setTransactionIsolationLevel(READ_COMMITTED);

Mono<Void> beginTransaction =
connection.beginTransaction(MySqlTransactionDefinition.builder()
.isolationLevel(SERIALIZABLE)
.build());

Mono<Void> rollbackTransaction = connection.rollbackTransaction();

return setTransactionIsolationLevel
.then(beginTransaction)
.doOnSuccess(ignored ->
assertThat(connection.getTransactionIsolationLevel()).isEqualTo(SERIALIZABLE)
)
.then(rollbackTransaction)
.doOnSuccess(ignored ->
assertThat(connection.getTransactionIsolationLevel()).isEqualTo(READ_COMMITTED)
);
});
}

@Test
void transactionDefinition() {
// The WITH CONSISTENT SNAPSHOT phrase can only be used with the REPEATABLE READ isolation level.
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@
import io.asyncer.r2dbc.mysql.cache.Caches;
import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.codec.Codecs;
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
import io.asyncer.r2dbc.mysql.message.client.TextQueryMessage;
import io.r2dbc.spi.IsolationLevel;
import org.assertj.core.api.ThrowableTypeAssert;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Unit tests for {@link MySqlConnection}.
Expand Down Expand Up @@ -125,6 +133,18 @@ void badSetTransactionIsolationLevel() {
assertThatIllegalArgumentException().isThrownBy(() -> noPrepare.setTransactionIsolationLevel(null));
}

@Test
void shouldSetTransactionIsolationLevelSuccessfully() {
ClientMessage message = new TextQueryMessage("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE");
when(client.exchange(eq(message), any())).thenReturn(Flux.empty());

noPrepare.setTransactionIsolationLevel(IsolationLevel.SERIALIZABLE)
.as(StepVerifier::create)
.verifyComplete();

assertThat(noPrepare.getSessionTransactionIsolationLevel()).isEqualTo(IsolationLevel.SERIALIZABLE);
}

@SuppressWarnings("ConstantConditions")
@Test
void badValidate() {
Expand Down

0 comments on commit 919058b

Please sign in to comment.