diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index 92ea0aa56..f83c76c38 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -184,22 +184,12 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS @Override public Mono beginTransaction() { - return Mono.defer(() -> { - if (isInTransaction()) { - return Mono.empty(); - } - - return QueryFlow.executeVoid(client, "BEGIN"); - }); + return beginTransaction(MySqlTransactionDefinition.empty()); } @Override public Mono beginTransaction(TransactionDefinition definition) { return Mono.defer(() -> { - if (isInTransaction()) { - return Mono.empty(); - } - return QueryFlow.beginTransaction(client, this, batchSupported, definition); }); } @@ -219,10 +209,6 @@ public Mono close() { @Override public Mono commitTransaction() { return Mono.defer(() -> { - if (!isInTransaction()) { - return Mono.empty(); - } - return QueryFlow.doneTransaction(client, this, true, lockWaitTimeout, batchSupported); }); } @@ -300,10 +286,6 @@ public Mono releaseSavepoint(String name) { @Override public Mono rollbackTransaction() { return Mono.defer(() -> { - if (!isInTransaction()) { - return Mono.empty(); - } - return QueryFlow.doneTransaction(client, this, false, lockWaitTimeout, batchSupported); }); } diff --git a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java index cb51f38ee..5063c29f3 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java @@ -292,6 +292,69 @@ void errorPropagteRequestQueue() { ); } + @Test + void commitTransactionShouldRespectQueuedMessages() { + final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))"; + complete(connection -> + Mono.from(connection.createStatement(tdl).execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .thenMany(Flux.merge( + connection.beginTransaction(), + connection.createStatement("INSERT INTO test VALUES (1, 'test1')") + .execute(), + connection.commitTransaction() + )) + .doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse()) + .thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute()) + .flatMap(result -> + Mono.from(result.map((row, metadata) -> row.get(0, Long.class))) + ) + .doOnNext(text -> assertThat(text).isEqualTo(1L)) + ); + } + + @Test + void rollbackTransactionShouldRespectQueuedMessages() { + final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))"; + complete(connection -> + Mono.from(connection.createStatement(tdl).execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .thenMany(Flux.merge( + connection.beginTransaction(), + connection.createStatement("INSERT INTO test VALUES (1, 'test1')") + .execute(), + connection.rollbackTransaction() + )) + .doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse()) + .thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute()) + .flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class))) + .doOnNext(count -> assertThat(count).isEqualTo(0L))) + ); + } + + @Test + void beginTransactionShouldRespectQueuedMessages() { + final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))"; + complete(connection -> + Mono.from(connection.createStatement(tdl).execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .then(Mono.from(connection.beginTransaction())) + .doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue()) + .thenMany(Flux.merge( + connection.createStatement("INSERT INTO test VALUES (1, 'test1')").execute(), + connection.commitTransaction(), + connection.beginTransaction() + )) + .doOnComplete(() -> assertThat(connection.isInTransaction()).isTrue()) + .then(Mono.from(connection.rollbackTransaction())) + .doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse()) + .thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute()) + .flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class))) + .doOnNext(count -> assertThat(count).isEqualTo(1L))) + ); + + } + @Test void batchCrud() { // TODO: spilt it to multiple test cases and move it to BatchIntegrationTest