From f44427f617d99766c055c312fae1093747b316f6 Mon Sep 17 00:00:00 2001 From: jchrys Date: Fri, 22 Dec 2023 00:25:04 +0900 Subject: [PATCH 1/2] Enhance Transactional Methods for Improved Request Queue Handling Motivation: The current Transaction Methods implementation lacks consideration for the request queue, resulting in potential undefined behavior and issues. Modifications: Transaction Methods properly respect the request queue. Result: Bug resolved. Enhanced stability and reliability. --- .../asyncer/r2dbc/mysql/MySqlConnection.java | 16 ----- .../mysql/ConnectionIntegrationTest.java | 63 +++++++++++++++++++ 2 files changed, 63 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index 92ea0aa56..7adc00fd0 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -185,10 +185,6 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS @Override public Mono beginTransaction() { return Mono.defer(() -> { - if (isInTransaction()) { - return Mono.empty(); - } - return QueryFlow.executeVoid(client, "BEGIN"); }); } @@ -196,10 +192,6 @@ public Mono beginTransaction() { @Override public Mono beginTransaction(TransactionDefinition definition) { return Mono.defer(() -> { - if (isInTransaction()) { - return Mono.empty(); - } - return QueryFlow.beginTransaction(client, this, batchSupported, definition); }); } @@ -219,10 +211,6 @@ public Mono close() { @Override public Mono commitTransaction() { return Mono.defer(() -> { - if (!isInTransaction()) { - return Mono.empty(); - } - return QueryFlow.doneTransaction(client, this, true, lockWaitTimeout, batchSupported); }); } @@ -300,10 +288,6 @@ public Mono releaseSavepoint(String name) { @Override public Mono rollbackTransaction() { return Mono.defer(() -> { - if (!isInTransaction()) { - return Mono.empty(); - } - return QueryFlow.doneTransaction(client, this, false, lockWaitTimeout, batchSupported); }); } diff --git a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java index cb51f38ee..5063c29f3 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java @@ -292,6 +292,69 @@ void errorPropagteRequestQueue() { ); } + @Test + void commitTransactionShouldRespectQueuedMessages() { + final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))"; + complete(connection -> + Mono.from(connection.createStatement(tdl).execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .thenMany(Flux.merge( + connection.beginTransaction(), + connection.createStatement("INSERT INTO test VALUES (1, 'test1')") + .execute(), + connection.commitTransaction() + )) + .doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse()) + .thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute()) + .flatMap(result -> + Mono.from(result.map((row, metadata) -> row.get(0, Long.class))) + ) + .doOnNext(text -> assertThat(text).isEqualTo(1L)) + ); + } + + @Test + void rollbackTransactionShouldRespectQueuedMessages() { + final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))"; + complete(connection -> + Mono.from(connection.createStatement(tdl).execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .thenMany(Flux.merge( + connection.beginTransaction(), + connection.createStatement("INSERT INTO test VALUES (1, 'test1')") + .execute(), + connection.rollbackTransaction() + )) + .doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse()) + .thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute()) + .flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class))) + .doOnNext(count -> assertThat(count).isEqualTo(0L))) + ); + } + + @Test + void beginTransactionShouldRespectQueuedMessages() { + final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))"; + complete(connection -> + Mono.from(connection.createStatement(tdl).execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .then(Mono.from(connection.beginTransaction())) + .doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue()) + .thenMany(Flux.merge( + connection.createStatement("INSERT INTO test VALUES (1, 'test1')").execute(), + connection.commitTransaction(), + connection.beginTransaction() + )) + .doOnComplete(() -> assertThat(connection.isInTransaction()).isTrue()) + .then(Mono.from(connection.rollbackTransaction())) + .doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse()) + .thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute()) + .flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class))) + .doOnNext(count -> assertThat(count).isEqualTo(1L))) + ); + + } + @Test void batchCrud() { // TODO: spilt it to multiple test cases and move it to BatchIntegrationTest From ad5f792b117abf6144073ee02b7406e04d3f5e73 Mon Sep 17 00:00:00 2001 From: jchrys Date: Fri, 22 Dec 2023 13:49:52 +0900 Subject: [PATCH 2/2] Transaction Methods Respect Request Queue Motivation: The current Transaction Methods implementation lacks consideration for the request queue, resulting in potential undefined behavior and issues. Modifications: Transaction Methods properly respect the request queue. Result: Bug resolved. Enhanced stability and reliability. --- src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index 7adc00fd0..f83c76c38 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -184,9 +184,7 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS @Override public Mono beginTransaction() { - return Mono.defer(() -> { - return QueryFlow.executeVoid(client, "BEGIN"); - }); + return beginTransaction(MySqlTransactionDefinition.empty()); } @Override