From 884f9943aaff978c206bfc90d0bc339fcb67e8d4 Mon Sep 17 00:00:00 2001 From: Xingdong Date: Tue, 9 Jul 2024 11:44:19 +0800 Subject: [PATCH 1/8] Redis pubsub improvement --- api/go.mod | 3 +- api/go.sum | 22 +- coa/go.mod | 4 +- coa/go.sum | 12 +- .../v1alpha2/providers/pubsub/redis/redis.go | 251 +++++++++--------- .../providers/pubsub/redis/redis_test.go | 143 ++++++++-- .../providers/states/redisstate/redisstate.go | 22 +- go.work.sum | 2 + .../helm/symphony/files/symphony-api.json | 3 +- .../02.basic/manifest/oss/instance.yaml | 3 + 10 files changed, 284 insertions(+), 181 deletions(-) diff --git a/api/go.mod b/api/go.mod index 1569bb3c6..dc669fd4e 100644 --- a/api/go.mod +++ b/api/go.mod @@ -38,8 +38,8 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cheggaaa/pb/v3 v3.0.4 // indirect github.com/containerd/log v0.1.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/felixge/httpsnoop v1.0.3 // indirect - github.com/go-redis/redis/v7 v7.4.1 // indirect github.com/gofrs/uuid v4.0.0+incompatible // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/google/gnostic-models v0.6.8 // indirect @@ -56,6 +56,7 @@ require ( github.com/onsi/ginkgo/v2 v2.15.0 // indirect github.com/onsi/gomega v1.31.0 // indirect github.com/openzipkin/zipkin-go v0.4.1 // indirect + github.com/redis/go-redis/v9 v9.5.3 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.45.0 // indirect diff --git a/api/go.sum b/api/go.sum index b27ac1b62..b2e2d7a19 100644 --- a/api/go.sum +++ b/api/go.sum @@ -53,6 +53,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bshuster-repo/logrus-logstash-hook v1.0.0 h1:e+C0SB5R1pu//O4MQ3f9cFuPGoOVeF2fE4Og9otCc70= github.com/bshuster-repo/logrus-logstash-hook v1.0.0/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd h1:rFt+Y/IK1aEZkEHchZRSq9OQbsSzIT/OrI8YFFmRIng= github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b h1:otBG+dV+YK+Soembjv71DPz3uX/V/6MMlSyD9JBQ6kQ= @@ -93,6 +97,8 @@ github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxG github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2 h1:aBfCb7iqHmDEIp6fBvC/hQUddQfg+3qdYjwzaiP9Hnc= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2/go.mod h1:WHNsWjnIn2V1LYOrME7e8KxSeKunYHsxEm4am0BUtcI= github.com/docker/cli v24.0.6+incompatible h1:fF+XCQCgJjjQNIMjzaSmiKJSCcfcXb3TWTcc7GAneOY= @@ -156,8 +162,6 @@ github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= -github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= -github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -356,19 +360,13 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= -github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI= github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY= github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.31.0 h1:54UJxxj6cPInHS3a35wm6BK/F9nHYueZ1NVujHDrnXE= github.com/onsi/gomega v1.31.0/go.mod h1:DW9aCi7U6Yi40wNVAvT6kzFnEVEI5n3DloYBiKiT6zk= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -411,6 +409,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rubenv/sql-migrate v1.5.2 h1:bMDqOnrJVV/6JQgQ/MxOpU+AdO8uzYYA/TxFUBzFtS0= @@ -530,7 +530,6 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -561,7 +560,6 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -588,7 +586,6 @@ golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= @@ -639,7 +636,6 @@ gopkg.in/VividCortex/ewma.v1 v1.1.1/go.mod h1:TekXuFipeiHWiAlO1+wSS23vTcyFau5u3r gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v2 v2.0.7/go.mod h1:0CiZ1p8pvtxBlQpLXkHuUTpdJ1shm3OqCF1QugkjHL4= @@ -650,11 +646,9 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/mattn/go-colorable.v0 v0.1.0/go.mod h1:BVJlBXzARQxdi3nZo6f6bnl5yR20/tOL6p+V0KejgSY= gopkg.in/mattn/go-isatty.v0 v0.0.4/go.mod h1:wt691ab7g0X4ilKZNmMII3egK0bTxl37fEn/Fwbd8gc= gopkg.in/mattn/go-runewidth.v0 v0.0.4/go.mod h1:BmXejnxvhwdaATwiJbB1vZ2dtXkQKZGu9yLFCZb4msQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/coa/go.mod b/coa/go.mod index 36fa6a8c2..89e71dd90 100644 --- a/coa/go.mod +++ b/coa/go.mod @@ -15,11 +15,11 @@ require ( github.com/eclipse-symphony/symphony/packages/mage v0.0.0-00010101000000-000000000000 github.com/eclipse/paho.mqtt.golang v1.4.2 github.com/fasthttp/router v1.4.20 - github.com/go-redis/redis/v7 v7.4.1 github.com/golang-jwt/jwt/v4 v4.5.0 github.com/google/uuid v1.6.0 github.com/microsoft/ApplicationInsights-Go v0.4.4 github.com/pkg/errors v0.9.1 + github.com/redis/go-redis/v9 v9.5.3 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 github.com/valyala/fasthttp v1.50.0 @@ -47,8 +47,10 @@ require ( github.com/VividCortex/ewma v1.1.1 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cheggaaa/pb/v3 v3.0.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect diff --git a/coa/go.sum b/coa/go.sum index 6bc3346b0..fb6d59b6c 100644 --- a/coa/go.sum +++ b/coa/go.sum @@ -14,8 +14,14 @@ github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdc github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheggaaa/pb v2.0.7+incompatible/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s= github.com/cheggaaa/pb/v3 v3.0.4 h1:QZEPYOj2ix6d5oEg63fbHmpolrnNiwjUsk+h74Yt4bM= github.com/cheggaaa/pb/v3 v3.0.4/go.mod h1:7rgWxLrAUcFMkvJuv09+DYi7mMUYi8nO9iOWcvGJPfw= @@ -23,6 +29,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= @@ -47,8 +55,6 @@ github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= -github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= -github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= @@ -158,6 +164,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/princjef/mageutil v1.0.0 h1:1OfZcJUMsooPqieOz2ooLjI+uHUo618pdaJsbCXcFjQ= github.com/princjef/mageutil v1.0.0/go.mod h1:mkShhaUomCYfAoVvTKRcbAs8YSVPdtezI5j6K+VXhrs= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go index 7b99888c7..526accadc 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go @@ -20,19 +20,20 @@ import ( "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/utils" "github.com/eclipse-symphony/symphony/coa/pkg/logger" - "github.com/go-redis/redis/v7" + "github.com/redis/go-redis/v9" ) var mLog = logger.NewLogger("coa.runtime") type RedisPubSubProvider struct { - Config RedisPubSubProviderConfig `json:"config"` - Subscribers map[string][]v1alpha2.EventHandler `json:"subscribers"` - Client *redis.Client - Queue chan RedisMessageWrapper - Ctx context.Context - Cancel context.CancelFunc - Context *contexts.ManagerContext + Config RedisPubSubProviderConfig `json:"config"` + Subscribers map[string][]v1alpha2.EventHandler `json:"subscribers"` + Client *redis.Client + Queue chan RedisMessageWrapper + Ctx context.Context + Cancel context.CancelFunc + Context *contexts.ManagerContext + ClaimedMessages map[string]bool } type RedisMessageWrapper struct { @@ -43,17 +44,30 @@ type RedisMessageWrapper struct { } type RedisPubSubProviderConfig struct { - Name string `json:"name"` - Host string `json:"host"` - Password string `json:"password,omitempty"` - RequiresTLS bool `json:"requiresTLS,omitempty"` - NumberOfWorkers int `json:"numberOfWorkers,omitempty"` - QueueDepth int `json:"queueDepth,omitempty"` - ConsumerID string `json:"consumerID"` - ProcessingTimeout time.Duration `json:"processingTimeout,omitempty"` - RedeliverInterval time.Duration `json:"redeliverInterval,omitempty"` + Name string `json:"name"` + Host string `json:"host"` + Password string `json:"password,omitempty"` + RequiresTLS bool `json:"requiresTLS,omitempty"` + NumberOfWorkers int `json:"numberOfWorkers,omitempty"` + QueueDepth int `json:"queueDepth,omitempty"` + ConsumerID string `json:"consumerID"` + MultiInstance bool `json:"multiInstance,omitempty"` } +const ( + RedisGroup = "symphony" + // defines the interval in which the provider should check for pending messages + PendingMessagesScanInterval = 5 * time.Second + // defines after how much idle time the provider should check for pending messages that previously claimed + // by itself and reset the idle time of them to prevent them from being claimed by other clients + ExtendMessageOwnershipWithIdleTime = 30 * time.Second + // defines the interval in which the provider should check for pending messages from other clients + PendingMessagesScanIntervalOtherClient = 60 * time.Second + // defines after how much idle time the provider should check for pending messages that previously claimed + // by other clients + ClaimMessageFromOtherClientWithIdleTime = 60 * time.Second +) + func RedisPubSubProviderConfigFromMap(properties map[string]string) (RedisPubSubProviderConfig, error) { ret := RedisPubSubProviderConfig{} if v, ok := properties["name"]; ok { @@ -77,6 +91,18 @@ func RedisPubSubProviderConfigFromMap(properties map[string]string) (RedisPubSub ret.RequiresTLS = bVal } } + if v, ok := properties["multiInstance"]; ok { + val := v //providers.LoadEnv(v) + if val != "" { + bVal, err := strconv.ParseBool(val) + if err != nil { + return ret, v1alpha2.NewCOAError(err, "invalid bool value in the 'requiresTLS' setting of Redis pub-sub provider", v1alpha2.BadConfig) + } + ret.MultiInstance = bVal + } + } else { + ret.MultiInstance = false + } if v, ok := properties["numberOfWorkers"]; ok { val := v //providers.LoadEnv(v) if val != "" { @@ -99,31 +125,15 @@ func RedisPubSubProviderConfigFromMap(properties map[string]string) (RedisPubSub ret.QueueDepth = n } } + if ret.QueueDepth <= 0 { + ret.QueueDepth = 10 + } if v, ok := properties["consumerID"]; ok { ret.ConsumerID = v // providers.LoadEnv(v) + } else { + ret.ConsumerID = "" } - - if v, ok := properties["processingTimeout"]; ok { - val := v //providers.LoadEnv(v) - if val != "" { - n, err := utils.UnmarshalDuration(val) - if err != nil { - return ret, v1alpha2.NewCOAError(err, "invalid int value in the 'processingTimeout' setting of Redis pub-sub provider", v1alpha2.BadConfig) - } - ret.ProcessingTimeout = n - } - } - - if v, ok := properties["redeliverInterval"]; ok { - val := v //providers.LoadEnv(v) - if val != "" { - n, err := utils.UnmarshalDuration(val) - if err != nil { - return ret, v1alpha2.NewCOAError(err, "invalid int value in the 'redeliverInterval' setting of Redis pub-sub provider", v1alpha2.BadConfig) - } - ret.RedeliverInterval = n - } - } + ret.ConsumerID = ret.ConsumerID + generateConsumerIDSuffix() if ret.NumberOfWorkers <= 0 { ret.NumberOfWorkers = 1 @@ -143,7 +153,7 @@ func (s *RedisPubSubProvider) SetContext(ctx *contexts.ManagerContext) { func (i *RedisPubSubProvider) InitWithMap(properties map[string]string) error { config, err := RedisPubSubProviderConfigFromMap(properties) if err != nil { - mLog.Debugf(" P (Redis PubSub) : failed to initialize provider %v", err) + mLog.Errorf(" P (Redis PubSub) : failed to initialize provider %v", err) return err } return i.Init(config) @@ -152,7 +162,7 @@ func (i *RedisPubSubProvider) InitWithMap(properties map[string]string) error { func (i *RedisPubSubProvider) Init(config providers.IProviderConfig) error { vConfig, err := toRedisPubSubProviderConfig(config) if err != nil { - mLog.Debugf(" P (Redis PubSub): failed to parse provider config %+v", err) + mLog.Errorf(" P (Redis PubSub): failed to parse provider config %+v", err) return v1alpha2.NewCOAError(nil, "provided config is not a valid redis pub-sub provider config", v1alpha2.BadConfig) } i.Config = vConfig @@ -160,6 +170,9 @@ func (i *RedisPubSubProvider) Init(config providers.IProviderConfig) error { return v1alpha2.NewCOAError(nil, "Redis host is not supplied", v1alpha2.MissingConfig) } + i.Ctx, i.Cancel = context.WithCancel(context.Background()) + i.ClaimedMessages = make(map[string]bool) + i.Subscribers = make(map[string][]v1alpha2.EventHandler) options := &redis.Options{ Addr: i.Config.Host, @@ -174,12 +187,11 @@ func (i *RedisPubSubProvider) Init(config providers.IProviderConfig) error { } } client := redis.NewClient(options) - if _, err := client.Ping().Result(); err != nil { - mLog.Debugf(" P (Redis PubSub): failed to connect to redis %+v", err) + if _, err := client.Ping(i.Ctx).Result(); err != nil { + mLog.Errorf(" P (Redis PubSub): failed to connect to redis %+v", err) return v1alpha2.NewCOAError(err, fmt.Sprintf("redis stream: error connecting to redis at %s", i.Config.Host), v1alpha2.InternalError) } i.Client = client - i.Ctx, i.Cancel = context.WithCancel(context.Background()) i.Queue = make(chan RedisMessageWrapper, int(i.Config.QueueDepth)) for k := uint(0); k < uint(i.Config.NumberOfWorkers); k++ { go i.worker() @@ -198,6 +210,8 @@ func (i *RedisPubSubProvider) worker() { } } func (i *RedisPubSubProvider) processMessage(msg RedisMessageWrapper) error { + i.ClaimedMessages[msg.MessageID] = true + defer delete(i.ClaimedMessages, msg.MessageID) var evt v1alpha2.Event err := json.Unmarshal([]byte(utils.FormatAsString(msg.Message)), &evt) if err != nil { @@ -206,14 +220,14 @@ func (i *RedisPubSubProvider) processMessage(msg RedisMessageWrapper) error { if err := msg.Handler(msg.Topic, evt); err != nil { return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to handle message %s", msg.MessageID), v1alpha2.InternalError) } - if err := i.Client.XAck(msg.Topic, i.Config.ConsumerID, msg.MessageID).Err(); err != nil { + if err := i.Client.XAck(i.Ctx, msg.Topic, i.Config.ConsumerID, msg.MessageID).Err(); err != nil { return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to acknowledge message %s", msg.MessageID), v1alpha2.InternalError) } return nil } func (i *RedisPubSubProvider) Publish(topic string, event v1alpha2.Event) error { - _, err := i.Client.XAdd(&redis.XAddArgs{ + _, err := i.Client.XAdd(i.Ctx, &redis.XAddArgs{ Stream: topic, Values: map[string]interface{}{"data": event}, }).Result() @@ -224,14 +238,17 @@ func (i *RedisPubSubProvider) Publish(topic string, event v1alpha2.Event) error return nil } func (i *RedisPubSubProvider) Subscribe(topic string, handler v1alpha2.EventHandler) error { - err := i.Client.XGroupCreateMkStream(topic, i.Config.ConsumerID, "0").Err() + err := i.Client.XGroupCreateMkStream(i.Ctx, topic, RedisGroup, "0").Err() //Ignore BUSYGROUP errors if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { mLog.Debugf(" P (Redis PubSub) : failed to subscribe %v", err) return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to subsceribe to topic %s", topic), v1alpha2.InternalError) } go i.pollNewMessagesLoop(topic, handler) - go i.reclaimPendingMessagesLoop(topic, handler) + go i.ClaimMessageLoop(topic, i.Config.ConsumerID, handler, PendingMessagesScanInterval, ExtendMessageOwnershipWithIdleTime) + if i.Config.MultiInstance { + go i.ClaimMessageLoop(topic, "", handler, PendingMessagesScanIntervalOtherClient, ClaimMessageFromOtherClientWithIdleTime) + } return nil } @@ -240,8 +257,8 @@ func (i *RedisPubSubProvider) pollNewMessagesLoop(topic string, handler v1alpha2 if i.Ctx.Err() != nil { return } - streams, err := i.Client.XReadGroup(&redis.XReadGroupArgs{ - Group: i.Config.ConsumerID, + streams, err := i.Client.XReadGroup(i.Ctx, &redis.XReadGroupArgs{ + Group: RedisGroup, Consumer: i.Config.ConsumerID, Streams: []string{topic, ">"}, Count: int64(i.Config.QueueDepth), @@ -260,6 +277,11 @@ func (i *RedisPubSubProvider) pollNewMessagesLoop(topic string, handler v1alpha2 func (i *RedisPubSubProvider) enqueueMessages(topic string, handler v1alpha2.EventHandler, msgs []redis.XMessage) { for _, msg := range msgs { + if _, ok := i.ClaimedMessages[msg.ID]; ok { + mLog.Debugf(" P (Redis PubSub) : claimed old message %s", msg.ID) + continue + } + mLog.Debugf(" P (Redis PubSub) : claimed new message %s", msg.ID) rmsg := createRedisMessageWrapper(topic, handler, msg) select { case i.Queue <- rmsg: @@ -282,94 +304,62 @@ func createRedisMessageWrapper(topic string, handler v1alpha2.EventHandler, msg } } -func (i *RedisPubSubProvider) reclaimPendingMessagesLoop(topic string, handler v1alpha2.EventHandler) { - if i.Config.ProcessingTimeout == 0 || i.Config.RedeliverInterval == 0 { - return - } - i.reclaimPendingMessages(topic, handler) - reclaimTicker := time.NewTicker(i.Config.RedeliverInterval) +func (i *RedisPubSubProvider) ClaimMessageLoop(topic string, consumerId string, handler v1alpha2.EventHandler, scanInterval time.Duration, messageIdleTime time.Duration) { + i.reclaimPendingMessages(topic, messageIdleTime, consumerId, handler) + reclaimTicker := time.NewTicker(scanInterval) + defer reclaimTicker.Stop() for { select { case <-i.Ctx.Done(): return case <-reclaimTicker.C: - i.reclaimPendingMessages(topic, handler) + i.reclaimPendingMessages(topic, messageIdleTime, consumerId, handler) } } } -func (i *RedisPubSubProvider) reclaimPendingMessages(topic string, handler v1alpha2.EventHandler) { +func (i *RedisPubSubProvider) reclaimPendingMessages(topic string, idleTime time.Duration, consumer string, handler v1alpha2.EventHandler) { + mLog.Debugf(" P (Redis PubSub) : reclaiming pending messages for consumer %s", consumer) + start := "-" for { - pendingResult, err := i.Client.XPendingExt(&redis.XPendingExtArgs{ - Stream: topic, - Group: i.Config.ConsumerID, - Start: "-", - End: "+", - Count: int64(i.Config.QueueDepth), + pendingResult, err := i.Client.XPendingExt(i.Ctx, &redis.XPendingExtArgs{ + Stream: topic, + Group: RedisGroup, + Start: start, + End: "+", + Count: int64(i.Config.QueueDepth), + Idle: idleTime, + Consumer: consumer, }).Result() if err != nil && !errors.Is(err, redis.Nil) { mLog.Debugf(" P (Redis PubSub) : failed to get pending message %v", err) break } - msgIDs := make([]string, 0, len(pendingResult)) - for _, msg := range pendingResult { - if msg.Idle >= i.Config.ProcessingTimeout { - msgIDs = append(msgIDs, msg.ID) - } - } - if len(msgIDs) == 0 { - break - } - claimResult, err := i.Client.XClaim(&redis.XClaimArgs{ - Stream: topic, - Group: i.Config.ConsumerID, - Consumer: i.Config.ConsumerID, - MinIdle: i.Config.ProcessingTimeout, - Messages: msgIDs, - }).Result() - if err != nil && !errors.Is(err, redis.Nil) { - mLog.Debugf(" P (Redis PubSub) : failed to reclaim pending message %v", err) + if len(pendingResult) == 0 { break } - i.enqueueMessages(topic, handler, claimResult) - // If the Redis nil error is returned, it means some messages in the pending - // state no longer exist. We need to acknowledge these mesages to - // remove them from the pending list - if errors.Is(err, redis.Nil) { - // Build a set of message IDs that were not returned - // that potentitally no longer exist - expectedMsgIDs := make(map[string]struct{}, len(msgIDs)) - for _, id := range msgIDs { - expectedMsgIDs[id] = struct{}{} - } - for _, claimed := range claimResult { - delete(expectedMsgIDs, claimed.ID) - } - i.removeMessagesThatNoLongerExistFromPending(topic, expectedMsgIDs, handler) + start = pendingResult[len(pendingResult)-1].ID + msgIDs := make([]string, 0, len(pendingResult)) + for _, msg := range pendingResult { + msgIDs = append(msgIDs, msg.ID) } + i.XClaimWrapper(topic, idleTime, msgIDs, handler) } } - -func (i *RedisPubSubProvider) removeMessagesThatNoLongerExistFromPending(topic string, messageIDs map[string]struct{}, handler v1alpha2.EventHandler) { - for pendingID := range messageIDs { - claimResultSingleMsg, err := i.Client.XClaim(&redis.XClaimArgs{ - Stream: topic, - Group: i.Config.ConsumerID, - Consumer: i.Config.ConsumerID, - MinIdle: i.Config.ProcessingTimeout, - Messages: []string{pendingID}, - }).Result() - if err != nil && !errors.Is(err, redis.Nil) { - mLog.Debugf(" P (Redis PubSub) : failed to remove pending message %v", err) - continue - } - if errors.Is(err, redis.Nil) { - if err = i.Client.XAck(topic, i.Config.ConsumerID, pendingID).Err(); err != nil { - mLog.Debugf(" P (Redis PubSub) : error acknowledging Redis message %s after failed claim for %s - %v", i.Config.ConsumerID, pendingID, err) - } else { - i.enqueueMessages(topic, handler, claimResultSingleMsg) - } - } +func (i *RedisPubSubProvider) XClaimWrapper(topic string, minIdle time.Duration, msgIDs []string, handler v1alpha2.EventHandler) { + claimResult, err := i.Client.XClaim(i.Ctx, &redis.XClaimArgs{ + Stream: topic, + Group: RedisGroup, + Consumer: i.Config.ConsumerID, + MinIdle: minIdle, + Messages: msgIDs, + }).Result() + if err != nil && !errors.Is(err, redis.Nil) { + mLog.Debugf(" P (Redis PubSub) : failed to reclaim pending message %v", err) + return + } + if err == nil || errors.Is(err, redis.Nil) { + i.enqueueMessages(topic, handler, claimResult) } } @@ -379,12 +369,25 @@ func toRedisPubSubProviderConfig(config providers.IProviderConfig) (RedisPubSubP if err != nil { return ret, err } - err = json.Unmarshal(data, &ret) - //ret.Name = providers.LoadEnv(ret.Name) - //ret.Host = providers.LoadEnv(ret.Host) - //ret.Password = providers.LoadEnv(ret.Password) - if ret.NumberOfWorkers <= 0 { - ret.NumberOfWorkers = 1 + var configs map[string]interface{} + err = json.Unmarshal(data, &configs) + if err != nil { + mLog.Errorf(" P (Redis PubSub): failed to parse to map[string]interface{} %+v", err) + return ret, err + } + configStrings := map[string]string{} + for k, v := range configs { + configStrings[k] = utils.FormatAsString(v) + } + + ret, err = RedisPubSubProviderConfigFromMap(configStrings) + if err != nil { + mLog.Errorf(" P (Redis PubSub): failed to parse to RedisPubSubProviderConfig %+v", err) + return ret, err } return ret, err } + +func generateConsumerIDSuffix() string { + return fmt.Sprintf("%d", time.Now().UnixNano()) +} diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis_test.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis_test.go index 8263dbb4d..515ae572c 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis_test.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis_test.go @@ -7,13 +7,16 @@ package redis import ( + "context" "encoding/json" + "errors" "os" "testing" "time" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" ) @@ -77,7 +80,8 @@ func TestInitWithMap(t *testing.T) { "host": "localhost:6379", }, ) - // assert.Nil(t, err) // Provider initialization succeeds if redis is running + + assert.Nil(t, err) // Provider initialization succeeds if redis is running err = provider.InitWithMap( map[string]string{ "name": "test", @@ -119,12 +123,10 @@ func TestBasicPubSub(t *testing.T) { msg := "" provider := RedisPubSubProvider{} err := provider.Init(RedisPubSubProviderConfig{ - Name: "test", - Host: "localhost:6379", - Password: "", - NumberOfWorkers: 1, - ProcessingTimeout: 5, - RedeliverInterval: 5, + Name: "test", + Host: "localhost:6379", + Password: "", + NumberOfWorkers: 1, }) assert.Nil(t, err) provider.Subscribe("test", func(topic string, message v1alpha2.Event) error { @@ -194,13 +196,13 @@ func TestBasicPubSubTwoProvidersComplexEvent(t *testing.T) { ConsumerID: "c", }) assert.Nil(t, err) - provider2.Subscribe("job", func(topic string, message v1alpha2.Event) error { + provider2.Subscribe("testjob", func(topic string, message v1alpha2.Event) error { jData, _ := json.Marshal(message.Body) json.Unmarshal(jData, &msg) sig <- 1 return nil }) - provider1.Publish("job", v1alpha2.Event{ + provider1.Publish("testjob", v1alpha2.Event{ Metadata: map[string]string{ "objectType": "mock", }, @@ -210,7 +212,7 @@ func TestBasicPubSubTwoProvidersComplexEvent(t *testing.T) { }, }) <-sig - assert.Equal(t, "do-it", msg.Action) + assert.Equal(t, v1alpha2.JobAction("do-it"), msg.Action) } func TestMultipleSubscriber(t *testing.T) { testRedis := os.Getenv("TEST_REDIS") @@ -267,12 +269,10 @@ func TestMultipleSubscriber(t *testing.T) { func TestSubscribePublish(t *testing.T) { provider := RedisPubSubProvider{} provider.Init(RedisPubSubProviderConfig{ - Name: "test", - Host: "localhost:6379", - Password: "", - NumberOfWorkers: 1, - ProcessingTimeout: 5, - RedeliverInterval: 5, + Name: "test", + Host: "localhost:6379", + Password: "", + NumberOfWorkers: 1, }) // assert.Nil(t, err) // Provider initialization succeeds if redis is running @@ -290,15 +290,13 @@ func TestSubscribePublish(t *testing.T) { func TestRedisPubSubProviderConfigFromMap(t *testing.T) { configMap := map[string]string{ - "name": "test", - "host": "localhost:6379", - "password": "123", - "requiresTLS": "true", - "numberOfWorkers": "1", - "queueDepth": "10", - "consumerID": "test-consumer", - "processingTimeout": "10", - "redeliverInterval": "10", + "name": "test", + "host": "localhost:6379", + "password": "123", + "requiresTLS": "true", + "numberOfWorkers": "1", + "queueDepth": "10", + "consumerID": "test-consumer", } config, err := RedisPubSubProviderConfigFromMap(configMap) assert.Nil(t, err) @@ -308,7 +306,96 @@ func TestRedisPubSubProviderConfigFromMap(t *testing.T) { assert.Equal(t, true, config.RequiresTLS) assert.Equal(t, 1, config.NumberOfWorkers) assert.Equal(t, 10, config.QueueDepth) - assert.Equal(t, "test-consumer", config.ConsumerID) - assert.Equal(t, time.Duration(10), config.ProcessingTimeout) - assert.Equal(t, time.Duration(10), config.RedeliverInterval) + assert.Contains(t, config.ConsumerID, "test-consumer") +} + +func TestRedisStreamBasic(t *testing.T) { + testRedis := os.Getenv("TEST_REDIS") + if testRedis == "" { + t.Skip("Skipping because TEST_REDIS enviornment variable is not set") + } + Ctx, _ := context.WithCancel(context.Background()) + options := &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + MaxRetries: 3, + MaxRetryBackoff: time.Second * 2, + } + client := redis.NewClient(options) + consumerId := "testconsumer" + topic := "test" + err := client.XGroupCreateMkStream(Ctx, topic, consumerId, "0").Err() + if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { + mLog.Debugf(" P (Redis PubSub) : failed to subscribe %v", err) + assert.Nil(t, err) + } + _, err = client.XAdd(Ctx, &redis.XAddArgs{ + Stream: topic, + Values: map[string]interface{}{"data": v1alpha2.Event{Body: "TEST"}}, + }).Result() + assert.Nil(t, err) + + streams, err := client.XReadGroup(Ctx, &redis.XReadGroupArgs{ + Group: consumerId, + Consumer: consumerId, + Streams: []string{topic, ">"}, + Count: int64(10), + Block: 0, + }).Result() + assert.NotNil(t, streams) + assert.Nil(t, err) + + pendingResult, err := client.XPendingExt(Ctx, &redis.XPendingExtArgs{ + Stream: topic, + Group: consumerId, + Start: "-", + End: "+", + Count: int64(10), + Consumer: "", + }).Result() + if err != nil && !errors.Is(err, redis.Nil) { + mLog.Debugf(" P (Redis PubSub) : failed to get pending message %v", err) + assert.Nil(t, err) + } + msgIDs := make([]string, 0, len(pendingResult)) + for _, msg := range pendingResult { + msgIDs = append(msgIDs, msg.ID) + } + assert.NotNil(t, msgIDs) + claimResult, err := client.XClaim(Ctx, &redis.XClaimArgs{ + Stream: topic, + Group: consumerId, + Consumer: consumerId, + MinIdle: time.Duration(1 * time.Second), + Messages: msgIDs, + }).Result() + assert.Nil(t, err) + assert.Equal(t, 1, len(claimResult)) + + pendingResult, err = client.XPendingExt(Ctx, &redis.XPendingExtArgs{ + Stream: topic, + Group: consumerId, + Start: "-", + End: "+", + Count: int64(10), + }).Result() + if err != nil && !errors.Is(err, redis.Nil) { + mLog.Debugf(" P (Redis PubSub) : failed to get pending message %v", err) + assert.Nil(t, err) + } + msgIDs = make([]string, 0, len(pendingResult)) + for _, msg := range pendingResult { + msgIDs = append(msgIDs, msg.ID) + } + assert.NotNil(t, msgIDs) + claimResult, err = client.XClaim(Ctx, &redis.XClaimArgs{ + Stream: topic, + Group: consumerId, + Consumer: consumerId, + MinIdle: time.Duration(100 * time.Second), + Messages: msgIDs, + }).Result() + assert.Nil(t, err) + assert.Equal(t, 0, len(claimResult)) } diff --git a/coa/pkg/apis/v1alpha2/providers/states/redisstate/redisstate.go b/coa/pkg/apis/v1alpha2/providers/states/redisstate/redisstate.go index efccf7b13..c5a02e3bb 100644 --- a/coa/pkg/apis/v1alpha2/providers/states/redisstate/redisstate.go +++ b/coa/pkg/apis/v1alpha2/providers/states/redisstate/redisstate.go @@ -22,7 +22,7 @@ import ( providers "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers" states "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states" "github.com/eclipse-symphony/symphony/coa/pkg/logger" - "github.com/go-redis/redis/v7" + "github.com/redis/go-redis/v9" ) var rLog = logger.NewLogger("coa.runtime") @@ -70,6 +70,8 @@ type RedisStateProvider struct { Config RedisStateProviderConfig Context *contexts.ManagerContext Client *redis.Client + Ctx context.Context + Cancel context.CancelFunc } func (r *RedisStateProvider) ID() string { @@ -98,7 +100,7 @@ func (r *RedisStateProvider) Init(config providers.IProviderConfig) error { if r.Config.Host == "" { return v1alpha2.NewCOAError(nil, "Redis host is not supplied", v1alpha2.MissingConfig) } - + r.Ctx, r.Cancel = context.WithCancel(context.Background()) options := &redis.Options{ Addr: r.Config.Host, Password: r.Config.Password, @@ -112,7 +114,7 @@ func (r *RedisStateProvider) Init(config providers.IProviderConfig) error { } } client := redis.NewClient(options) - if _, err := client.Ping().Result(); err != nil { + if _, err := client.Ping(r.Ctx).Result(); err != nil { rLog.Debugf(" P (Redis State): failed to connect to redis %+v", err) return v1alpha2.NewCOAError(err, fmt.Sprintf("redis stream: error connecting to redis at %s", r.Config.Host), v1alpha2.InternalError) } @@ -146,7 +148,7 @@ func (r *RedisStateProvider) Upsert(ctx context.Context, entry states.UpsertRequ } if entry.Options.UpdateStateOnly { var existing string - existing, err = r.Client.HGet(key, "values").Result() + existing, err = r.Client.HGet(r.Ctx, key, "values").Result() if err != nil { return entry.Value.ID, v1alpha2.NewCOAError(nil, fmt.Sprintf("redis state %s not found. Cannot update state only", entry.Value.ID), v1alpha2.BadRequest) } @@ -166,7 +168,7 @@ func (r *RedisStateProvider) Upsert(ctx context.Context, entry states.UpsertRequ } oldEntryDict["status"] = oldStatusDict body, _ = json.Marshal(oldEntryDict) - _, err = r.Client.HSet(key, "values", string(body)).Result() + _, err = r.Client.HSet(r.Ctx, key, "values", string(body)).Result() return entry.Value.ID, err } @@ -174,7 +176,7 @@ func (r *RedisStateProvider) Upsert(ctx context.Context, entry states.UpsertRequ "values": string(body), "etag": entry.Value.ETag, } - _, err = r.Client.HSet(key, properties).Result() + _, err = r.Client.HSet(r.Ctx, key, properties).Result() return entry.Value.ID, err } @@ -205,13 +207,13 @@ func (r *RedisStateProvider) List(ctx context.Context, request states.ListReques for { var err error - keys, cursor, err = r.Client.Scan(cursor, filter, entryCountPerList).Result() + keys, cursor, err = r.Client.Scan(r.Ctx, cursor, filter, entryCountPerList).Result() if err != nil { rLog.Errorf(" P (Redis State): failed to get all the keys matching pattern %s: %+v", keyPrefix, err) } for _, key := range keys { - result, err := r.Client.HGetAll(key).Result() + result, err := r.Client.HGetAll(r.Ctx, key).Result() if err != nil || len(result) == 0 { rLog.Errorf(" P (Redis State): failed to get entry for key %s: %+v", key, err) continue @@ -263,7 +265,7 @@ func (r *RedisStateProvider) Delete(ctx context.Context, request states.DeleteRe rLog.Debugf(" P (Redis State): delete state %s with keyPrefix %s, traceId: %s", request.ID, keyPrefix, span.SpanContext().TraceID().String()) HKey := fmt.Sprintf("%s%s%s", keyPrefix, separator, request.ID) - _, err = r.Client.Del(HKey).Result() + _, err = r.Client.Del(r.Ctx, HKey).Result() return nil } @@ -285,7 +287,7 @@ func (r *RedisStateProvider) Get(ctx context.Context, request states.GetRequest) HKey := fmt.Sprintf("%s%s%s", keyPrefix, separator, request.ID) var data map[string]string - data, err = r.Client.HGetAll(HKey).Result() + data, err = r.Client.HGetAll(r.Ctx, HKey).Result() if err != nil { rLog.Errorf(" P (Redis State): failed to get state %s with keyPrefix %s, traceId: %s", request.ID, keyPrefix, span.SpanContext().TraceID().String()) return states.StateEntry{}, err diff --git a/go.work.sum b/go.work.sum index 2d92c582b..5a5453423 100644 --- a/go.work.sum +++ b/go.work.sum @@ -22,6 +22,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/cheggaaa/pb v2.0.7+incompatible h1:gLKifR1UkZ/kLkda5gC0K6c8g+jU2sINPtBeOiNlMhU= github.com/cilium/ebpf v0.9.1/go.mod h1:+OhNOIXx/Fnu1IE8bJz2dzOA+VSfyTfdNUVdlQnxUFY= diff --git a/packages/helm/symphony/files/symphony-api.json b/packages/helm/symphony/files/symphony-api.json index 6b6acd4fc..f693c6999 100644 --- a/packages/helm/symphony/files/symphony-api.json +++ b/packages/helm/symphony/files/symphony-api.json @@ -37,7 +37,8 @@ "host": "{{ include "symphony.redisHost" . }}", "requireTLS": false, "password": "", - "numberOfWorkers": 1 + "numberOfWorkers": 1, + "multiInstance": false } } {{- else }} diff --git a/test/integration/scenarios/02.basic/manifest/oss/instance.yaml b/test/integration/scenarios/02.basic/manifest/oss/instance.yaml index f235f2776..4da0d6a8e 100755 --- a/test/integration/scenarios/02.basic/manifest/oss/instance.yaml +++ b/test/integration/scenarios/02.basic/manifest/oss/instance.yaml @@ -14,3 +14,6 @@ spec: solution: SOLUTIONREFNAME target: name: TARGETREFNAME + reconciliationPolicy: + state: active + interval: 1m From 0cec5917be0a42e95e64ab24043eb49a34df5f4b Mon Sep 17 00:00:00 2001 From: Xingdong Date: Tue, 9 Jul 2024 14:59:23 +0800 Subject: [PATCH 2/8] test fix --- .../providers/pubsub/redis/redis_test.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis_test.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis_test.go index 515ae572c..00750deba 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis_test.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis_test.go @@ -74,15 +74,18 @@ func TestInit(t *testing.T) { func TestInitWithMap(t *testing.T) { provider := RedisPubSubProvider{} - err := provider.InitWithMap( - map[string]string{ - "name": "test", - "host": "localhost:6379", - }, - ) + testRedis := os.Getenv("TEST_REDIS") + if testRedis != "" { + err := provider.InitWithMap( + map[string]string{ + "name": "test", + "host": "localhost:6379", + }, + ) + assert.Nil(t, err) // Provider initialization succeeds if redis is running + } - assert.Nil(t, err) // Provider initialization succeeds if redis is running - err = provider.InitWithMap( + err := provider.InitWithMap( map[string]string{ "name": "test", }, From 16e7a30115cedd9c749e05eac67aa4ed81207539 Mon Sep 17 00:00:00 2001 From: Xingdong Date: Tue, 9 Jul 2024 17:07:15 +0800 Subject: [PATCH 3/8] remove logs --- .../apis/v1alpha2/providers/pubsub/redis/redis.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go index 526accadc..b39fac263 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go @@ -232,7 +232,7 @@ func (i *RedisPubSubProvider) Publish(topic string, event v1alpha2.Event) error Values: map[string]interface{}{"data": event}, }).Result() if err != nil { - mLog.Debugf(" P (Redis PubSub) : failed to publish message %v", err) + mLog.Errorf(" P (Redis PubSub) : failed to publish message %v", err) return v1alpha2.NewCOAError(err, "failed to publish message", v1alpha2.InternalError) } return nil @@ -241,7 +241,7 @@ func (i *RedisPubSubProvider) Subscribe(topic string, handler v1alpha2.EventHand err := i.Client.XGroupCreateMkStream(i.Ctx, topic, RedisGroup, "0").Err() //Ignore BUSYGROUP errors if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { - mLog.Debugf(" P (Redis PubSub) : failed to subscribe %v", err) + mLog.Errorf(" P (Redis PubSub) : failed to subscribe %v", err) return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to subsceribe to topic %s", topic), v1alpha2.InternalError) } go i.pollNewMessagesLoop(topic, handler) @@ -265,7 +265,7 @@ func (i *RedisPubSubProvider) pollNewMessagesLoop(topic string, handler v1alpha2 Block: 0, }).Result() if err != nil { - mLog.Debugf(" P (Redis PubSub) : failed to poll message %v", err) + mLog.Errorf(" P (Redis PubSub) : failed to poll message %v", err) time.Sleep(30 * time.Second) continue } @@ -278,10 +278,8 @@ func (i *RedisPubSubProvider) pollNewMessagesLoop(topic string, handler v1alpha2 func (i *RedisPubSubProvider) enqueueMessages(topic string, handler v1alpha2.EventHandler, msgs []redis.XMessage) { for _, msg := range msgs { if _, ok := i.ClaimedMessages[msg.ID]; ok { - mLog.Debugf(" P (Redis PubSub) : claimed old message %s", msg.ID) continue } - mLog.Debugf(" P (Redis PubSub) : claimed new message %s", msg.ID) rmsg := createRedisMessageWrapper(topic, handler, msg) select { case i.Queue <- rmsg: @@ -319,7 +317,6 @@ func (i *RedisPubSubProvider) ClaimMessageLoop(topic string, consumerId string, } func (i *RedisPubSubProvider) reclaimPendingMessages(topic string, idleTime time.Duration, consumer string, handler v1alpha2.EventHandler) { - mLog.Debugf(" P (Redis PubSub) : reclaiming pending messages for consumer %s", consumer) start := "-" for { pendingResult, err := i.Client.XPendingExt(i.Ctx, &redis.XPendingExtArgs{ @@ -332,7 +329,7 @@ func (i *RedisPubSubProvider) reclaimPendingMessages(topic string, idleTime time Consumer: consumer, }).Result() if err != nil && !errors.Is(err, redis.Nil) { - mLog.Debugf(" P (Redis PubSub) : failed to get pending message %v", err) + mLog.Errorf(" P (Redis PubSub) : failed to get pending message %v", err) break } if len(pendingResult) == 0 { @@ -355,7 +352,7 @@ func (i *RedisPubSubProvider) XClaimWrapper(topic string, minIdle time.Duration, Messages: msgIDs, }).Result() if err != nil && !errors.Is(err, redis.Nil) { - mLog.Debugf(" P (Redis PubSub) : failed to reclaim pending message %v", err) + mLog.Error(" P (Redis PubSub) : failed to reclaim pending message %v", err) return } if err == nil || errors.Is(err, redis.Nil) { From a9e94f1a437ec0b3fd895ad82126c397b4ca2338 Mon Sep 17 00:00:00 2001 From: Xingdong Date: Tue, 9 Jul 2024 18:43:00 +0800 Subject: [PATCH 4/8] add lock for inmemory map --- .../v1alpha2/providers/pubsub/redis/redis.go | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go index b39fac263..6db8f4aca 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go @@ -13,6 +13,7 @@ import ( "errors" "fmt" "strconv" + "sync" "time" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2" @@ -34,6 +35,7 @@ type RedisPubSubProvider struct { Cancel context.CancelFunc Context *contexts.ManagerContext ClaimedMessages map[string]bool + TopicLock map[string]*sync.Mutex } type RedisMessageWrapper struct { @@ -172,6 +174,7 @@ func (i *RedisPubSubProvider) Init(config providers.IProviderConfig) error { i.Ctx, i.Cancel = context.WithCancel(context.Background()) i.ClaimedMessages = make(map[string]bool) + i.TopicLock = make(map[string]*sync.Mutex) i.Subscribers = make(map[string][]v1alpha2.EventHandler) options := &redis.Options{ @@ -211,18 +214,20 @@ func (i *RedisPubSubProvider) worker() { } func (i *RedisPubSubProvider) processMessage(msg RedisMessageWrapper) error { i.ClaimedMessages[msg.MessageID] = true - defer delete(i.ClaimedMessages, msg.MessageID) var evt v1alpha2.Event err := json.Unmarshal([]byte(utils.FormatAsString(msg.Message)), &evt) if err != nil { return v1alpha2.NewCOAError(err, "failed to unmarshal event", v1alpha2.InternalError) } if err := msg.Handler(msg.Topic, evt); err != nil { + delete(i.ClaimedMessages, msg.MessageID) return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to handle message %s", msg.MessageID), v1alpha2.InternalError) } - if err := i.Client.XAck(i.Ctx, msg.Topic, i.Config.ConsumerID, msg.MessageID).Err(); err != nil { - return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to acknowledge message %s", msg.MessageID), v1alpha2.InternalError) - } + i.TopicLock[msg.Topic].Lock() + defer i.TopicLock[msg.Topic].Unlock() + i.Client.XAck(i.Ctx, msg.Topic, i.Config.ConsumerID, msg.MessageID) + delete(i.ClaimedMessages, msg.MessageID) + return nil } @@ -244,6 +249,7 @@ func (i *RedisPubSubProvider) Subscribe(topic string, handler v1alpha2.EventHand mLog.Errorf(" P (Redis PubSub) : failed to subscribe %v", err) return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to subsceribe to topic %s", topic), v1alpha2.InternalError) } + i.TopicLock[topic] = &sync.Mutex{} go i.pollNewMessagesLoop(topic, handler) go i.ClaimMessageLoop(topic, i.Config.ConsumerID, handler, PendingMessagesScanInterval, ExtendMessageOwnershipWithIdleTime) if i.Config.MultiInstance { @@ -277,9 +283,6 @@ func (i *RedisPubSubProvider) pollNewMessagesLoop(topic string, handler v1alpha2 func (i *RedisPubSubProvider) enqueueMessages(topic string, handler v1alpha2.EventHandler, msgs []redis.XMessage) { for _, msg := range msgs { - if _, ok := i.ClaimedMessages[msg.ID]; ok { - continue - } rmsg := createRedisMessageWrapper(topic, handler, msg) select { case i.Queue <- rmsg: @@ -344,6 +347,8 @@ func (i *RedisPubSubProvider) reclaimPendingMessages(topic string, idleTime time } } func (i *RedisPubSubProvider) XClaimWrapper(topic string, minIdle time.Duration, msgIDs []string, handler v1alpha2.EventHandler) { + i.TopicLock[topic].Lock() + defer i.TopicLock[topic].Unlock() claimResult, err := i.Client.XClaim(i.Ctx, &redis.XClaimArgs{ Stream: topic, Group: RedisGroup, @@ -355,9 +360,13 @@ func (i *RedisPubSubProvider) XClaimWrapper(topic string, minIdle time.Duration, mLog.Error(" P (Redis PubSub) : failed to reclaim pending message %v", err) return } - if err == nil || errors.Is(err, redis.Nil) { - i.enqueueMessages(topic, handler, claimResult) + filteredClaimResult := make([]redis.XMessage, 0, len(claimResult)) + for _, msg := range claimResult { + if _, ok := i.ClaimedMessages[msg.ID]; !ok { + filteredClaimResult = append(filteredClaimResult, msg) + } } + i.enqueueMessages(topic, handler, filteredClaimResult) } func toRedisPubSubProviderConfig(config providers.IProviderConfig) (RedisPubSubProviderConfig, error) { From c08d9e94ab063c6ab619dddd4b5e5f277b9ab623 Mon Sep 17 00:00:00 2001 From: Xingdong Li Date: Tue, 9 Jul 2024 21:07:12 +0800 Subject: [PATCH 5/8] fix wrong group name --- coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go index 6db8f4aca..9e96783f7 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go @@ -225,7 +225,7 @@ func (i *RedisPubSubProvider) processMessage(msg RedisMessageWrapper) error { } i.TopicLock[msg.Topic].Lock() defer i.TopicLock[msg.Topic].Unlock() - i.Client.XAck(i.Ctx, msg.Topic, i.Config.ConsumerID, msg.MessageID) + i.Client.XAck(i.Ctx, msg.Topic, RedisGroup, msg.MessageID) delete(i.ClaimedMessages, msg.MessageID) return nil From 362cd990614fc245cf5f00dc6e2278d80d810517 Mon Sep 17 00:00:00 2001 From: Xingdong Date: Wed, 10 Jul 2024 15:30:16 +0800 Subject: [PATCH 6/8] prevent retry --- api/pkg/apis/v1alpha1/vendors/job-vendor.go | 3 +- api/pkg/apis/v1alpha1/vendors/stage-vendor.go | 11 ++--- .../v1alpha2/providers/pubsub/redis/redis.go | 8 ++-- packages/go.work.sum | 41 +++++++++++++++++-- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/api/pkg/apis/v1alpha1/vendors/job-vendor.go b/api/pkg/apis/v1alpha1/vendors/job-vendor.go index 8c48a3d4e..e563b20f2 100644 --- a/api/pkg/apis/v1alpha1/vendors/job-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/job-vendor.go @@ -66,7 +66,8 @@ func (e *JobVendor) Init(config vendors.VendorConfig, factories []managers.IMana if err != nil && v1alpha2.IsDelayed(err) { go e.Vendor.Context.Publish(topic, event) } - return err + // job reconciler already has a retry mechanism, return nil to avoid retrying + return nil }) e.Vendor.Context.Subscribe("heartbeat", func(topic string, event v1alpha2.Event) error { return e.JobsManager.HandleHeartBeatEvent(context.Background(), event) diff --git a/api/pkg/apis/v1alpha1/vendors/stage-vendor.go b/api/pkg/apis/v1alpha1/vendors/stage-vendor.go index 10fba49c5..01509d052 100644 --- a/api/pkg/apis/v1alpha1/vendors/stage-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/stage-vendor.go @@ -95,10 +95,7 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa activation, err := s.ActivationsManager.GetState(context.TODO(), actData.Activation, actData.Namespace) if err != nil { log.Error("V (Stage): unable to find activation: %+v", err) - err = s.reportActivationStatusWithBadRequest(actData.Activation, actData.Namespace, err) - // If report status succeeded, return an empty err so the subscribe function will not be retried - // The actual error will be stored in Activation cr - return err + return nil } evt, err := s.StageManager.HandleActivationEvent(context.TODO(), actData, *campaign.Spec, activation) @@ -139,7 +136,11 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa return err } status.Outputs["__namespace"] = triggerData.Namespace - + _, err = s.ActivationsManager.GetState(context.TODO(), triggerData.Activation, triggerData.Namespace) + if err != nil { + log.Error("V (Stage): unable to find activation: %+v", err) + return nil + } campaignName := api_utils.ReplaceSeperator(triggerData.Campaign) campaign, err := s.CampaignsManager.GetState(context.TODO(), campaignName, triggerData.Namespace) if err != nil { diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go index 9e96783f7..28668300d 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go @@ -272,12 +272,12 @@ func (i *RedisPubSubProvider) pollNewMessagesLoop(topic string, handler v1alpha2 }).Result() if err != nil { mLog.Errorf(" P (Redis PubSub) : failed to poll message %v", err) - time.Sleep(30 * time.Second) continue } for _, s := range streams { i.enqueueMessages(s.Stream, handler, s.Messages) } + time.Sleep(PendingMessagesScanInterval) } } @@ -343,16 +343,16 @@ func (i *RedisPubSubProvider) reclaimPendingMessages(topic string, idleTime time for _, msg := range pendingResult { msgIDs = append(msgIDs, msg.ID) } - i.XClaimWrapper(topic, idleTime, msgIDs, handler) + i.XClaimWrapper(topic, idleTime, consumer, msgIDs, handler) } } -func (i *RedisPubSubProvider) XClaimWrapper(topic string, minIdle time.Duration, msgIDs []string, handler v1alpha2.EventHandler) { +func (i *RedisPubSubProvider) XClaimWrapper(topic string, minIdle time.Duration, consumer string, msgIDs []string, handler v1alpha2.EventHandler) { i.TopicLock[topic].Lock() defer i.TopicLock[topic].Unlock() claimResult, err := i.Client.XClaim(i.Ctx, &redis.XClaimArgs{ Stream: topic, Group: RedisGroup, - Consumer: i.Config.ConsumerID, + Consumer: consumer, MinIdle: minIdle, Messages: msgIDs, }).Result() diff --git a/packages/go.work.sum b/packages/go.work.sum index 95b892dd5..5089a5fbb 100644 --- a/packages/go.work.sum +++ b/packages/go.work.sum @@ -29,6 +29,7 @@ cloud.google.com/go/cloudtasks v1.12.1/go.mod h1:a9udmnou9KO2iulGscKR0qBYjreuX8o cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/JlizULFrI= cloud.google.com/go/compute v1.20.1/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= +cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/contactcenterinsights v1.10.0/go.mod h1:bsg/R7zGLYMVxFFzfh9ooLTruLRCG9fnzhH9KznHhbM= cloud.google.com/go/container v1.24.0/go.mod h1:lTNExE2R7f+DLbAN+rJiKTisauFCaoDq6NURZ83eVH4= @@ -123,6 +124,7 @@ cloud.google.com/go/vpcaccess v1.7.1/go.mod h1:FogoD46/ZU+JUBX9D606X21EnxiszYi2t cloud.google.com/go/webrisk v1.9.1/go.mod h1:4GCmXKcOa2BZcZPn6DCEvE7HypmEJcJkr4mtM+sqYPc= cloud.google.com/go/websecurityscanner v1.6.1/go.mod h1:Njgaw3rttgRHXzwCB8kgCYqv5/rGpFCsBOvPbYgszpg= cloud.google.com/go/workflows v1.11.1/go.mod h1:Z+t10G1wF7h8LgdY/EmRcQY8ptBD/nvofaL6FqlET6g= +dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg= github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0/go.mod h1:OahwfttHWG6eJ0clwcfBAHoDI6X/LV/15hx/wlMZSrU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= @@ -152,6 +154,8 @@ github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3Q github.com/Masterminds/sprig v2.22.0+incompatible h1:z4yfnGrZ7netVz+0EDJ0Wi+5VZCSYp4Z0m2dk6cEM60= github.com/Masterminds/sprig/v3 v3.2.1/go.mod h1:UoaO7Yp8KlPnJIYWTFkMaqPUYKTfGFPhxNuwnnxkKlk= github.com/Masterminds/vcs v1.13.3/go.mod h1:TiE7xuEjl1N4j016moRd6vezp6e6Lz23gypeXfzXeW8= +github.com/Microsoft/cosesign1go v1.1.0/go.mod h1:o+sw7nhlGE6twhfjXQDWmBJO8zmfQXEmCcXEi3zha8I= +github.com/Microsoft/didx509go v0.0.2/go.mod h1:F+msvNlKCEm3RgUE3kRpi7E+6hdR6r5PtOLWQKYfGbs= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw= @@ -222,11 +226,13 @@ github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA= github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI= github.com/container-orchestrated-devices/container-device-interface v0.5.4/go.mod h1:DjE95rfPiiSmG7uVXtg0z6MnPm/Lx4wxKCIts0ZE0vg= +github.com/container-orchestrated-devices/container-device-interface v0.6.1/go.mod h1:40T6oW59rFrL/ksiSs7q45GzjGlbvxnA4xaK6cyq+kA= github.com/containerd/aufs v0.0.0-20200908144142-dab0cbea06f4/go.mod h1:nukgQABAEopAHvB6j7cnP5zJ+/3aVcE7hCYqvIwAHyE= github.com/containerd/aufs v0.0.0-20201003224125-76a6863f2989/go.mod h1:AkGGQs9NM2vtYHaUen+NljV0/baGCAPELGm2q9ZXpWU= github.com/containerd/aufs v0.0.0-20210316121734-20793ff83c97/go.mod h1:kL5kd6KM5TzQjR79jljyi4olc1Vrx6XBlcyj3gNv2PU= @@ -243,7 +249,7 @@ github.com/containerd/cgroups v0.0.0-20200824123100-0b889c03f102/go.mod h1:s5q4S github.com/containerd/cgroups v0.0.0-20210114181951-8a68de567b68/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= github.com/containerd/cgroups v1.0.1/go.mod h1:0SJrPIenamHDcZhEcJMNBB85rHcUsw4f25ZfBiPYRkU= github.com/containerd/cgroups v1.0.3/go.mod h1:/ofk34relqNjSGyqPrmEULrO4Sc8LJhvJmWbUCUKqj8= -github.com/containerd/cgroups/v3 v3.0.2/go.mod h1:JUgITrzdFqp42uI2ryGA+ge0ap/nxzYgkGmIcetmErE= +github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw= github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw= github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw= github.com/containerd/console v0.0.0-20191206165004-02ecf6a7291e/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE= @@ -287,6 +293,7 @@ github.com/containerd/nri v0.0.0-20201007170849-eb1350a75164/go.mod h1:+2wGSDGFY github.com/containerd/nri v0.0.0-20210316161719-dbaa18c31c14/go.mod h1:lmxnXF6oMkbqs39FiCt1s0R2HSMhcLel9vNL3m4AaeY= github.com/containerd/nri v0.1.0/go.mod h1:lmxnXF6oMkbqs39FiCt1s0R2HSMhcLel9vNL3m4AaeY= github.com/containerd/nri v0.4.0/go.mod h1:Zw9q2lP16sdg0zYybemZ9yTDy8g7fPCIB3KXOGlggXI= +github.com/containerd/protobuild v0.3.0/go.mod h1:5mNMFKKAwCIAkFBPiOdtRx2KiQlyEJeMXnL5R1DsWu8= github.com/containerd/stargz-snapshotter/estargz v0.4.1/go.mod h1:x7Q9dg9QYb4+ELgxmo4gBUeJB0tl5dqH1Sdz0nJU1QM= github.com/containerd/stargz-snapshotter/estargz v0.14.3/go.mod h1:KY//uOCIkSuNAHhJogcZtrNHdKrA99/FCCRjE3HD36o= github.com/containerd/ttrpc v0.0.0-20190828154514-0e0f228740de/go.mod h1:PvCDdDGpgqzQIzDW1TphrGLssLDZp2GuS+X5DkEJB8o= @@ -357,6 +364,7 @@ github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnG github.com/daviddengcn/go-colortext v1.0.0/go.mod h1:zDqEI5NVUop5QPpVJUxE9UO10hRnmkD5G4Pmri9+m4c= github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d/go.mod h1:tmAIfUFEirG/Y8jhZ9M+h36obRZAk/1fcSpXwAVlfqE= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/denisenkom/go-mssqldb v0.9.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0= github.com/dgraph-io/badger/v3 v3.2103.2/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M= @@ -386,7 +394,9 @@ github.com/emicklei/go-restful/v3 v3.10.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRr github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.11.1/go.mod h1:uhMcXKCQMEJHiAb0w+YGefQLaTEw+YhGluxZkrTmD0g= +github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -436,6 +446,7 @@ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus v0.0.0-20151105175453-c7fdd8b5cd55/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/godbus/dbus v0.0.0-20180201030542-885f9cc04c9c/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= @@ -456,6 +467,7 @@ github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= +github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= @@ -472,6 +484,7 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-containerregistry v0.5.1/go.mod h1:Ct15B4yir3PLOP5jsy0GNeYVaIZs/MK/Jz5any1wFW0= github.com/google/go-containerregistry v0.14.0/go.mod h1:aiJ2fp/SXvkWgmYHioXnbMdlgB8eXiiYOY55gfN91Wk= +github.com/google/go-containerregistry v0.17.0/go.mod h1:u0qB2l7mvtWVR5kNcbFIhFY1hLbf8eeGapA+vbFDCtQ= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= @@ -533,10 +546,14 @@ github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NB github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/lestrrat-go/backoff/v2 v2.0.8/go.mod h1:rHP/q/r9aT27n24JQLa7JhSQZCKBBOiM/uP402WwN8Y= github.com/lestrrat-go/blackmagic v1.0.0/go.mod h1:TNgH//0vYSs8VXDCfkZLgIrVTTXQELZffUV0tz3MtdQ= +github.com/lestrrat-go/blackmagic v1.0.2/go.mod h1:UrEqBzIR2U6CnzVyUtfM6oZNMt/7O7Vohk2J0OGSAtU= github.com/lestrrat-go/httpcc v1.0.1/go.mod h1:qiltp3Mt56+55GPVCbTdM9MlqhvzyuL6W/NMDA8vA5E= github.com/lestrrat-go/iter v1.0.1/go.mod h1:zIdgO1mRKhn8l9vrZJZz9TUMMFbQbLeTsbqPDrJ/OJc= +github.com/lestrrat-go/iter v1.0.2/go.mod h1:Momfcq3AnRlRjI5b5O8/G5/BvpzrhoFTZcn06fEOPt4= github.com/lestrrat-go/jwx v1.2.25/go.mod h1:zoNuZymNl5lgdcu6P7K6ie2QRll5HVfF4xwxBBK1NxY= +github.com/lestrrat-go/jwx v1.2.28/go.mod h1:nF+91HEMh/MYFVwKPl5HHsBGMPscqbQb+8IDQdIazP8= github.com/lestrrat-go/option v1.0.0/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= +github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linuxkit/virtsock v0.0.0-20201010232012-f8cee7dfc7a3/go.mod h1:3r6x7q95whyfWQpmGZTu3gk3v2YkMi05HEzl7Tf7YEo= github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc= @@ -569,6 +586,7 @@ github.com/moby/sys/signal v0.6.0/go.mod h1:GQ6ObYZfqacOwTtlXvcmh9A26dVRul/hbOZn github.com/moby/sys/signal v0.7.0/go.mod h1:GQ6ObYZfqacOwTtlXvcmh9A26dVRul/hbOZn88Kg8Tg= github.com/moby/sys/symlink v0.1.0/go.mod h1:GGDODQmbFOjFsXvfLVn3+ZRxkch54RkSiGqsZeMYowQ= github.com/moby/sys/symlink v0.2.0/go.mod h1:7uZVF2dqJjG/NsClqul95CqKOBRQyYSNnJ6BMgR/gFs= +github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU= github.com/moby/term v0.0.0-20200312100748-672ec06f55cd/go.mod h1:DdlQx2hp0Ss5/fLikoLlEeIYiATotOjgB//nb973jeo= github.com/moby/term v0.0.0-20210610120745-9d4ed1856297/go.mod h1:vgPCkQMyxTZ7IDy8SXRufE172gr8+K/JE/7hHFxHW3A= github.com/moby/term v0.0.0-20221205130635-1aeaba878587/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= @@ -612,6 +630,7 @@ github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAl github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/open-policy-agent/opa v0.42.2/go.mod h1:MrmoTi/BsKWT58kXlVayBb+rYVeaMwuBm3nYAN3923s= github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -622,6 +641,8 @@ github.com/opencontainers/image-spec v1.0.2-0.20211117181255-693428a734f5/go.mod github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b/go.mod h1:3OVijpioIKYWTqjiG0zfF6wvoJ4fAXGbjdZuI2NgsRQ= github.com/opencontainers/image-spec v1.1.0-rc5/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8= +github.com/opencontainers/runc v1.1.12 h1:BOIssBaW1La0/qbNZHXOOa71dZfZEQOzW7dqQf3phss= +github.com/opencontainers/runc v1.1.12/go.mod h1:S+lQwSfncpBha7XTy/5lBwWgm5+y5Ma/O44Ekby9FK8= github.com/opencontainers/runtime-spec v0.1.2-0.20190507144316-5b71a03e2700/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-spec v1.0.1/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-spec v1.0.2-0.20190207185410-29686dbc5559/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= @@ -629,6 +650,7 @@ github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/ github.com/opencontainers/runtime-spec v1.0.3-0.20200929063507-e6143ca7d51d/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-spec v1.0.3-0.20220825212826-86290f6a00fb/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-spec v1.1.0-rc.1/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= +github.com/opencontainers/runtime-spec v1.1.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-tools v0.0.0-20181011054405-1d69bd0f9c39/go.mod h1:r3f7wjNzSs2extwzU3Y+6pKfobzPh+kKFJ3ofN+3nfs= github.com/opencontainers/runtime-tools v0.9.1-0.20221107090550-2e043c6bd626/go.mod h1:BRHJJd0E+cx42OybVYSgUvZmU0B8P9gZuRXlZUP7TKI= github.com/opencontainers/selinux v1.6.0/go.mod h1:VVGKuOLlE7v4PJyT6h7mNWvq1rzqiriPsEqVhc+svHE= @@ -636,8 +658,6 @@ github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3 github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8= github.com/opencontainers/selinux v1.9.1/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= github.com/opencontainers/selinux v1.11.0/go.mod h1:E5dMC3VPuVvVHDYmi78qvhJp8+M586T4DlDRYpFkyec= -github.com/opencontainers/runc v1.1.12 h1:BOIssBaW1La0/qbNZHXOOa71dZfZEQOzW7dqQf3phss= -github.com/opencontainers/runc v1.1.12/go.mod h1:S+lQwSfncpBha7XTy/5lBwWgm5+y5Ma/O44Ekby9FK8= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= @@ -737,15 +757,19 @@ github.com/urfave/cli v1.19.1/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8= +github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= github.com/vbatts/tar-split v0.11.2/go.mod h1:vV3ZuO2yWSVsz+pfFzDG/upWH1JhjOiEaWq6kXyQ3VI= +github.com/vbatts/tar-split v0.11.3/go.mod h1:9QlHN18E+fEH7RdG+QAJJcuya3rqT7eXSTY7wGrAokY= github.com/vektah/gqlparser/v2 v2.4.5/go.mod h1:flJWIR04IMQPGz+BXLrORkrARBxv/rtyIAFvd/MceW0= github.com/veraison/go-cose v1.0.0-rc.1/go.mod h1:7ziE85vSq4ScFTg6wyoMXjucIGOf4JkFEZi/an96Ct4= +github.com/veraison/go-cose v1.2.0/go.mod h1:7ziE85vSq4ScFTg6wyoMXjucIGOf4JkFEZi/an96Ct4= github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= @@ -761,6 +785,7 @@ go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= +go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/api/v3 v3.5.10/go.mod h1:TidfmT4Uycad3NM/o25fG3J07odo4GBB9hoxaodFCtI= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= @@ -782,6 +807,7 @@ go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M= go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk= go.opentelemetry.io/contrib v0.20.0 h1:ubFQUn0VCZ0gPwIoJfBJVpeBlyRMxu8Mm/huKWYd9p0= go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0/go.mod h1:vsh3ySueQCiKPxFLvjWC4Z135gIa34TQ/NSqkDTZYUM= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs= go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk= @@ -823,6 +849,7 @@ go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0 go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= @@ -850,12 +877,14 @@ golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0 golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e h1:+WEEuIdZHnUeJJmEUjyYC2gfUMj69yZXw17EnHg/otA= golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e/go.mod h1:Kr81I6Kryrl9sr8s2FK3vxD90NdsKWRuOIl2O4CvYbA= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -948,7 +977,6 @@ golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXR golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -999,11 +1027,14 @@ google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02Oq google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5/go.mod h1:oH/ZOT02u4kWEp7oYBGYFFkCdKS/uYR9Z7+0/xuuFp8= google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY= google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4= +google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/bQXnr/ClcEMJ968gUXJQ9pwfSynuQ= +google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= google.golang.org/genproto/googleapis/api v0.0.0-20230803162519-f966b187b2e5/go.mod h1:5DZzOUPCLYL3mNkQ0ms0F3EuUNZ7py1Bqeq6sxzI7/Q= google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= +google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5/go.mod h1:zBEcrKX2ZOcEkHWxBPAIvYUWOKKMIhYcmNiUIu2ji3I= @@ -1022,6 +1053,7 @@ google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGO google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y= google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= @@ -1086,6 +1118,7 @@ k8s.io/gengo v0.0.0-20200428234225-8167cfdcfc14/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8 k8s.io/gengo v0.0.0-20201113003025-83324d819ded/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/gengo v0.0.0-20211129171323-c02415ce4185/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= +k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70/go.mod h1:VH3AT8AaQOqiGjMF9p0/IM1Dj+82ZwjfxUP1IxaHE+8= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.9.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= From baf9663e1998a92a57c8f9a2cffdf381c1693131 Mon Sep 17 00:00:00 2001 From: Xingdong Date: Mon, 15 Jul 2024 15:45:59 +0800 Subject: [PATCH 7/8] Correct lock scope --- coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go index 28668300d..c2b48ba9f 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go @@ -219,12 +219,13 @@ func (i *RedisPubSubProvider) processMessage(msg RedisMessageWrapper) error { if err != nil { return v1alpha2.NewCOAError(err, "failed to unmarshal event", v1alpha2.InternalError) } - if err := msg.Handler(msg.Topic, evt); err != nil { + err = msg.Handler(msg.Topic, evt) + i.TopicLock[msg.Topic].Lock() + defer i.TopicLock[msg.Topic].Unlock() + if err != nil { delete(i.ClaimedMessages, msg.MessageID) return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to handle message %s", msg.MessageID), v1alpha2.InternalError) } - i.TopicLock[msg.Topic].Lock() - defer i.TopicLock[msg.Topic].Unlock() i.Client.XAck(i.Ctx, msg.Topic, RedisGroup, msg.MessageID) delete(i.ClaimedMessages, msg.MessageID) From 0ee4781fa442420dbb470803a47e898668dbdc92 Mon Sep 17 00:00:00 2001 From: Xingdong Date: Tue, 16 Jul 2024 13:50:45 +0800 Subject: [PATCH 8/8] Make TopicLock thread safe --- .../v1alpha2/providers/pubsub/redis/redis.go | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go index c2b48ba9f..8498be7e3 100644 --- a/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go +++ b/coa/pkg/apis/v1alpha2/providers/pubsub/redis/redis.go @@ -36,6 +36,7 @@ type RedisPubSubProvider struct { Context *contexts.ManagerContext ClaimedMessages map[string]bool TopicLock map[string]*sync.Mutex + MapLock *sync.Mutex } type RedisMessageWrapper struct { @@ -174,6 +175,7 @@ func (i *RedisPubSubProvider) Init(config providers.IProviderConfig) error { i.Ctx, i.Cancel = context.WithCancel(context.Background()) i.ClaimedMessages = make(map[string]bool) + i.MapLock = &sync.Mutex{} i.TopicLock = make(map[string]*sync.Mutex) i.Subscribers = make(map[string][]v1alpha2.EventHandler) @@ -220,8 +222,9 @@ func (i *RedisPubSubProvider) processMessage(msg RedisMessageWrapper) error { return v1alpha2.NewCOAError(err, "failed to unmarshal event", v1alpha2.InternalError) } err = msg.Handler(msg.Topic, evt) - i.TopicLock[msg.Topic].Lock() - defer i.TopicLock[msg.Topic].Unlock() + lock := i.getTopicLock(msg.Topic) + lock.Lock() + defer lock.Unlock() if err != nil { delete(i.ClaimedMessages, msg.MessageID) return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to handle message %s", msg.MessageID), v1alpha2.InternalError) @@ -250,7 +253,6 @@ func (i *RedisPubSubProvider) Subscribe(topic string, handler v1alpha2.EventHand mLog.Errorf(" P (Redis PubSub) : failed to subscribe %v", err) return v1alpha2.NewCOAError(err, fmt.Sprintf("failed to subsceribe to topic %s", topic), v1alpha2.InternalError) } - i.TopicLock[topic] = &sync.Mutex{} go i.pollNewMessagesLoop(topic, handler) go i.ClaimMessageLoop(topic, i.Config.ConsumerID, handler, PendingMessagesScanInterval, ExtendMessageOwnershipWithIdleTime) if i.Config.MultiInstance { @@ -348,8 +350,9 @@ func (i *RedisPubSubProvider) reclaimPendingMessages(topic string, idleTime time } } func (i *RedisPubSubProvider) XClaimWrapper(topic string, minIdle time.Duration, consumer string, msgIDs []string, handler v1alpha2.EventHandler) { - i.TopicLock[topic].Lock() - defer i.TopicLock[topic].Unlock() + lock := i.getTopicLock(topic) + lock.Lock() + defer lock.Unlock() claimResult, err := i.Client.XClaim(i.Ctx, &redis.XClaimArgs{ Stream: topic, Group: RedisGroup, @@ -398,3 +401,12 @@ func toRedisPubSubProviderConfig(config providers.IProviderConfig) (RedisPubSubP func generateConsumerIDSuffix() string { return fmt.Sprintf("%d", time.Now().UnixNano()) } + +func (i *RedisPubSubProvider) getTopicLock(topic string) *sync.Mutex { + i.MapLock.Lock() + defer i.MapLock.Unlock() + if _, ok := i.TopicLock[topic]; !ok { + i.TopicLock[topic] = &sync.Mutex{} + } + return i.TopicLock[topic] +}