Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure isolation level is applied to subsequent transactions after setting #202

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS

private final MySqlConnectionMetadata metadata;

private final IsolationLevel sessionLevel;
private volatile IsolationLevel sessionLevel;

private final QueryCache queryCache;

Expand Down Expand Up @@ -303,13 +303,27 @@ public IsolationLevel getTransactionIsolationLevel() {
return currentLevel;
}

/**
* Gets session transaction isolation level(Only for testing).
*
* @return session transaction isolation level.
*/
IsolationLevel getSessionTransactionIsolationLevel() {
return sessionLevel;
}

@Override
public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
requireNonNull(isolationLevel, "isolationLevel must not be null");

// Set next transaction isolation level.
return QueryFlow.executeVoid(client, "SET TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql())
.doOnSuccess(ignored -> setIsolationLevel(isolationLevel));
// Set subsequent transaction isolation level.
return QueryFlow.executeVoid(client, "SET SESSION TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql())
.doOnSuccess(ignored -> {
this.sessionLevel = isolationLevel;
if (!this.isInTransaction()) {
mirromutth marked this conversation as resolved.
Show resolved Hide resolved
this.currentLevel = isolationLevel;
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.asyncer.r2dbc.mysql.ConnectionContext;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.Objects;
import reactor.core.publisher.Mono;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -67,6 +68,23 @@ public Mono<ByteBuf> encode(ByteBufAllocator allocator, ConnectionContext contex
});
}

@Override
JohnNiang marked this conversation as resolved.
Show resolved Hide resolved
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TextQueryMessage that = (TextQueryMessage) o;
return Objects.equals(sql, that.sql);
}

@Override
public int hashCode() {
return Objects.hash(sql);
}

@Override
public String toString() {
return "TextQueryMessage{sql=REDACTED}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,51 @@ void transactionDefinitionIsolationLevel() {
}));
}

@Test
JohnNiang marked this conversation as resolved.
Show resolved Hide resolved
void setTransactionLevelNotInTransaction() {
complete(connection ->
// check initial session isolation level
Mono.fromSupplier(connection::getTransactionIsolationLevel)
.doOnSuccess(it -> assertThat(it).isEqualTo(REPEATABLE_READ))
.then(connection.beginTransaction())
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue())
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isEqualTo(REPEATABLE_READ))
.then(connection.rollbackTransaction())
.then(connection.setTransactionIsolationLevel(READ_COMMITTED))
// ensure that session isolation level is changed
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isEqualTo(READ_COMMITTED))
.then(connection.beginTransaction())
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue())
// ensure transaction isolation level applies to subsequent transactions
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isEqualTo(READ_COMMITTED))
);
}

@Test
void setTransactionLevelInTransaction() {
complete(connection ->
// check initial session transaction isolation level
Mono.fromSupplier(connection::getTransactionIsolationLevel)
.doOnSuccess(it -> assertThat(it).isEqualTo(REPEATABLE_READ))
.then(connection.beginTransaction())
.then(connection.setTransactionIsolationLevel(READ_COMMITTED))
// ensure that current transaction isolation level is not changed
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isNotEqualTo(READ_COMMITTED))
.then(connection.rollbackTransaction())
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse())
// ensure that session isolation level is changed after rollback
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isEqualTo(READ_COMMITTED))
// ensure transaction isolation level applies to subsequent transactions
.then(connection.beginTransaction())
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue())
);
}

@Test
void transactionDefinition() {
// The WITH CONSISTENT SNAPSHOT phrase can only be used with the REPEATABLE READ isolation level.
Expand Down
19 changes: 19 additions & 0 deletions src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@
import io.asyncer.r2dbc.mysql.cache.Caches;
import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.codec.Codecs;
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
import io.asyncer.r2dbc.mysql.message.client.TextQueryMessage;
import io.r2dbc.spi.IsolationLevel;
import org.assertj.core.api.ThrowableTypeAssert;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Unit tests for {@link MySqlConnection}.
Expand Down Expand Up @@ -125,6 +132,18 @@ void badSetTransactionIsolationLevel() {
assertThatIllegalArgumentException().isThrownBy(() -> noPrepare.setTransactionIsolationLevel(null));
}

@Test
void shouldSetTransactionIsolationLevelSuccessfully() {
ClientMessage message = new TextQueryMessage("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE");
when(client.exchange(eq(message), any())).thenReturn(Flux.empty());

noPrepare.setTransactionIsolationLevel(IsolationLevel.SERIALIZABLE)
.as(StepVerifier::create)
.verifyComplete();

assertThat(noPrepare.getSessionTransactionIsolationLevel()).isEqualTo(IsolationLevel.SERIALIZABLE);
}

@SuppressWarnings("ConstantConditions")
@Test
void badValidate() {
Expand Down