From 0ae8c774106b58f5dbbfa045d4a8591fb913c929 Mon Sep 17 00:00:00 2001 From: wangxiaolei Date: Tue, 31 May 2022 10:52:33 +0800 Subject: [PATCH] feat: transaction metric (#116) * feat: transaction metric * fix: fix golint issue * chore: adjust mr comment * feat: add timeout metric --- cmd/cmd.go | 1 - go.mod | 5 +- go.sum | 23 ++++++++++ pkg/dt/distributed_transaction_manger.go | 58 +++++++++++++++++++++--- pkg/dt/metrics/transaction_metric.go | 55 ++++++++++++++++++++++ pkg/dt/storage/etcd/etcd.go | 2 +- pkg/executor/read_write_splitting.go | 4 ++ pkg/executor/sharding.go | 1 + pkg/executor/single_db.go | 4 ++ pkg/filter/dt/filter_mysql.go | 2 +- pkg/filter/dt/transaction.go | 3 +- 11 files changed, 146 insertions(+), 12 deletions(-) create mode 100644 pkg/dt/metrics/transaction_metric.go diff --git a/cmd/cmd.go b/cmd/cmd.go index c9aae8e..4b31c02 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -126,7 +126,6 @@ var ( } dbpack := server.NewServer() - for _, listenerConf := range conf.Listeners { switch listenerConf.ProtocolType { case config.Mysql: diff --git a/go.mod b/go.mod index 2554032..79ba40d 100644 --- a/go.mod +++ b/go.mod @@ -49,9 +49,12 @@ require ( github.com/gorilla/mux v1.8.0 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect + github.com/mattn/go-colorable v0.1.12 // indirect + github.com/mattn/go-runewidth v0.0.13 // indirect + github.com/mgechev/revive v1.2.1 // indirect github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect - golang.org/x/sys v0.0.0-20220519141025-dcacdad47464 // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/tools v0.1.10 // indirect google.golang.org/protobuf v1.27.1 ) diff --git a/go.sum b/go.sum index 11015e3..b000fd5 100644 --- a/go.sum +++ b/go.sum @@ -119,6 +119,8 @@ github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tj github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4 h1:tFXjAxje9thrTF4h57Ckik+scJjTWdwAtZqZPtOT48M= +github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4/go.mod h1:W8EnPSQ8Nv4fUjc/v1/8tHFqhuOJXnRub0dTfuAQktU= github.com/cheggaaa/pb/v3 v3.0.8/go.mod h1:UICbiLec/XO6Hw6k+BHEtHeQFzzBH4i2/qk/ow1EJTA= github.com/cheynewallace/tabby v1.1.1/go.mod h1:Pba/6cUL8uYqvOc9RkyvFbHGrQ9wShyrn6/S/1OYVys= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -225,7 +227,10 @@ github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= +github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= +github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= @@ -560,6 +565,9 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= +github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= @@ -567,10 +575,14 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= +github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI= github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= @@ -580,12 +592,17 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg= github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ= github.com/mgechev/dots v0.0.0-20190921121421-c36f7dcfbb81/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg= +github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517 h1:zpIH83+oKzcpryru8ceC6BxnoG8TBrhgAvRg8obzup0= +github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg= github.com/mgechev/revive v1.0.2/go.mod h1:rb0dQy1LVAxW9SWy5R3LPUjevzUbUS316U5MFySA2lo= +github.com/mgechev/revive v1.2.1 h1:GjFml7ZsoR0IrQ2E2YIvWFNS5GPDV7xNwvA5GM1HZC4= +github.com/mgechev/revive v1.2.1/go.mod h1:+Ro3wqY4vakcYNtkBWdZC7dBg1xSB6sp054wWwmeFm0= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/minio/sio v0.3.0/go.mod h1:8b0yPp2avGThviy/+OCJBI6OMpvxoUuiLvE6F1lebhw= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= @@ -623,6 +640,8 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn github.com/oleiade/reflections v1.0.1/go.mod h1:rdFxbxq4QXVZWj0F+e9jqjDkc7dbp97vkRixKo2JR60= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -748,6 +767,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qq github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -1171,12 +1191,15 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220519141025-dcacdad47464 h1:MpIuURY70f0iKp/oooEFtB2oENcHITo/z1b6u41pKCw= golang.org/x/sys v0.0.0-20220519141025-dcacdad47464/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/pkg/dt/distributed_transaction_manger.go b/pkg/dt/distributed_transaction_manger.go index aabfedc..e490a0e 100644 --- a/pkg/dt/distributed_transaction_manger.go +++ b/pkg/dt/distributed_transaction_manger.go @@ -29,6 +29,7 @@ import ( "github.com/cectc/dbpack/pkg/config" "github.com/cectc/dbpack/pkg/dt/api" + "github.com/cectc/dbpack/pkg/dt/metrics" "github.com/cectc/dbpack/pkg/dt/storage" "github.com/cectc/dbpack/pkg/log" "github.com/cectc/dbpack/pkg/misc" @@ -57,6 +58,7 @@ func InitDistributedTransactionManager(conf *config.DistributedTransaction, stor if conf.RetryDeadThreshold == 0 { conf.RetryDeadThreshold = DefaultRetryDeadThreshold } + manager = &DistributedTransactionManager{ applicationID: conf.ApplicationID, storageDriver: storageDriver, @@ -110,6 +112,7 @@ func (manager *DistributedTransactionManager) Begin(ctx context.Context, transac if err := manager.storageDriver.AddGlobalSession(ctx, gt); err != nil { return "", err } + metrics.GlobalTransactionCounter.WithLabelValues(manager.applicationID, transactionName, metrics.TransactionStatusActive).Inc() manager.globalSessionQueue.AddAfter(gt, time.Duration(timeout)*time.Millisecond) log.Infof("successfully begin global transaction xid = {%s}", gt.XID) return xid, nil @@ -144,6 +147,7 @@ func (manager *DistributedTransactionManager) BranchRegister(ctx context.Context if err := manager.storageDriver.AddBranchSession(ctx, bs); err != nil { return "", 0, err } + metrics.BranchTransactionCounter.WithLabelValues(manager.applicationID, in.ResourceID, metrics.TransactionStatusActive).Inc() return branchID, branchSessionID, nil } @@ -220,14 +224,15 @@ func (manager *DistributedTransactionManager) _branchRollback(bs *api.BranchSess } func (manager *DistributedTransactionManager) processGlobalSessions() error { - globalSessions, err := manager.storageDriver.ListGlobalSession(context.Background(), manager.applicationID) + ctx := context.Background() + globalSessions, err := manager.storageDriver.ListGlobalSession(ctx, manager.applicationID) if err != nil { return err } for _, gs := range globalSessions { if gs.Status == api.Begin { if isGlobalSessionTimeout(gs) { - if _, err := manager.Rollback(context.Background(), gs.XID); err != nil { + if _, err := manager.Rollback(ctx, gs.XID); err != nil { return err } } @@ -244,11 +249,21 @@ func (manager *DistributedTransactionManager) processGlobalSessions() error { if err != nil { return err } + // branch session has been committed or rollbacked if len(bsKeys) == 0 { if err := manager.storageDriver.DeleteGlobalSession(context.Background(), gs.XID); err != nil { return err } log.Debugf("global session finished, key: %s", gs.XID) + switch gs.Status { + case api.Committing: + manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusCommitted) + case api.Rollbacking: + manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusRollbacked) + } + } else { + // global transaction timeout + manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusTimeout) } } } @@ -283,7 +298,7 @@ func (manager *DistributedTransactionManager) processNextGlobalSession(ctx conte } if newGlobalSession.Status == api.Begin { if isGlobalSessionTimeout(newGlobalSession) { - _, err := manager.Rollback(context.Background(), newGlobalSession.XID) + _, err = manager.Rollback(context.Background(), newGlobalSession.XID) if err != nil { log.Error(err) } @@ -299,6 +314,15 @@ func (manager *DistributedTransactionManager) processNextGlobalSession(ctx conte log.Error(err) } log.Debugf("global session finished, key: %s", newGlobalSession.XID) + switch newGlobalSession.Status { + case api.Committing: + manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusCommitted) + case api.Rollbacking: + manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusRollbacked) + } + } else { + // global transaction timeout. + manager.recordGlobalTransactionMetric(gs.TransactionName, metrics.TransactionStatusRollbacked) } } return true @@ -320,6 +344,7 @@ func (manager *DistributedTransactionManager) processBranchSessions() error { manager.branchSessionQueue.Add(bs) case api.PhaseTwoRollbacking: if manager.IsRollingBackDead(bs) { + metrics.BranchTransactionCounter.WithLabelValues(bs.ApplicationID, bs.ResourceID, metrics.TransactionStatusTimeout) log.Debugf("branch session rollback dead, key: %s, lock key: %s", bs.BranchID, bs.LockKey) if manager.rollbackRetryTimeoutUnlockEnable { log.Debugf("branch id: %d, lock key: %s released", bs.BranchID, bs.LockKey) @@ -356,8 +381,14 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte defer manager.branchSessionQueue.Done(obj) bs := obj.(*api.BranchSession) + var ( + status api.BranchSession_BranchStatus + transactionStatus string + err error + ) if bs.Status == api.PhaseTwoCommitting { - status, err := manager.branchCommit(bs) + transactionStatus = metrics.TransactionStatusCommitted + status, err = manager.branchCommit(bs) if err != nil { log.Error(err) manager.branchSessionQueue.Add(obj) @@ -367,14 +398,16 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte } } if bs.Status == api.PhaseTwoRollbacking { + transactionStatus = metrics.TransactionStatusRollbacked if manager.IsRollingBackDead(bs) { + metrics.BranchTransactionCounter.WithLabelValues(bs.ApplicationID, bs.ResourceID, metrics.TransactionStatusTimeout) if manager.rollbackRetryTimeoutUnlockEnable { - if _, err := manager.storageDriver.ReleaseLockKeys(context.Background(), bs.ResourceID, []string{bs.LockKey}); err != nil { + if _, err := manager.storageDriver.ReleaseLockKeys(ctx, bs.ResourceID, []string{bs.LockKey}); err != nil { log.Error(err) } } } else { - status, err := manager.branchRollback(bs) + status, err = manager.branchRollback(bs) if err != nil { log.Error(err) manager.branchSessionQueue.Add(obj) @@ -384,6 +417,14 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte } } } + + if status == api.Complete { + metrics.BranchTransactionTimer.WithLabelValues(manager.applicationID, bs.ResourceID, transactionStatus).Observe( + float64(int64(misc.CurrentTimeMillis()) - bs.BeginTime)) + metrics.BranchTransactionCounter.WithLabelValues(manager.applicationID, bs.ResourceID, metrics.TransactionStatusActive).Desc() + metrics.BranchTransactionCounter.WithLabelValues(manager.applicationID, bs.ResourceID, transactionStatus).Inc() + } + return true } @@ -395,6 +436,11 @@ func (manager *DistributedTransactionManager) watchBranchSession() { } } +func (manager *DistributedTransactionManager) recordGlobalTransactionMetric(transactionName string, transactionStatus string) { + metrics.GlobalTransactionCounter.WithLabelValues(manager.applicationID, transactionName, metrics.TransactionStatusActive).Desc() + metrics.GlobalTransactionCounter.WithLabelValues(manager.applicationID, transactionName, transactionStatus).Inc() +} + func isGlobalSessionTimeout(gs *api.GlobalSession) bool { return (misc.CurrentTimeMillis() - uint64(gs.BeginTime)) > uint64(gs.Timeout) } diff --git a/pkg/dt/metrics/transaction_metric.go b/pkg/dt/metrics/transaction_metric.go new file mode 100644 index 0000000..569d648 --- /dev/null +++ b/pkg/dt/metrics/transaction_metric.go @@ -0,0 +1,55 @@ +/* + * Copyright 2022 CECTC, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +const ( + TransactionStatusActive = "active" + TransactionStatusCommitted = "committed" + TransactionStatusRollbacked = "rollbacked" + TransactionStatusTimeout = "timeout" +) + +var ( + GlobalTransactionCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "dbpack", + Subsystem: "global_transaction", + Name: "count", + Help: "global transaction count", + }, []string{"appid", "transactionname", "status"}) + + BranchTransactionCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "dbpack", + Subsystem: "branch_transaction", + Name: "count", + Help: "branch transaction count", + }, []string{"appid", "resourceid", "status"}) + + BranchTransactionTimer = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "dbpack", + Subsystem: "branch_transaction", + Name: "timer", + Help: "global transaction timer", + }, []string{"appid", "resourceid", "status"}) +) + +func init() { + prometheus.MustRegister(GlobalTransactionCounter) + prometheus.MustRegister(BranchTransactionCounter) + prometheus.MustRegister(BranchTransactionTimer) +} diff --git a/pkg/dt/storage/etcd/etcd.go b/pkg/dt/storage/etcd/etcd.go index d50de34..cf020c7 100644 --- a/pkg/dt/storage/etcd/etcd.go +++ b/pkg/dt/storage/etcd/etcd.go @@ -111,7 +111,7 @@ func (s *store) AddBranchSession(ctx context.Context, branchSession *api.BranchS } txn = txn.If(cmpSlice...) - var ops []clientv3.Op + ops := make([]clientv3.Op, 0, 2*len(rowKeys)) for _, rowKey := range rowKeys { lockKey := fmt.Sprintf("lk/%s/%s", branchSession.XID, rowKey) ops = append(ops, clientv3.OpPut(lockKey, rowKey)) diff --git a/pkg/executor/read_write_splitting.go b/pkg/executor/read_write_splitting.go index 85caab1..3603746 100644 --- a/pkg/executor/read_write_splitting.go +++ b/pkg/executor/read_write_splitting.go @@ -159,6 +159,7 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context case *ast.SetStmt: if shouldStartTransaction(stmt) { db = executor.masters.Next(proto.WithMaster(ctx)).(*DataSourceBrief) + // TODO add metrics tx, result, err = db.DB.Begin(ctx) if err != nil { return nil, 0, err @@ -186,6 +187,7 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context } case *ast.BeginStmt: db = executor.masters.Next(proto.WithMaster(ctx)).(*DataSourceBrief) + // TODO add metrics tx, result, err = db.DB.Begin(ctx) if err != nil { return nil, 0, err @@ -199,6 +201,7 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context } defer executor.localTransactionMap.Delete(connectionID) tx = txi.(proto.Tx) + // TODO add metrics if result, err = tx.Commit(ctx); err != nil { return nil, 0, err } @@ -210,6 +213,7 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context } defer executor.localTransactionMap.Delete(connectionID) tx = txi.(proto.Tx) + // TODO add metrics if result, err = tx.Rollback(ctx); err != nil { return nil, 0, err } diff --git a/pkg/executor/sharding.go b/pkg/executor/sharding.go index 2e80301..91d15bb 100644 --- a/pkg/executor/sharding.go +++ b/pkg/executor/sharding.go @@ -259,6 +259,7 @@ func (executor *ShardingExecutor) ConnectionClose(ctx context.Context) { if !ok { return } + // TODO add metrics if _, err := tx.Rollback(ctx); err != nil { log.Error(err) } diff --git a/pkg/executor/single_db.go b/pkg/executor/single_db.go index 30d5e32..31ae598 100644 --- a/pkg/executor/single_db.go +++ b/pkg/executor/single_db.go @@ -138,6 +138,7 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri switch stmt := queryStmt.(type) { case *ast.SetStmt: if shouldStartTransaction(stmt) { + // TODO add metrics tx, result, err = db.Begin(ctx) if err != nil { return nil, 0, err @@ -153,6 +154,7 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri return db.Query(ctx, sql) } case *ast.BeginStmt: + // TODO add metrics tx, result, err = db.Begin(ctx) if err != nil { return nil, 0, err @@ -166,6 +168,7 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri } defer executor.localTransactionMap.Delete(connectionID) tx = txi.(proto.Tx) + // TODO add metrics if result, err = tx.Commit(ctx); err != nil { return nil, 0, err } @@ -177,6 +180,7 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri } defer executor.localTransactionMap.Delete(connectionID) tx = txi.(proto.Tx) + // TODO add metrics if result, err = tx.Rollback(ctx); err != nil { return nil, 0, err } diff --git a/pkg/filter/dt/filter_mysql.go b/pkg/filter/dt/filter_mysql.go index 9a7e44d..25edf97 100644 --- a/pkg/filter/dt/filter_mysql.go +++ b/pkg/filter/dt/filter_mysql.go @@ -193,7 +193,7 @@ func (f *_mysqlFilter) registerBranchTransaction(ctx context.Context, xid, resou ApplicationData: nil, } for retryCount := 0; retryCount < f.lockRetryTimes; retryCount++ { - _, branchID, err = dt.GetDistributedTransactionManager().BranchRegister(context.Background(), br) + _, branchID, err = dt.GetDistributedTransactionManager().BranchRegister(ctx, br) if err == nil { break } diff --git a/pkg/filter/dt/transaction.go b/pkg/filter/dt/transaction.go index 9ac23ef..c338bcf 100644 --- a/pkg/filter/dt/transaction.go +++ b/pkg/filter/dt/transaction.go @@ -47,6 +47,7 @@ func (f *_httpFilter) handleHttp1GlobalBegin(ctx *fasthttp.RequestCtx, transacti func (f *_httpFilter) handleHttp1GlobalEnd(ctx *fasthttp.RequestCtx) error { xidParam := ctx.UserValue(XID) xid := xidParam.(string) + if ctx.Response.StatusCode() == http.StatusOK { err := f.globalCommit(ctx, xid) if err != nil { @@ -130,7 +131,6 @@ func (f *_httpFilter) globalCommit(ctx context.Context, xid string) error { transactionManager := dt.GetDistributedTransactionManager() status, err = transactionManager.Commit(ctx, xid) - log.Infof("[%s] commit status: %s", xid, status.String()) return err } @@ -143,7 +143,6 @@ func (f *_httpFilter) globalRollback(ctx context.Context, xid string) error { transactionManager := dt.GetDistributedTransactionManager() status, err = transactionManager.Rollback(ctx, xid) - log.Infof("[%s] rollback status: %s", xid, status.String()) return err }