Skip to content

Commit

Permalink
Transaction States Should Be Checked In Queue (#185)
Browse files Browse the repository at this point in the history
Motivation:
Currently, `MySqlConnection` checks state between the `Mono.defer` is
subscribed and `Exchangeable` is executed. It may cause undefined
behavior.

Modification:
Checks transaction state when request queue executes task.

Result:
Resolves #183
  • Loading branch information
jchrys authored Jan 10, 2024
1 parent a605469 commit aa4400b
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 102 deletions.
7 changes: 7 additions & 0 deletions src/main/java/io/asyncer/r2dbc/mysql/ConnectionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ interface ConnectionState {
*/
void setIsolationLevel(IsolationLevel level);

/**
* Reutrns session lock wait timeout.
*
* @return Session lock wait timeout.
*/
long getSessionLockWaitTimeout();

/**
* Sets current lock wait timeout.
*
Expand Down
23 changes: 8 additions & 15 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public Mono<Void> close() {
@Override
public Mono<Void> commitTransaction() {
return Mono.defer(() -> {
return QueryFlow.doneTransaction(client, this, true, lockWaitTimeout, batchSupported);
return QueryFlow.doneTransaction(client, this, true, batchSupported);
});
}

Expand All @@ -223,19 +223,7 @@ public MySqlBatch createBatch() {
@Override
public Mono<Void> createSavepoint(String name) {
requireValidName(name, "Savepoint name must not be empty and not contain backticks");

String sql = String.format("SAVEPOINT `%s`", name);

return Mono.defer(() -> {
if (isInTransaction()) {
return QueryFlow.executeVoid(client, sql);
} else if (batchSupported) {
// If connection does not in transaction, then starts transaction.
return QueryFlow.executeVoid(client, "BEGIN;" + sql);
}

return QueryFlow.executeVoid(client, "BEGIN", sql);
});
return QueryFlow.createSavepoint(client, this, name, batchSupported);
}

@Override
Expand Down Expand Up @@ -286,7 +274,7 @@ public Mono<Void> releaseSavepoint(String name) {
@Override
public Mono<Void> rollbackTransaction() {
return Mono.defer(() -> {
return QueryFlow.doneTransaction(client, this, false, lockWaitTimeout, batchSupported);
return QueryFlow.doneTransaction(client, this, false, batchSupported);
});
}

Expand Down Expand Up @@ -371,6 +359,11 @@ public void setIsolationLevel(IsolationLevel level) {
this.currentLevel = level;
}

@Override
public long getSessionLockWaitTimeout() {
return lockWaitTimeout;
}

@Override
public void setCurrentLockWaitTimeout(long timeoutSeconds) {
this.currentLockWaitTimeout = timeoutSeconds;
Expand Down
Loading

0 comments on commit aa4400b

Please sign in to comment.