Skip to content

Commit

Permalink
feat: transaction metric (#116)
Browse files Browse the repository at this point in the history
* feat: transaction metric

* fix: fix golint issue

* chore: adjust mr comment

* feat: add timeout metric
  • Loading branch information
fatelei authored May 31, 2022
1 parent bf3b084 commit 0ae8c77
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 12 deletions.
1 change: 0 additions & 1 deletion cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ var (
}

dbpack := server.NewServer()

for _, listenerConf := range conf.Listeners {
switch listenerConf.ProtocolType {
case config.Mysql:
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ require (
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mgechev/revive v1.2.1 // indirect
github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
golang.org/x/sys v0.0.0-20220519141025-dcacdad47464 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/tools v0.1.10 // indirect
google.golang.org/protobuf v1.27.1
)
23 changes: 23 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tj
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4 h1:tFXjAxje9thrTF4h57Ckik+scJjTWdwAtZqZPtOT48M=
github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4/go.mod h1:W8EnPSQ8Nv4fUjc/v1/8tHFqhuOJXnRub0dTfuAQktU=
github.com/cheggaaa/pb/v3 v3.0.8/go.mod h1:UICbiLec/XO6Hw6k+BHEtHeQFzzBH4i2/qk/ow1EJTA=
github.com/cheynewallace/tabby v1.1.1/go.mod h1:Pba/6cUL8uYqvOc9RkyvFbHGrQ9wShyrn6/S/1OYVys=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down Expand Up @@ -225,7 +227,10 @@ github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA=
Expand Down Expand Up @@ -560,17 +565,24 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
Expand All @@ -580,12 +592,17 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
github.com/mgechev/dots v0.0.0-20190921121421-c36f7dcfbb81/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg=
github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517 h1:zpIH83+oKzcpryru8ceC6BxnoG8TBrhgAvRg8obzup0=
github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg=
github.com/mgechev/revive v1.0.2/go.mod h1:rb0dQy1LVAxW9SWy5R3LPUjevzUbUS316U5MFySA2lo=
github.com/mgechev/revive v1.2.1 h1:GjFml7ZsoR0IrQ2E2YIvWFNS5GPDV7xNwvA5GM1HZC4=
github.com/mgechev/revive v1.2.1/go.mod h1:+Ro3wqY4vakcYNtkBWdZC7dBg1xSB6sp054wWwmeFm0=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/minio/sio v0.3.0/go.mod h1:8b0yPp2avGThviy/+OCJBI6OMpvxoUuiLvE6F1lebhw=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
Expand Down Expand Up @@ -623,6 +640,8 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/oleiade/reflections v1.0.1/go.mod h1:rdFxbxq4QXVZWj0F+e9jqjDkc7dbp97vkRixKo2JR60=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -748,6 +767,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qq
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down Expand Up @@ -1171,12 +1191,15 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220519141025-dcacdad47464 h1:MpIuURY70f0iKp/oooEFtB2oENcHITo/z1b6u41pKCw=
golang.org/x/sys v0.0.0-20220519141025-dcacdad47464/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
58 changes: 52 additions & 6 deletions pkg/dt/distributed_transaction_manger.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cectc/dbpack/pkg/config"
"github.com/cectc/dbpack/pkg/dt/api"
"github.com/cectc/dbpack/pkg/dt/metrics"
"github.com/cectc/dbpack/pkg/dt/storage"
"github.com/cectc/dbpack/pkg/log"
"github.com/cectc/dbpack/pkg/misc"
Expand Down Expand Up @@ -57,6 +58,7 @@ func InitDistributedTransactionManager(conf *config.DistributedTransaction, stor
if conf.RetryDeadThreshold == 0 {
conf.RetryDeadThreshold = DefaultRetryDeadThreshold
}

manager = &DistributedTransactionManager{
applicationID: conf.ApplicationID,
storageDriver: storageDriver,
Expand Down Expand Up @@ -110,6 +112,7 @@ func (manager *DistributedTransactionManager) Begin(ctx context.Context, transac
if err := manager.storageDriver.AddGlobalSession(ctx, gt); err != nil {
return "", err
}
metrics.GlobalTransactionCounter.WithLabelValues(manager.applicationID, transactionName, metrics.TransactionStatusActive).Inc()
manager.globalSessionQueue.AddAfter(gt, time.Duration(timeout)*time.Millisecond)
log.Infof("successfully begin global transaction xid = {%s}", gt.XID)
return xid, nil
Expand Down Expand Up @@ -144,6 +147,7 @@ func (manager *DistributedTransactionManager) BranchRegister(ctx context.Context
if err := manager.storageDriver.AddBranchSession(ctx, bs); err != nil {
return "", 0, err
}
metrics.BranchTransactionCounter.WithLabelValues(manager.applicationID, in.ResourceID, metrics.TransactionStatusActive).Inc()
return branchID, branchSessionID, nil
}

Expand Down Expand Up @@ -220,14 +224,15 @@ func (manager *DistributedTransactionManager) _branchRollback(bs *api.BranchSess
}

func (manager *DistributedTransactionManager) processGlobalSessions() error {
globalSessions, err := manager.storageDriver.ListGlobalSession(context.Background(), manager.applicationID)
ctx := context.Background()
globalSessions, err := manager.storageDriver.ListGlobalSession(ctx, manager.applicationID)
if err != nil {
return err
}
for _, gs := range globalSessions {
if gs.Status == api.Begin {
if isGlobalSessionTimeout(gs) {
if _, err := manager.Rollback(context.Background(), gs.XID); err != nil {
if _, err := manager.Rollback(ctx, gs.XID); err != nil {
return err
}
}
Expand All @@ -244,11 +249,21 @@ func (manager *DistributedTransactionManager) processGlobalSessions() error {
if err != nil {
return err
}
// branch session has been committed or rollbacked
if len(bsKeys) == 0 {
if err := manager.storageDriver.DeleteGlobalSession(context.Background(), gs.XID); err != nil {
return err
}
log.Debugf("global session finished, key: %s", gs.XID)
switch gs.Status {
case api.Committing:
manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusCommitted)
case api.Rollbacking:
manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusRollbacked)
}
} else {
// global transaction timeout
manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusTimeout)
}
}
}
Expand Down Expand Up @@ -283,7 +298,7 @@ func (manager *DistributedTransactionManager) processNextGlobalSession(ctx conte
}
if newGlobalSession.Status == api.Begin {
if isGlobalSessionTimeout(newGlobalSession) {
_, err := manager.Rollback(context.Background(), newGlobalSession.XID)
_, err = manager.Rollback(context.Background(), newGlobalSession.XID)
if err != nil {
log.Error(err)
}
Expand All @@ -299,6 +314,15 @@ func (manager *DistributedTransactionManager) processNextGlobalSession(ctx conte
log.Error(err)
}
log.Debugf("global session finished, key: %s", newGlobalSession.XID)
switch newGlobalSession.Status {
case api.Committing:
manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusCommitted)
case api.Rollbacking:
manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusRollbacked)
}
} else {
// global transaction timeout.
manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusRollbacked)
}
}
return true
Expand All @@ -320,6 +344,7 @@ func (manager *DistributedTransactionManager) processBranchSessions() error {
manager.branchSessionQueue.Add(bs)
case api.PhaseTwoRollbacking:
if manager.IsRollingBackDead(bs) {
metrics.BranchTransactionCounter.WithLabelValues(bs.ApplicationID, bs.ResourceID, metrics.TransactionStatusTimeout)
log.Debugf("branch session rollback dead, key: %s, lock key: %s", bs.BranchID, bs.LockKey)
if manager.rollbackRetryTimeoutUnlockEnable {
log.Debugf("branch id: %d, lock key: %s released", bs.BranchID, bs.LockKey)
Expand Down Expand Up @@ -356,8 +381,14 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte
defer manager.branchSessionQueue.Done(obj)

bs := obj.(*api.BranchSession)
var (
status api.BranchSession_BranchStatus
transactionStatus string
err error
)
if bs.Status == api.PhaseTwoCommitting {
status, err := manager.branchCommit(bs)
transactionStatus = metrics.TransactionStatusCommitted
status, err = manager.branchCommit(bs)
if err != nil {
log.Error(err)
manager.branchSessionQueue.Add(obj)
Expand All @@ -367,14 +398,16 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte
}
}
if bs.Status == api.PhaseTwoRollbacking {
transactionStatus = metrics.TransactionStatusRollbacked
if manager.IsRollingBackDead(bs) {
metrics.BranchTransactionCounter.WithLabelValues(bs.ApplicationID, bs.ResourceID, metrics.TransactionStatusTimeout)
if manager.rollbackRetryTimeoutUnlockEnable {
if _, err := manager.storageDriver.ReleaseLockKeys(context.Background(), bs.ResourceID, []string{bs.LockKey}); err != nil {
if _, err := manager.storageDriver.ReleaseLockKeys(ctx, bs.ResourceID, []string{bs.LockKey}); err != nil {
log.Error(err)
}
}
} else {
status, err := manager.branchRollback(bs)
status, err = manager.branchRollback(bs)
if err != nil {
log.Error(err)
manager.branchSessionQueue.Add(obj)
Expand All @@ -384,6 +417,14 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte
}
}
}

if status == api.Complete {
metrics.BranchTransactionTimer.WithLabelValues(manager.applicationID, bs.ResourceID, transactionStatus).Observe(
float64(int64(misc.CurrentTimeMillis()) - bs.BeginTime))
metrics.BranchTransactionCounter.WithLabelValues(manager.applicationID, bs.ResourceID, metrics.TransactionStatusActive).Desc()
metrics.BranchTransactionCounter.WithLabelValues(manager.applicationID, bs.ResourceID, transactionStatus).Inc()
}

return true
}

Expand All @@ -395,6 +436,11 @@ func (manager *DistributedTransactionManager) watchBranchSession() {
}
}

func (manager *DistributedTransactionManager) recordGlobalTransactionMetric(transactionName string, transactionStatus string) {
metrics.GlobalTransactionCounter.WithLabelValues(manager.applicationID, transactionName, metrics.TransactionStatusActive).Desc()
metrics.GlobalTransactionCounter.WithLabelValues(manager.applicationID, transactionName, transactionStatus).Inc()
}

func isGlobalSessionTimeout(gs *api.GlobalSession) bool {
return (misc.CurrentTimeMillis() - uint64(gs.BeginTime)) > uint64(gs.Timeout)
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/dt/metrics/transaction_metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022 CECTC, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

import "github.com/prometheus/client_golang/prometheus"

const (
TransactionStatusActive = "active"
TransactionStatusCommitted = "committed"
TransactionStatusRollbacked = "rollbacked"
TransactionStatusTimeout = "timeout"
)

var (
GlobalTransactionCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "dbpack",
Subsystem: "global_transaction",
Name: "count",
Help: "global transaction count",
}, []string{"appid", "transactionname", "status"})

BranchTransactionCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "dbpack",
Subsystem: "branch_transaction",
Name: "count",
Help: "branch transaction count",
}, []string{"appid", "resourceid", "status"})

BranchTransactionTimer = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "dbpack",
Subsystem: "branch_transaction",
Name: "timer",
Help: "global transaction timer",
}, []string{"appid", "resourceid", "status"})
)

func init() {
prometheus.MustRegister(GlobalTransactionCounter)
prometheus.MustRegister(BranchTransactionCounter)
prometheus.MustRegister(BranchTransactionTimer)
}
2 changes: 1 addition & 1 deletion pkg/dt/storage/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *store) AddBranchSession(ctx context.Context, branchSession *api.BranchS
}
txn = txn.If(cmpSlice...)

var ops []clientv3.Op
ops := make([]clientv3.Op, 0, 2*len(rowKeys))
for _, rowKey := range rowKeys {
lockKey := fmt.Sprintf("lk/%s/%s", branchSession.XID, rowKey)
ops = append(ops, clientv3.OpPut(lockKey, rowKey))
Expand Down
Loading

0 comments on commit 0ae8c77

Please sign in to comment.