diff --git a/.github/workflows/build-timestamped-master.yml b/.github/workflows/build-timestamped-master.yml index 1ee496b..3726589 100644 --- a/.github/workflows/build-timestamped-master.yml +++ b/.github/workflows/build-timestamped-master.yml @@ -39,7 +39,7 @@ jobs: - name: Generate Codecov Report uses: codecov/codecov-action@v2 - name: Upload Artifact - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: ballerinai-transaction path: target/ballerinai-transaction/ diff --git a/.github/workflows/build-with-bal-test-graalvm.yml b/.github/workflows/build-with-bal-test-graalvm.yml index 18da987..4a318fa 100644 --- a/.github/workflows/build-with-bal-test-graalvm.yml +++ b/.github/workflows/build-with-bal-test-graalvm.yml @@ -6,7 +6,7 @@ on: lang_tag: description: Branch/Release Tag of the Ballerina Lang required: true - default: master + default: transaction-recovery lang_version: description: Ballerina Lang Version (If given ballerina lang buid will be skipped) required: false diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index c0f4cda..5308417 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -25,7 +25,7 @@ jobs: packagePAT: ${{ secrets.GITHUB_TOKEN }} run: ./gradlew build --scan --no-daemon - name: Archive Error Log - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 if: failure() with: name: Ballerina Internal Log diff --git a/gradle.properties b/gradle.properties index 21fdaf1..df28e4e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ puppycrawlCheckstyleVersion=10.12.0 group=org.ballerinalang version=1.10.1-SNAPSHOT -ballerinaLangVersion=2201.10.0 +ballerinaLangVersion=2201.11.0-20241011-144500-86ae40bf stdlibIoVersion=1.6.1 stdlibConstraintVersion=1.5.0 stdlibOsVersion=1.8.0 diff --git a/transaction-ballerina/2pc_transaction.bal b/transaction-ballerina/2pc_transaction.bal index 322026f..a0d5bb3 100644 --- a/transaction-ballerina/2pc_transaction.bal +++ b/transaction-ballerina/2pc_transaction.bal @@ -44,18 +44,28 @@ class TwoPhaseCommitTransaction { string|lang_trx:Error ret = ""; // Prepare local resource managers + writeToLog(transactionId, transactionBlockId, STATE_PREPARING); boolean localPrepareSuccessful = prepareResourceManagers(self.transactionId, self.transactionBlockId); if (!localPrepareSuccessful) { log:printInfo("Local prepare failed, aborting.."); + writeToLog(transactionId, transactionBlockId, STATE_ABORTING); var result = self.notifyParticipants(COMMAND_ABORT, ()); if (result is error) { + writeToLog(transactionId, transactionBlockId, STATE_HAZARD); return "hazard"; } else { match result { - "committed" => { return "committed"; } - "aborted" => { return "aborted"; } + "committed" => { + writeToLog(transactionId, transactionBlockId, STATE_COMMITTED); + return "committed"; + } + "aborted" => { + writeToLog(transactionId, transactionBlockId, STATE_ABORTED); + return "aborted"; + } } } + writeToLog(transactionId, transactionBlockId, STATE_ABORTED); return "aborted"; } @@ -68,6 +78,7 @@ class TwoPhaseCommitTransaction { if (prepareDurablesDecision == PREPARE_DECISION_COMMIT) { // If all durable participants voted YES (PREPARED or READONLY), next call notify(commit) on all // (durable & volatile) participants and return committed to the initiator + writeToLog(transactionId, transactionBlockId, STATE_COMMITTING); var result = self.notifyParticipants(COMMAND_COMMIT, ()); if (result is error) { // return Hazard outcome if a participant cannot successfully end its branch of the transaction @@ -76,26 +87,33 @@ class TwoPhaseCommitTransaction { boolean localCommitSuccessful = commitResourceManagers(self.transactionId, self.transactionBlockId); if (!localCommitSuccessful) { // "Local commit failed" + writeToLog(transactionId, transactionBlockId, STATE_HAZARD); ret = prepareError(OUTCOME_HAZARD); } else { + writeToLog(transactionId, transactionBlockId, STATE_COMMITTED); ret = OUTCOME_COMMITTED; } } } else { // If some durable participants voted NO, next call notify(abort) on all participants // and return aborted to the initiator + writeToLog(transactionId, transactionBlockId, STATE_ABORTING); var result = self.notifyParticipants(COMMAND_ABORT, ()); if (result is error) { // return Hazard outcome if a participant cannot successfully end its branch of the transaction + writeToLog(transactionId, transactionBlockId, STATE_HAZARD); ret = prepareError(OUTCOME_HAZARD); } else { boolean localAbortSuccessful = abortResourceManagers(self.transactionId, self.transactionBlockId); if (!localAbortSuccessful) { + writeToLog(transactionId, transactionBlockId, STATE_HAZARD); ret = prepareError(OUTCOME_HAZARD); } else { if (self.possibleMixedOutcome) { + writeToLog(transactionId, transactionBlockId, STATE_MIXED); ret = OUTCOME_MIXED; } else { + writeToLog(transactionId, transactionBlockId, STATE_ABORTED); ret = OUTCOME_ABORTED; } } @@ -104,18 +122,23 @@ class TwoPhaseCommitTransaction { } else { // If some volatile participants voted NO, next call notify(abort) on all volatile articipants // and return aborted to the initiator + writeToLog(transactionId, transactionBlockId, STATE_ABORTING); var result = self.notifyParticipants(COMMAND_ABORT, PROTOCOL_VOLATILE); if (result is error) { // return Hazard outcome if a participant cannot successfully end its branch of the transaction + writeToLog(transactionId, transactionBlockId, STATE_HAZARD); ret = prepareError(OUTCOME_HAZARD); } else { boolean localAbortSuccessful = abortResourceManagers(self.transactionId, self.transactionBlockId); if (!localAbortSuccessful) { + writeToLog(transactionId, transactionBlockId, STATE_HAZARD); ret = prepareError(OUTCOME_HAZARD); } else { if (self.possibleMixedOutcome) { + writeToLog(transactionId, transactionBlockId, STATE_MIXED); ret = OUTCOME_MIXED; } else { + writeToLog(transactionId, transactionBlockId, STATE_ABORTED); ret = OUTCOME_ABORTED; } } @@ -158,8 +181,8 @@ class TwoPhaseCommitTransaction { while (i < participantArr.length()) { var participant = participantArr[i]; i += 1; - //TODO: commenting due to a caching issue - //foreach var participant in self.participants { + //TODO: commenting due to a caching issue + //foreach var participant in self.participants { future<[(PrepareResult|error)?, Participant]> f = @strand{thread:"any"} start participant.prepare(protocol); results[results.length()] = f; } @@ -182,7 +205,7 @@ class TwoPhaseCommitTransaction { string participantId = participant.participantId; if (result is PrepareResult) { if (result == PREPARE_RESULT_PREPARED) { - // All set for a PREPARE_DECISION_COMMIT so we can proceed without doing anything + // All set for a PREPARE_DECISION_COMMIT so we can proceed without doing anything } else if (result == PREPARE_RESULT_COMMITTED) { // If one or more participants returns "committed" and the overall prepare fails, we have to // report a mixed-outcome to the initiator @@ -192,14 +215,14 @@ class TwoPhaseCommitTransaction { self.removeParticipant(participantId, "Could not remove committed participant: " + participantId + " from transaction: " + self.transactionId); - // All set for a PREPARE_DECISION_COMMIT so we can proceed without doing anything + // All set for a PREPARE_DECISION_COMMIT so we can proceed without doing anything } else if (result == PREPARE_RESULT_READ_ONLY) { // Don't send notify to this participant because it is read-only. // We can forget about this participant. self.removeParticipant(participantId, "Could not remove read-only participant: " + participantId + " from transaction: " + self.transactionId); - // All set for a PREPARE_DECISION_COMMIT so we can proceed without doing anything + // All set for a PREPARE_DECISION_COMMIT so we can proceed without doing anything } else if (result == PREPARE_RESULT_ABORTED) { // Remove the participant who sent the abort since we don't want to do a notify(Abort) to that // participant @@ -226,8 +249,8 @@ class TwoPhaseCommitTransaction { while (i < participantArr.length()) { var participant = participantArr[i]; i += 1; - //TODO: commenting due to a caching issue - //foreach var participant in self.participants { + //TODO: commenting due to a caching issue + //foreach var participant in self.participants { future<(NotifyResult|error)?> f = @strand{thread:"any"} start participant.notify(action, protocolName); results[results.length()] = f; } @@ -235,8 +258,8 @@ class TwoPhaseCommitTransaction { while (j < results.length()) { var r = results[j]; j += 1; - //TODO: commenting due to a caching issue - //foreach var r in results { + //TODO: commenting due to a caching issue + //foreach var r in results { future<(NotifyResult|error)?> f; if (r is future<(NotifyResult|error)?>) { f = r; @@ -256,19 +279,24 @@ class TwoPhaseCommitTransaction { function abortInitiatorTransaction() returns string|lang_trx:Error { log:printInfo("Aborting initiated transaction: " + self.transactionId + ":" + self.transactionBlockId); string|lang_trx:Error ret = ""; + writeToLog(self.transactionId, self.transactionBlockId, STATE_ABORTING); // return response to the initiator. ( Aborted | Mixed ) var result = self.notifyParticipants(COMMAND_ABORT, ()); if (result is error) { // return Hazard outcome if a participant cannot successfully end its branch of the transaction + writeToLog(self.transactionId, self.transactionBlockId, STATE_HAZARD); ret = prepareError(OUTCOME_HAZARD); } else { boolean localAbortSuccessful = abortResourceManagers(self.transactionId, self.transactionBlockId); if (!localAbortSuccessful) { + writeToLog(self.transactionId, self.transactionBlockId, STATE_HAZARD); ret = prepareError(OUTCOME_HAZARD); } else { if (self.possibleMixedOutcome) { + writeToLog(self.transactionId, self.transactionBlockId, STATE_MIXED); ret = OUTCOME_MIXED; } else { + writeToLog(self.transactionId, self.transactionBlockId, STATE_ABORTED); ret = OUTCOME_ABORTED; } } diff --git a/transaction-ballerina/Ballerina.toml b/transaction-ballerina/Ballerina.toml index 0179678..c9a9737 100644 --- a/transaction-ballerina/Ballerina.toml +++ b/transaction-ballerina/Ballerina.toml @@ -9,6 +9,6 @@ graalvmCompatible = true [[platform.java17.dependency]] artifactId = "transaction" -version = "1.10.0" -path = "../transaction-native/build/libs/transaction-native-1.10.0.jar" +version = "1.11.0-SNAPSHOT" +path = "../transaction-native/build/libs/transaction-native-1.11.0-SNAPSHOT.jar" groupId = "ballerina" diff --git a/transaction-ballerina/Dependencies.toml b/transaction-ballerina/Dependencies.toml index 3f16aa3..3afa15d 100644 --- a/transaction-ballerina/Dependencies.toml +++ b/transaction-ballerina/Dependencies.toml @@ -5,7 +5,7 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.10.0" +distribution-version = "2201.11.0-SNAPSHOT" [[package]] org = "ballerina" @@ -64,7 +64,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.12.0" +version = "2.12.1" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "cache"}, @@ -315,7 +315,7 @@ modules = [ [[package]] org = "ballerina" name = "time" -version = "2.4.0" +version = "2.5.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] diff --git a/transaction-ballerina/build.gradle b/transaction-ballerina/build.gradle index 51b5e3e..dbd8834 100644 --- a/transaction-ballerina/build.gradle +++ b/transaction-ballerina/build.gradle @@ -89,11 +89,16 @@ publishing { } } +task cleanUpLogsBeforeTest { + delete "$project.projectDir/target/transactionLogs" +} + updateTomlFiles.dependsOn copyStdlibs build.dependsOn ":transaction-native:build" build.dependsOn ":transaction-ballerina:generatePomFileForMavenPublication" test.dependsOn ":transaction-native:build" +test.dependsOn ":transaction-ballerina:cleanUpLogsBeforeTest" publishToMavenLocal.dependsOn build publish.dependsOn build diff --git a/transaction-ballerina/constants.bal b/transaction-ballerina/constants.bal index 7a9ff7b..7ab5c89 100644 --- a/transaction-ballerina/constants.bal +++ b/transaction-ballerina/constants.bal @@ -46,5 +46,15 @@ const string OUTCOME_HAZARD = "hazard"; const string TRANSACTION_UNKNOWN = "Transaction-Unknown"; +// Recovery Log States (Write-Ahead Logging) +const string STATE_PREPARING = "PREPARING"; +const string STATE_COMMITTING = "COMMITTING"; +const string STATE_ABORTING = "ABORTING"; +const string STATE_COMMITTED = "COMMITTED"; +const string STATE_ABORTED = "ABORTED"; +const string STATE_HAZARD = "HAZARD"; +const string STATE_MIXED = "MIXED"; +const string STATE_TERMINATED = "TERMINATED"; + configurable string coordinatorHost = getHostAddress(); configurable int coordinatorPort = getAvailablePort(); diff --git a/transaction-ballerina/internal.bal b/transaction-ballerina/internal.bal index 6118a0a..fa80cd6 100644 --- a/transaction-ballerina/internal.bal +++ b/transaction-ballerina/internal.bal @@ -225,3 +225,13 @@ function getTransactionCleanupTimeout() returns int = @java:Method { 'class: "org.ballerinalang.stdlib.transaction.Utils", name: "getTransactionCleanupTimeout" } external; + +function writeToLog(string trxId, string transactionBlockId, string transactionStatus) = @java:Method { + 'class: "org.ballerinalang.stdlib.transaction.TransactionRecovery", + name: "writeToLog" +} external; + +function startupCrashRecovery() = @java:Method { + 'class: "org.ballerinalang.stdlib.transaction.TransactionRecovery", + name: "startupCrashRecovery" +} external; diff --git a/transaction-ballerina/tests/Config.toml b/transaction-ballerina/tests/Config.toml index e69de29..b73ae57 100644 --- a/transaction-ballerina/tests/Config.toml +++ b/transaction-ballerina/tests/Config.toml @@ -0,0 +1,4 @@ +[ballerina.lang.transaction] +recoveryLogDir="./target/transactionLogs" +recoveryLogName="testLogs" +checkpointInterval=-1 diff --git a/transaction-native/src/main/java/org/ballerinalang/stdlib/transaction/TransactionRecovery.java b/transaction-native/src/main/java/org/ballerinalang/stdlib/transaction/TransactionRecovery.java new file mode 100644 index 0000000..26c710e --- /dev/null +++ b/transaction-native/src/main/java/org/ballerinalang/stdlib/transaction/TransactionRecovery.java @@ -0,0 +1,47 @@ +// Copyright (c) 2024 WSO2 Inc. (http://www.wso2.org) +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.ballerinalang.stdlib.transaction; + +import io.ballerina.runtime.api.values.BString; +import io.ballerina.runtime.transactions.RecoveryState; +import io.ballerina.runtime.transactions.TransactionLogRecord; +import io.ballerina.runtime.transactions.TransactionResourceManager; + +public class TransactionRecovery { + + public static void writeToLog(BString trxId, BString transactionBlockId, BString transactionStatus) { + if (TransactionResourceManager.getInstance().getTransactionManagerEnabled()) { + return; + } + TransactionLogRecord logRecord = createLogRecord(trxId, transactionBlockId, transactionStatus); + TransactionResourceManager.getInstance().getLogManager().put(logRecord); + } + + private static TransactionLogRecord createLogRecord(BString trxId, BString transactionBlockId, + BString transactionStatus) { + return new TransactionLogRecord(trxId.getValue(), transactionBlockId.getValue(), + RecoveryState.valueOf(transactionStatus.getValue())); + } + + public static void startupCrashRecovery() { + if (TransactionResourceManager.getInstance().getTransactionManagerEnabled()) { + // if atomikos tm is used, we skip startup crash recovery. + return; + } + TransactionResourceManager.getInstance().startupCrashRecovery(); + } +}