diff --git a/deepfence_agent/plugins/YaraHunter b/deepfence_agent/plugins/YaraHunter index ada213db30..796ff20d62 160000 --- a/deepfence_agent/plugins/YaraHunter +++ b/deepfence_agent/plugins/YaraHunter @@ -1 +1 @@ -Subproject commit ada213db3045ee12c23d178d0bb9bd2cd7591c0b +Subproject commit 796ff20d62c2350dbd9994954761e2dd72ed6dea diff --git a/deepfence_server/go.mod b/deepfence_server/go.mod index 3841981b15..707b8392bb 100644 --- a/deepfence_server/go.mod +++ b/deepfence_server/go.mod @@ -89,6 +89,7 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/hibiken/asynq v0.24.1 // indirect github.com/imdario/mergo v0.3.11 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect @@ -122,8 +123,10 @@ require ( github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rs/xid v1.5.0 // indirect github.com/segmentio/asm v1.2.0 // indirect + github.com/spf13/cast v1.3.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/swaggest/jsonschema-go v0.3.57 // indirect github.com/swaggest/refl v1.2.0 // indirect diff --git a/deepfence_server/go.sum b/deepfence_server/go.sum index 2131d4bb0a..c465564768 100644 --- a/deepfence_server/go.sum +++ b/deepfence_server/go.sum @@ -24,7 +24,9 @@ github.com/aws/aws-sdk-go v1.44.325/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8 github.com/bool64/dev v0.2.29 h1:x+syGyh+0eWtOzQ1ItvLzOGIWyNWnyjXpHIcpF2HvL4= github.com/bool64/shared v0.1.5 h1:fp3eUhBsrSjNCQPcSdQqZxxh9bBwrYiZ+zOKFkM0/2E= github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= +github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= +github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/casbin/casbin/v2 v2.75.0 h1:vSgtloFgyijYrFAoMKH1u9vdT7R55WDU74hJDDWT+5w= github.com/casbin/casbin/v2 v2.75.0/go.mod h1:mzGx0hYW9/ksOSpw3wNjk3NRAroq5VMFYUQ6G43iGPk= github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= @@ -164,6 +166,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9 github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hibiken/asynq v0.24.1 h1:+5iIEAyA9K/lcSPvx3qoPtsKJeKI5u9aOIvUmSsazEw= +github.com/hibiken/asynq v0.24.1/go.mod h1:u5qVeSbrnfT+vtG5Mq8ZPzQu/BmCKMHvTGb91uy9Tts= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA= @@ -281,10 +285,13 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/riandyrn/otelchi v0.5.1 h1:0/45omeqpP7f/cvdL16GddQBfAEmZvUyl2QzLSE6uYo= github.com/riandyrn/otelchi v0.5.1/go.mod h1:ZxVxNEl+jQ9uHseRYIxKWRb3OY8YXFEu+EkNiiSNUEA= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -304,6 +311,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= +github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -353,6 +362,7 @@ github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCO github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opentelemetry.io/contrib v1.0.0 h1:khwDCxdSspjOLmFnvMuSHd/5rPzbTx0+l6aURwtQdfE= go.opentelemetry.io/contrib v1.0.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM= @@ -374,6 +384,8 @@ go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKu go.opentelemetry.io/otel/trace v1.6.1/go.mod h1:RkFRM1m0puWIq10oxImnGEduNBzxiN7TXluRBtE+5j0= go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -386,8 +398,10 @@ golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= @@ -402,6 +416,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -419,6 +434,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -433,8 +449,10 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -470,14 +488,17 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y= @@ -500,6 +521,7 @@ google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cn google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/deepfence_server/handler/cloud_node.go b/deepfence_server/handler/cloud_node.go index 26c6893524..3af5ac436c 100644 --- a/deepfence_server/handler/cloud_node.go +++ b/deepfence_server/handler/cloud_node.go @@ -7,9 +7,6 @@ import ( "net/http" "net/http/httputil" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/deepfence/ThreatMapper/deepfence_server/model" reporters_scan "github.com/deepfence/ThreatMapper/deepfence_server/reporters/scan" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" @@ -229,20 +226,9 @@ func (h *Handler) extractCloudNodeDetails(w http.ResponseWriter, r *http.Request } func (h *Handler) CachePostureProviders(ctx context.Context) error { - msg := message.NewMessage(watermill.NewUUID(), []byte{}) - namespace, err := directory.ExtractNamespace(ctx) + worker, err := directory.Worker(ctx) if err != nil { - log.Error().Msgf("cannot extract namespace:", err) return err } - msg.Metadata = map[string]string{directory.NamespaceKey: string(namespace)} - msg.SetContext(directory.NewContextWithNameSpace(namespace)) - middleware.SetCorrelationID(watermill.NewShortUUID(), msg) - - err = h.TasksPublisher.Publish(utils.CachePostureProviders, msg) - if err != nil { - log.Error().Msgf("cannot publish message:", err) - return err - } - return nil + return worker.Enqueue(utils.CachePostureProviders, []byte{}) } diff --git a/deepfence_server/handler/container_registry.go b/deepfence_server/handler/container_registry.go index 8f90e2d7aa..be379076d9 100644 --- a/deepfence_server/handler/container_registry.go +++ b/deepfence_server/handler/container_registry.go @@ -9,9 +9,6 @@ import ( "strconv" "strings" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" api_messages "github.com/deepfence/ThreatMapper/deepfence_server/constants/api-messages" "github.com/deepfence/ThreatMapper/deepfence_server/model" "github.com/deepfence/ThreatMapper/deepfence_server/pkg/constants" @@ -692,23 +689,19 @@ func (h *Handler) SyncRegistry(rCtx context.Context, pgID int32) error { PgID: pgID, }) if err != nil { - log.Error().Msgf("cannot marshal payload:", err) + log.Error().Msgf("cannot marshal payload: %v", err) return err } - msg := message.NewMessage(watermill.NewUUID(), payload) - namespace, err := directory.ExtractNamespace(rCtx) + worker, err := directory.Worker(rCtx) if err != nil { - log.Error().Msgf("cannot extract namespace:", err) + log.Error().Msgf("cannot extract namespace: %v", err) return err } - msg.Metadata = map[string]string{directory.NamespaceKey: string(namespace)} - msg.SetContext(directory.NewContextWithNameSpace(namespace)) - middleware.SetCorrelationID(watermill.NewShortUUID(), msg) - err = h.TasksPublisher.Publish(utils.SyncRegistryTask, msg) + err = worker.Enqueue(utils.SyncRegistryTask, payload) if err != nil { - log.Error().Msgf("cannot publish message:", err) + log.Error().Msgf("cannot publish message: %v", err) return err } return nil diff --git a/deepfence_server/handler/custom_handler.go b/deepfence_server/handler/custom_handler.go index 1a9b9cdf58..643b486b27 100644 --- a/deepfence_server/handler/custom_handler.go +++ b/deepfence_server/handler/custom_handler.go @@ -1,7 +1,6 @@ package handler import ( - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" "github.com/casbin/casbin/v2" "github.com/deepfence/ThreatMapper/deepfence_server/apiDocs" consolediagnosis "github.com/deepfence/ThreatMapper/deepfence_server/diagnosis/console-diagnosis" @@ -19,6 +18,5 @@ type Handler struct { Validator *validator.Validate Translator ut.Translator IngestChan chan *kgo.Record - TasksPublisher *kafka.Publisher ConsoleDiagnosis consolediagnosis.ConsoleDiagnosisHandler } diff --git a/deepfence_server/handler/export_reports.go b/deepfence_server/handler/export_reports.go index 525117cfd2..b3fe0c37cf 100644 --- a/deepfence_server/handler/export_reports.go +++ b/deepfence_server/handler/export_reports.go @@ -6,9 +6,6 @@ import ( "sort" "time" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/deepfence/ThreatMapper/deepfence_server/ingesters" "github.com/deepfence/ThreatMapper/deepfence_server/model" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" @@ -263,7 +260,7 @@ func (h *Handler) GenerateReport(w http.ResponseWriter, r *http.Request) { Filters: req.Filters, } - namespace, err := directory.ExtractNamespace(r.Context()) + worker, err := directory.Worker(r.Context()) if err != nil { log.Error().Msg(err.Error()) h.respondError(err, w) @@ -320,16 +317,7 @@ func (h *Handler) GenerateReport(w http.ResponseWriter, r *http.Request) { return } - // create a task message - msg := message.NewMessage(watermill.NewUUID(), payload) - msg.Metadata = map[string]string{ - directory.NamespaceKey: string(namespace), - "report_type": req.ReportType, - } - msg.SetContext(directory.NewContextWithNameSpace(namespace)) - middleware.SetCorrelationID(watermill.NewShortUUID(), msg) - - err = h.TasksPublisher.Publish(utils.ReportGeneratorTask, msg) + err = worker.Enqueue(utils.ReportGeneratorTask, payload) if err != nil { log.Error().Msgf("failed to publish task: %+v", err) h.respondError(err, w) diff --git a/deepfence_server/handler/scan_reports.go b/deepfence_server/handler/scan_reports.go index 2c0c910a55..15940325ac 100644 --- a/deepfence_server/handler/scan_reports.go +++ b/deepfence_server/handler/scan_reports.go @@ -14,9 +14,6 @@ import ( "strings" "time" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/deepfence/ThreatMapper/deepfence_server/ingesters" "github.com/deepfence/ThreatMapper/deepfence_server/model" "github.com/deepfence/ThreatMapper/deepfence_server/reporters" @@ -668,18 +665,14 @@ func (h *Handler) IngestSbomHandler(w http.ResponseWriter, r *http.Request) { return } - msg := message.NewMessage(watermill.NewUUID(), payload) - namespace, err := directory.ExtractNamespace(r.Context()) + worker, err := directory.Worker(r.Context()) if err != nil { log.Error().Msg(err.Error()) h.respondError(err, w) return } - msg.Metadata = map[string]string{directory.NamespaceKey: string(namespace)} - msg.SetContext(directory.NewContextWithNameSpace(namespace)) - middleware.SetCorrelationID(watermill.NewShortUUID(), msg) - err = h.TasksPublisher.Publish(utils.ScanSBOMTask, msg) + err = worker.Enqueue(utils.ScanSBOMTask, payload) if err != nil { log.Error().Msgf("cannot publish message:", err) h.respondError(err, w) diff --git a/deepfence_server/main.go b/deepfence_server/main.go index ac1c7b84d9..f290a3667b 100644 --- a/deepfence_server/main.go +++ b/deepfence_server/main.go @@ -13,8 +13,6 @@ import ( "strings" "sync" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" "github.com/deepfence/ThreatMapper/deepfence_server/apiDocs" consolediagnosis "github.com/deepfence/ThreatMapper/deepfence_server/diagnosis/console-diagnosis" "github.com/deepfence/ThreatMapper/deepfence_server/handler" @@ -162,30 +160,15 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) go utils.StartKafkaProducer(ctx, strings.Split(kafkaBrokers, ","), ingestC) - wml := watermill.NewStdLogger(false, false) - - // task publisher - publisher, err := kafka.NewPublisher( - kafka.PublisherConfig{ - Brokers: strings.Split(kafkaBrokers, ","), - Marshaler: kafka.DefaultMarshaler{}, - }, - wml, - ) - if err != nil { - panic(err) - } - defer publisher.Close() - err = router.SetupRoutes(mux, - config.HttpListenEndpoint, *serveOpenapiDocs, ingestC, publisher, openApiDocs, config.Orchestrator, + config.HttpListenEndpoint, *serveOpenapiDocs, ingestC, openApiDocs, config.Orchestrator, ) if err != nil { log.Error().Msg(err.Error()) return } - err = router.InternalRoutes(internalMux, ingestC, publisher) + err = router.InternalRoutes(internalMux, ingestC) if err != nil { log.Error().Msg(err.Error()) return diff --git a/deepfence_server/router/internal.go b/deepfence_server/router/internal.go index 962af03359..2f3f06cae4 100644 --- a/deepfence_server/router/internal.go +++ b/deepfence_server/router/internal.go @@ -1,14 +1,13 @@ package router import ( - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" "github.com/deepfence/ThreatMapper/deepfence_server/handler" "github.com/go-chi/chi/v5" "github.com/go-playground/validator/v10" "github.com/twmb/franz-go/pkg/kgo" ) -func InternalRoutes(r *chi.Mux, ingestC chan *kgo.Record, taskPublisher *kafka.Publisher) error { +func InternalRoutes(r *chi.Mux, ingestC chan *kgo.Record) error { // authorization authEnforcer, err := newAuthorizationHandler() if err != nil { @@ -20,7 +19,6 @@ func InternalRoutes(r *chi.Mux, ingestC chan *kgo.Record, taskPublisher *kafka.P SaasDeployment: IsSaasDeployment(), Validator: validator.New(), IngestChan: ingestC, - TasksPublisher: taskPublisher, } r.Route("/deepfence/internal", func(r chi.Router) { diff --git a/deepfence_server/router/router.go b/deepfence_server/router/router.go index 281239d906..278ebb7ac7 100644 --- a/deepfence_server/router/router.go +++ b/deepfence_server/router/router.go @@ -7,7 +7,6 @@ import ( "os" "strings" - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" "github.com/casbin/casbin/v2" "github.com/deepfence/ThreatMapper/deepfence_server/apiDocs" consolediagnosis "github.com/deepfence/ThreatMapper/deepfence_server/diagnosis/console-diagnosis" @@ -100,8 +99,7 @@ func getJWTAuthSignKey() (string, error) { } } -func SetupRoutes(r *chi.Mux, serverPort string, serveOpenapiDocs bool, ingestC chan *kgo.Record, - taskPublisher *kafka.Publisher, openApiDocs *apiDocs.OpenApiDocs, orchestrator string) error { +func SetupRoutes(r *chi.Mux, serverPort string, serveOpenapiDocs bool, ingestC chan *kgo.Record, openApiDocs *apiDocs.OpenApiDocs, orchestrator string) error { var tokenAuth *jwtauth.JWTAuth @@ -135,7 +133,6 @@ func SetupRoutes(r *chi.Mux, serverPort string, serveOpenapiDocs bool, ingestC c Validator: apiValidator, Translator: translator, IngestChan: ingestC, - TasksPublisher: taskPublisher, ConsoleDiagnosis: consoleDiagnosis, } diff --git a/deepfence_utils/directory/utils.go b/deepfence_utils/directory/utils.go index 6154be76e4..c7bf24abe5 100644 --- a/deepfence_utils/directory/utils.go +++ b/deepfence_utils/directory/utils.go @@ -9,7 +9,7 @@ import ( "github.com/redis/go-redis/v9" ) -func getClient[T *redis.Client | *CypherDriver | *postgresqlDb.Queries | *minio.Client](ctx context.Context, pool *sync.Map, newClient func(DBConfigs) (T, error)) (T, error) { +func getClient[T *redis.Client | *CypherDriver | *postgresqlDb.Queries | *minio.Client | *asyncq_clients](ctx context.Context, pool *sync.Map, newClient func(DBConfigs) (T, error)) (T, error) { key, err := ExtractNamespace(ctx) if err != nil { return nil, err diff --git a/deepfence_utils/directory/worker.go b/deepfence_utils/directory/worker.go new file mode 100644 index 0000000000..b76692af9d --- /dev/null +++ b/deepfence_utils/directory/worker.go @@ -0,0 +1,77 @@ +package directory + +import ( + "context" + "errors" + "sync" + + "github.com/hibiken/asynq" +) + +const ( + max_size = 500 * 1024 * 1024 // 500 MB +) + +var ErrExhaustedResources = errors.New("Exhausted worker resources") + +type asyncq_clients struct { + client *asynq.Client + inspector *asynq.Inspector +} + +type WorkEnqueuer struct { + clients asyncq_clients +} + +var worker_clients_pool sync.Map + +func init() { + worker_clients_pool = sync.Map{} +} + +func new_asynq_client(endpoints DBConfigs) (*asyncq_clients, error) { + if endpoints.Redis == nil { + return nil, errors.New("No defined Redis config") + } + redisCfg := asynq.RedisClientOpt{Addr: endpoints.Redis.Endpoint} + return &asyncq_clients{ + client: asynq.NewClient(redisCfg), + inspector: asynq.NewInspector(redisCfg), + }, nil +} + +func (ws WorkEnqueuer) Enqueue(task_enum string, data []byte) error { + + client := ws.clients.client + inspector := ws.clients.inspector + + qs, err := inspector.Queues() + if err != nil { + return err + } + size := 0 + for _, q := range qs { + res, err := inspector.GetQueueInfo(q) + if err != nil { + continue + } + size += res.Size + } + + if size >= max_size { + return ErrExhaustedResources + } + + _, err = client.Enqueue(asynq.NewTask(task_enum, data)) + + return err +} + +func Worker(ctx context.Context) (WorkEnqueuer, error) { + client, err := getClient(ctx, &worker_clients_pool, new_asynq_client) + if err != nil { + return WorkEnqueuer{}, err + } + + return WorkEnqueuer{clients: *client}, err +} diff --git a/deepfence_worker/controls/controls.go b/deepfence_worker/controls/controls.go index c8d0c49f6f..fb7068484c 100644 --- a/deepfence_worker/controls/controls.go +++ b/deepfence_worker/controls/controls.go @@ -5,29 +5,27 @@ import ( "fmt" "sync" - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" ctl "github.com/deepfence/ThreatMapper/deepfence_utils/controls" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" sdkUtils "github.com/deepfence/ThreatMapper/deepfence_utils/utils" - "github.com/deepfence/ThreatMapper/deepfence_worker/utils" ) -var controls map[ctl.ActionID]func(namespace string, req []byte) error +var controls map[ctl.ActionID]func(namespace directory.NamespaceID, req []byte) error var controls_guard sync.RWMutex func RegisterControl[T ctl.StartVulnerabilityScanRequest | ctl.StartSecretScanRequest | ctl.StartComplianceScanRequest | ctl.StartMalwareScanRequest | ctl.StartAgentUpgradeRequest | ctl.StopSecretScanRequest | ctl.StopMalwareScanRequest | ctl.StopVulnerabilityScanRequest](id ctl.ActionID, - callback func(namespace string, req T) error) error { + callback func(namespace directory.NamespaceID, req T) error) error { controls_guard.Lock() defer controls_guard.Unlock() if controls[id] != nil { return fmt.Errorf("action %v already registered", id) } - controls[id] = func(namespace string, req []byte) error { + controls[id] = func(namespace directory.NamespaceID, req []byte) error { var typedReq T err := json.Unmarshal(req, &typedReq) if err != nil { @@ -41,7 +39,7 @@ func RegisterControl[T ctl.StartVulnerabilityScanRequest | ctl.StartSecretScanRe return nil } -func ApplyControl(namespace string, req ctl.Action) error { +func ApplyControl(namespace directory.NamespaceID, req ctl.Action) error { controls_guard.RLock() defer controls_guard.RUnlock() log.Info().Msgf("apply control req: %+v", req) @@ -49,51 +47,45 @@ func ApplyControl(namespace string, req ctl.Action) error { } func init() { - controls = map[ctl.ActionID]func(namespace string, req []byte) error{} + controls = map[ctl.ActionID]func(namespace directory.NamespaceID, req []byte) error{} } -func ConsoleActionSetup(pub *kafka.Publisher) error { +func ConsoleActionSetup() error { // for vulnerability scan err := RegisterControl(ctl.StartVulnerabilityScan, - GetRegisterControlFunc[ctl.StartVulnerabilityScanRequest](pub, - sdkUtils.GenerateSBOMTask)) + GetRegisterControlFunc[ctl.StartVulnerabilityScanRequest](sdkUtils.GenerateSBOMTask)) if err != nil { return err } // for secret scan err = RegisterControl(ctl.StartSecretScan, - GetRegisterControlFunc[ctl.StartSecretScanRequest](pub, - sdkUtils.SecretScanTask)) + GetRegisterControlFunc[ctl.StartSecretScanRequest](sdkUtils.SecretScanTask)) if err != nil { return err } // for malware scan err = RegisterControl(ctl.StartMalwareScan, - GetRegisterControlFunc[ctl.StartMalwareScanRequest](pub, - sdkUtils.MalwareScanTask)) + GetRegisterControlFunc[ctl.StartMalwareScanRequest](sdkUtils.MalwareScanTask)) if err != nil { return err } err = RegisterControl(ctl.StopSecretScan, - GetRegisterControlFunc[ctl.StopSecretScanRequest](pub, - sdkUtils.StopSecretScanTask)) + GetRegisterControlFunc[ctl.StopSecretScanRequest](sdkUtils.StopSecretScanTask)) if err != nil { return err } err = RegisterControl(ctl.StopMalwareScan, - GetRegisterControlFunc[ctl.StopMalwareScanRequest](pub, - sdkUtils.StopMalwareScanTask)) + GetRegisterControlFunc[ctl.StopMalwareScanRequest](sdkUtils.StopMalwareScanTask)) if err != nil { return err } err = RegisterControl(ctl.StopVulnerabilityScan, - GetRegisterControlFunc[ctl.StopVulnerabilityScanRequest](pub, - sdkUtils.StopVulnerabilityScanTask)) + GetRegisterControlFunc[ctl.StopVulnerabilityScanRequest](sdkUtils.StopVulnerabilityScanTask)) if err != nil { return err } @@ -104,11 +96,10 @@ func ConsoleActionSetup(pub *kafka.Publisher) error { func GetRegisterControlFunc[T ctl.StartVulnerabilityScanRequest | ctl.StartSecretScanRequest | ctl.StartComplianceScanRequest | ctl.StartMalwareScanRequest | ctl.StopSecretScanRequest | ctl.StopMalwareScanRequest | - ctl.StopVulnerabilityScanRequest](pub *kafka.Publisher, - task string) func(namespace string, req T) error { + ctl.StopVulnerabilityScanRequest]( + task string) func(namespace directory.NamespaceID, req T) error { - controlFunc := func(namespace string, req T) error { - metadata := map[string]string{directory.NamespaceKey: namespace} + controlFunc := func(namespace directory.NamespaceID, req T) error { BinArgs := ctl.GetBinArgs(req) log.Info().Msgf("%s payload: %+v", task, BinArgs) data, err := json.Marshal(BinArgs) @@ -116,7 +107,12 @@ func GetRegisterControlFunc[T ctl.StartVulnerabilityScanRequest | ctl.StartSecre log.Error().Msg(err.Error()) return err } - if err := utils.PublishNewJob(pub, metadata, task, data); err != nil { + worker, err := directory.Worker(directory.NewContextWithNameSpace(namespace)) + if err != nil { + log.Error().Msg(err.Error()) + return err + } + if err := worker.Enqueue(task, data); err != nil { log.Error().Msg(err.Error()) return err } diff --git a/deepfence_worker/cronjobs/agent.go b/deepfence_worker/cronjobs/agent.go index d99153da01..db4fcb0d76 100644 --- a/deepfence_worker/cronjobs/agent.go +++ b/deepfence_worker/cronjobs/agent.go @@ -11,10 +11,9 @@ import ( "strings" "time" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" + "github.com/hibiken/asynq" m "github.com/minio/minio-go/v7" "github.com/neo4j/neo4j-go-driver/v4/neo4j" ) @@ -37,10 +36,7 @@ func getVersionMetadata(url string, result *[]map[string]interface{}) error { return nil } -func CheckAgentUpgrade(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - +func CheckAgentUpgrade(ctx context.Context, task *asynq.Task) error { log.Info().Msg("Start agent version check") res := []map[string]interface{}{} @@ -53,9 +49,6 @@ func CheckAgentUpgrade(msg *message.Message) error { } } - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) - tags_with_urls, err := prepareAgentReleases(ctx, tags_to_ingest) if err != nil { log.Error().Msgf("Prepare agent releases: %v", err) diff --git a/deepfence_worker/cronjobs/cloud_compliance.go b/deepfence_worker/cronjobs/cloud_compliance.go index bc065d0ad4..bc6a69acf5 100644 --- a/deepfence_worker/cronjobs/cloud_compliance.go +++ b/deepfence_worker/cronjobs/cloud_compliance.go @@ -1,18 +1,19 @@ package cronjobs import ( + "context" "encoding/json" "fmt" "os" "strings" "time" - "github.com/ThreeDotsLabs/watermill/message" "github.com/deepfence/ThreatMapper/deepfence_server/model" "github.com/deepfence/ThreatMapper/deepfence_server/pkg/constants" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" + "github.com/hibiken/asynq" "github.com/neo4j/neo4j-go-driver/v4/neo4j" ) @@ -45,13 +46,8 @@ type Control struct { Executable bool `json:"executable"` } -func AddCloudControls(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - +func AddCloudControls(ctx context.Context, task *asynq.Task) error { log.Info().Msgf("Starting Cloud Compliance Population") - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) nc, err := directory.Neo4jClient(ctx) if err != nil { log.Error().Msgf(err.Error()) @@ -187,13 +183,8 @@ func AddCloudControls(msg *message.Message) error { return tx.Commit() } -func CachePostureProviders(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - +func CachePostureProviders(ctx context.Context, task *asynq.Task) error { log.Info().Msgf("Caching Posture Providers") - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) driver, err := directory.Neo4jClient(ctx) if err != nil { return err diff --git a/deepfence_worker/cronjobs/console.go b/deepfence_worker/cronjobs/console.go index d5dadb2682..b609dab613 100644 --- a/deepfence_worker/cronjobs/console.go +++ b/deepfence_worker/cronjobs/console.go @@ -1,12 +1,14 @@ package cronjobs import ( - "github.com/ThreeDotsLabs/watermill/message" + "context" + "github.com/deepfence/ThreatMapper/deepfence_server/controls" utils_ctl "github.com/deepfence/ThreatMapper/deepfence_utils/controls" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" ctl "github.com/deepfence/ThreatMapper/deepfence_worker/controls" + "github.com/hibiken/asynq" ) const ( @@ -22,15 +24,9 @@ var ( While this functon is a cron job, it is running on the worker's address space Hence Allocator can be shared across tasks */ -func TriggerConsoleControls(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - +func TriggerConsoleControls(ctx context.Context, t *asynq.Task) error { log.Debug().Msgf("Trigger console actions #capacity: %v", ScanWorkloadAllocator.MaxAllocable()) - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) - actions, errs := controls.GetAgentActions(ctx, ConsoleAgentId, int(ScanWorkloadAllocator.MaxAllocable())) for _, e := range errs { if e != nil { @@ -41,10 +37,14 @@ func TriggerConsoleControls(msg *message.Message) error { ScanWorkloadAllocator.Reserve(int32(len(actions))) log.Debug().Msgf("Trigger console actions #actions: %d", len(actions)) + ns, err := directory.ExtractNamespace(ctx) + if err != nil { + return err + } for _, action := range actions { log.Info().Msgf("Init execute: %v", action.ID) - err := ctl.ApplyControl(namespace, action) + err := ctl.ApplyControl(ns, action) if err != nil { log.Error().Msgf("Control %v failed: %v", action, err) } diff --git a/deepfence_worker/cronjobs/file_server.go b/deepfence_worker/cronjobs/file_server.go index a0951b0d96..62bc074913 100644 --- a/deepfence_worker/cronjobs/file_server.go +++ b/deepfence_worker/cronjobs/file_server.go @@ -1,19 +1,17 @@ package cronjobs import ( + "context" "time" - "github.com/ThreeDotsLabs/watermill/message" "github.com/deepfence/ThreatMapper/deepfence_server/diagnosis" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" + "github.com/hibiken/asynq" "github.com/minio/minio-go/v7" ) -func CleanUpDiagnosisLogs(msg *message.Message) error { - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) - +func CleanUpDiagnosisLogs(ctx context.Context, task *asynq.Task) error { mc, err := directory.MinioClient(ctx) if err != nil { return err diff --git a/deepfence_worker/cronjobs/neo4j.go b/deepfence_worker/cronjobs/neo4j.go index de9a9b8344..d99baa73c4 100644 --- a/deepfence_worker/cronjobs/neo4j.go +++ b/deepfence_worker/cronjobs/neo4j.go @@ -5,7 +5,7 @@ import ( "sync/atomic" "time" - "github.com/ThreeDotsLabs/watermill/message" + "github.com/hibiken/asynq" "github.com/deepfence/ThreatMapper/deepfence_server/model" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" @@ -66,10 +66,7 @@ func getPushBackValue(session neo4j.Session) int32 { var cleanUpRunning = atomic.Bool{} -func CleanUpDB(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - +func CleanUpDB(ctx context.Context, task *asynq.Task) error { if cleanUpRunning.Swap(true) { return nil } @@ -77,8 +74,6 @@ func CleanUpDB(msg *message.Message) error { log.Info().Msgf("Clean up DB Starting") defer log.Info().Msgf("Clean up DB Done") - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) dbScannedResourceCleanUpTimeout := getResourceCleanUpTimeout(ctx) @@ -385,10 +380,7 @@ func CleanUpDB(msg *message.Message) error { var linkCloudResourcesRunning = atomic.Bool{} -func LinkCloudResources(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - +func LinkCloudResources(ctx context.Context, task *asynq.Task) error { if linkCloudResourcesRunning.Swap(true) { return nil } @@ -396,8 +388,6 @@ func LinkCloudResources(msg *message.Message) error { log.Info().Msgf("Link CR Starting") defer log.Info().Msgf("Link CR Done") - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) nc, err := directory.Neo4jClient(ctx) if err != nil { @@ -536,10 +526,7 @@ func LinkCloudResources(msg *message.Message) error { var linkNodesRunning = atomic.Bool{} -func LinkNodes(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - +func LinkNodes(ctx context.Context, task *asynq.Task) error { if linkNodesRunning.Swap(true) { return nil } @@ -547,8 +534,6 @@ func LinkNodes(msg *message.Message) error { log.Info().Msgf("Link Nodes Starting") defer log.Info().Msgf("Link Nodes Done") - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) nc, err := directory.Neo4jClient(ctx) if err != nil { @@ -604,14 +589,10 @@ func LinkNodes(msg *message.Message) error { return nil } -func RetryScansDB(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) +func RetryScansDB(ctx context.Context, task *asynq.Task) error { log.Info().Msgf("Retry scan DB Starting") defer log.Info().Msgf("Retry scan DB Done") - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) nc, err := directory.Neo4jClient(ctx) if err != nil { return err @@ -661,14 +642,9 @@ func RetryScansDB(msg *message.Message) error { return tx.Commit() } -func RetryUpgradeAgent(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - +func RetryUpgradeAgent(ctx context.Context, task *asynq.Task) error { log.Info().Msgf("Retry upgrade DB Starting") defer log.Info().Msgf("Retry upgrade DB Done") - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) nc, err := directory.Neo4jClient(ctx) if err != nil { return err diff --git a/deepfence_worker/cronjobs/notification.go b/deepfence_worker/cronjobs/notification.go index 6a4e30f22f..a685eed3a7 100644 --- a/deepfence_worker/cronjobs/notification.go +++ b/deepfence_worker/cronjobs/notification.go @@ -1,6 +1,7 @@ package cronjobs import ( + "context" "database/sql" "encoding/json" "errors" @@ -8,7 +9,6 @@ import ( "sync" "time" - "github.com/ThreeDotsLabs/watermill/message" "github.com/deepfence/ThreatMapper/deepfence_server/model" "github.com/deepfence/ThreatMapper/deepfence_server/pkg/integration" "github.com/deepfence/ThreatMapper/deepfence_server/reporters" @@ -17,6 +17,7 @@ import ( "github.com/deepfence/ThreatMapper/deepfence_utils/log" postgresql_db "github.com/deepfence/ThreatMapper/deepfence_utils/postgresql/postgresql-db" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" + "github.com/hibiken/asynq" ) var fieldsMap = map[string]map[string]string{utils.ScanTypeDetectedNode[utils.NEO4J_VULNERABILITY_SCAN]: { @@ -82,17 +83,12 @@ var fieldsMap = map[string]map[string]string{utils.ScanTypeDetectedNode[utils.NE var notificationLock sync.Mutex -func SendNotifications(msg *message.Message) error { +func SendNotifications(ctx context.Context, task *asynq.Task) error { //This lock is to ensure only one notification handler runs at a time notificationLock.Lock() defer notificationLock.Unlock() - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - - log.Info().Msgf("SendNotifications task starting at %s", string(msg.Payload)) - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) + log.Info().Msgf("SendNotifications task starting at %s", string(task.Payload())) pgClient, err := directory.PostgresClient(ctx) if err != nil { return nil @@ -110,7 +106,7 @@ func SendNotifications(msg *message.Message) error { log.Info().Msgf("Processing integration for %s rowId: %d", integration.IntegrationType, integration.ID) - err := processIntegrationRow(integration, msg) + err := processIntegrationRow(integration, ctx, task) log.Info().Msgf("Processed integration for %s rowId: %d", integration.IntegrationType, integration.ID) @@ -141,23 +137,23 @@ func SendNotifications(msg *message.Message) error { }(integrationRow) } wg.Wait() - log.Info().Msgf("SendNotifications task ended for timestamp %s", string(msg.Payload)) + log.Info().Msgf("SendNotifications task ended for timestamp %s", string(task.Payload())) return nil } -func processIntegrationRow(integrationRow postgresql_db.Integration, msg *message.Message) error { +func processIntegrationRow(integrationRow postgresql_db.Integration, ctx context.Context, task *asynq.Task) error { switch integrationRow.Resource { case utils.ScanTypeDetectedNode[utils.NEO4J_VULNERABILITY_SCAN]: - return processIntegration[model.Vulnerability](msg, integrationRow) + return processIntegration[model.Vulnerability](ctx, task, integrationRow) case utils.ScanTypeDetectedNode[utils.NEO4J_SECRET_SCAN]: - return processIntegration[model.Secret](msg, integrationRow) + return processIntegration[model.Secret](ctx, task, integrationRow) case utils.ScanTypeDetectedNode[utils.NEO4J_MALWARE_SCAN]: - return processIntegration[model.Malware](msg, integrationRow) + return processIntegration[model.Malware](ctx, task, integrationRow) case utils.ScanTypeDetectedNode[utils.NEO4J_COMPLIANCE_SCAN]: - err1 := processIntegration[model.Compliance](msg, integrationRow) + err1 := processIntegration[model.Compliance](ctx, task, integrationRow) // cloud compliance scans integrationRow.Resource = utils.ScanTypeDetectedNode[utils.NEO4J_CLOUD_COMPLIANCE_SCAN] - err2 := processIntegration[model.CloudCompliance](msg, integrationRow) + err2 := processIntegration[model.CloudCompliance](ctx, task, integrationRow) return errors.Join(err1, err2) } return errors.New("No integration type") @@ -199,18 +195,16 @@ func injectNodeDatamap(results []map[string]interface{}, common model.ScanResult return results } -func processIntegration[T any](msg *message.Message, integrationRow postgresql_db.Integration) error { +func processIntegration[T any](ctx context.Context, task *asynq.Task, integrationRow postgresql_db.Integration) error { startTime := time.Now() var filters model.IntegrationFilters err := json.Unmarshal(integrationRow.Filters, &filters) if err != nil { return err } - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) // get ts from message - ts, err := strconv.ParseInt(string(msg.Payload), 10, 64) + ts, err := strconv.ParseInt(string(task.Payload()), 10, 64) if err != nil { return err } diff --git a/deepfence_worker/cronjobs/postgresql.go b/deepfence_worker/cronjobs/postgresql.go index f900511251..9419e2f69f 100644 --- a/deepfence_worker/cronjobs/postgresql.go +++ b/deepfence_worker/cronjobs/postgresql.go @@ -1,18 +1,15 @@ package cronjobs import ( - "github.com/ThreeDotsLabs/watermill/message" + "context" + "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" + "github.com/hibiken/asynq" ) // CleanUpPostgresDB Delete expired user invites and password reset requests -func CleanUpPostgresDB(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) +func CleanUpPostgresDB(ctx context.Context, task *asynq.Task) error { pgClient, err := directory.PostgresClient(ctx) if err != nil { return err diff --git a/deepfence_worker/cronjobs/registry.go b/deepfence_worker/cronjobs/registry.go index ca779111ec..3bd38fbb77 100644 --- a/deepfence_worker/cronjobs/registry.go +++ b/deepfence_worker/cronjobs/registry.go @@ -1,21 +1,19 @@ package cronjobs import ( + "context" "encoding/json" - "github.com/ThreeDotsLabs/watermill/message" "github.com/deepfence/ThreatMapper/deepfence_server/pkg/registry" sync "github.com/deepfence/ThreatMapper/deepfence_server/pkg/registrysync" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" postgresql_db "github.com/deepfence/ThreatMapper/deepfence_utils/postgresql/postgresql-db" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" + "github.com/hibiken/asynq" ) -func SyncRegistry(msg *message.Message) error { - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) - +func SyncRegistry(ctx context.Context, task *asynq.Task) error { pgClient, err := directory.PostgresClient(ctx) if err != nil { log.Error().Msgf("unable to get postgres client: %v", err) @@ -26,10 +24,10 @@ func SyncRegistry(msg *message.Message) error { rsp := utils.RegistrySyncParams{} - if msg.Payload != nil { - err = json.Unmarshal(msg.Payload, &rsp) + if task.Payload() != nil { + err = json.Unmarshal(task.Payload(), &rsp) if err != nil { - log.Warn().Msgf("unable to unmarshal payload: %v, error: %v syncing all registries...", msg.Payload, err) + log.Warn().Msgf("unable to unmarshal payload: %v, error: %v syncing all registries...", task.Payload(), err) registries, err = pgClient.GetContainerRegistries(ctx) if err != nil { log.Error().Msgf("unable to get registries: %v", err) diff --git a/deepfence_worker/cronjobs/reports.go b/deepfence_worker/cronjobs/reports.go index 4f94739085..65d86b2a83 100644 --- a/deepfence_worker/cronjobs/reports.go +++ b/deepfence_worker/cronjobs/reports.go @@ -4,23 +4,18 @@ import ( "context" "time" - "github.com/ThreeDotsLabs/watermill/message" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_worker/utils" + "github.com/hibiken/asynq" "github.com/minio/minio-go/v7" "github.com/neo4j/neo4j-go-driver/v4/neo4j" ) const minioReportsPrefix = "/report/" -func CleanUpReports(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) - +func CleanUpReports(ctx context.Context, task *asynq.Task) error { log.Info().Msg("Start reports cleanup") - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) mc, err := directory.MinioClient(ctx) if err != nil { diff --git a/deepfence_worker/cronjobs/scheduled_tasks.go b/deepfence_worker/cronjobs/scheduled_tasks.go index 08b27f0f0f..d885159329 100644 --- a/deepfence_worker/cronjobs/scheduled_tasks.go +++ b/deepfence_worker/cronjobs/scheduled_tasks.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" - "github.com/ThreeDotsLabs/watermill/message" "github.com/deepfence/ThreatMapper/deepfence_server/handler" "github.com/deepfence/ThreatMapper/deepfence_server/model" "github.com/deepfence/ThreatMapper/deepfence_server/reporters" @@ -15,14 +14,12 @@ import ( "github.com/deepfence/ThreatMapper/deepfence_utils/log" postgresqlDb "github.com/deepfence/ThreatMapper/deepfence_utils/postgresql/postgresql-db" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" + "github.com/hibiken/asynq" ) -func RunScheduledTasks(msg *message.Message) error { - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) - +func RunScheduledTasks(ctx context.Context, task *asynq.Task) error { messagePayload := map[string]interface{}{} - if err := json.Unmarshal(msg.Payload, &messagePayload); err != nil { + if err := json.Unmarshal(task.Payload(), &messagePayload); err != nil { log.Error().Msg(err.Error()) return nil } diff --git a/deepfence_worker/cronjobs/threat.go b/deepfence_worker/cronjobs/threat.go index fe64c92766..7f9a4922c9 100644 --- a/deepfence_worker/cronjobs/threat.go +++ b/deepfence_worker/cronjobs/threat.go @@ -1,26 +1,22 @@ package cronjobs import ( + "context" "sync/atomic" "time" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" + "github.com/hibiken/asynq" "github.com/neo4j/neo4j-go-driver/v4/neo4j" ) var threatGraphRunning atomic.Bool var exploitabilityRunning atomic.Bool -func ComputeThreat(msg *message.Message) error { - topic := RecordOffsets(msg) - defer SetTopicHandlerStatus(topic, false) +func ComputeThreat(ctx context.Context, task *asynq.Task) error { - namespace := msg.Metadata.Get(directory.NamespaceKey) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(namespace)) nc, err := directory.Neo4jClient(ctx) if err != nil { return err diff --git a/deepfence_worker/cronjobs/utils.go b/deepfence_worker/cronjobs/utils.go deleted file mode 100644 index ac94d99ff0..0000000000 --- a/deepfence_worker/cronjobs/utils.go +++ /dev/null @@ -1,114 +0,0 @@ -package cronjobs - -import ( - "reflect" - "sync" - "unsafe" - - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/deepfence/ThreatMapper/deepfence_utils/log" -) - -var recordLock sync.Mutex - -type TopicDataEntry struct { - Data kafka.PartitionOffset - IsRunning bool -} - -var TopicData map[string]TopicDataEntry - -func GetTopicData() map[string]TopicDataEntry { - recordLock.Lock() - defer recordLock.Unlock() - retVal := make(map[string]TopicDataEntry) - for k, v := range TopicData { - retVal[k] = v - } - return retVal -} - -func SetTopicHandlerStatus(topic string, status bool) { - if len(topic) == 0 { - return - } - - recordLock.Lock() - defer recordLock.Unlock() - - if entry, ok := TopicData[topic]; ok { - entry.IsRunning = status - TopicData[topic] = entry - } -} - -func RecordOffsets(msg *message.Message) string { - if msg == nil { - return "" - } - - topic := message.SubscribeTopicFromCtx(msg.Context()) - if len(topic) == 0 { - log.Debug().Msgf("Failed to get the topic from message Context") - return "" - } - - partionId, ok1 := kafka.MessagePartitionFromCtx(msg.Context()) - partitionOffset, ok2 := kafka.MessagePartitionOffsetFromCtx(msg.Context()) - if !ok1 || !ok2 { - return "" - } - - ts, _ := kafka.MessageTimestampFromCtx(msg.Context()) - - recordLock.Lock() - defer recordLock.Unlock() - - entry, found := TopicData[topic] - - if !found { - entry = TopicDataEntry{} - entry.Data = make(map[int32]int64) - TopicData[topic] = entry - } - - entry.Data[partionId] = partitionOffset - entry.IsRunning = true - - TopicData[topic] = entry - - log.Debug().Msgf("RecordOffsets for %s , pid:%d, offset:%d, ts:%v", - topic, partionId, partitionOffset, ts) - - return topic -} - -// Utility function to print the contents of the context -// Used ONLY for DEBUGGING -func printContextInternals(ctx interface{}, inner bool) { - contextValues := reflect.ValueOf(ctx).Elem() - contextKeys := reflect.TypeOf(ctx).Elem() - - if !inner { - log.Info().Msgf("Fields for %s.%s", contextKeys.PkgPath(), contextKeys.Name()) - } - - if contextKeys.Kind() == reflect.Struct { - for i := 0; i < contextValues.NumField(); i++ { - reflectValue := contextValues.Field(i) - reflectValue = reflect.NewAt(reflectValue.Type(), unsafe.Pointer(reflectValue.UnsafeAddr())).Elem() - - reflectField := contextKeys.Field(i) - - if reflectField.Name == "Context" { - printContextInternals(reflectValue.Interface(), true) - } else { - log.Info().Msgf("field name: %+v", reflectField.Name) - log.Info().Msgf("field value: %+v", reflectValue.Interface()) - } - } - } else { - log.Info().Msgf("context is empty (int)") - } -} diff --git a/deepfence_worker/cronscheduler/scheduler.go b/deepfence_worker/cronscheduler/scheduler.go index c00cc18114..3e480221c5 100644 --- a/deepfence_worker/cronscheduler/scheduler.go +++ b/deepfence_worker/cronscheduler/scheduler.go @@ -9,13 +9,11 @@ import ( "sync" "time" - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" postgresqlDb "github.com/deepfence/ThreatMapper/deepfence_utils/postgresql/postgresql-db" sdkUtils "github.com/deepfence/ThreatMapper/deepfence_utils/utils" "github.com/deepfence/ThreatMapper/deepfence_utils/vulnerability_db" - "github.com/deepfence/ThreatMapper/deepfence_worker/utils" "github.com/robfig/cron/v3" ) @@ -36,12 +34,11 @@ type Jobs struct { } type Scheduler struct { - cron *cron.Cron - tasksPublisher *kafka.Publisher - jobs Jobs + cron *cron.Cron + jobs Jobs } -func NewScheduler(tasksPublisher *kafka.Publisher) (*Scheduler, error) { +func NewScheduler() (*Scheduler, error) { logger := stdLogger.New(os.Stdout, "cron: ", stdLogger.LstdFlags) scheduler := &Scheduler{ cron: cron.New( @@ -49,7 +46,6 @@ func NewScheduler(tasksPublisher *kafka.Publisher) (*Scheduler, error) { cron.WithLocation(time.UTC), cron.WithLogger(cron.VerbosePrintfLogger(logger)), ), - tasksPublisher: tasksPublisher, jobs: Jobs{ CronJobs: make(map[directory.NamespaceID]CronJobs), ScheduledJobs: make(map[directory.NamespaceID]ScheduledJobs), @@ -331,7 +327,7 @@ func (s *Scheduler) enqueueScheduledTask(namespace directory.NamespaceID, schedu return func() { log.Info().Msgf("Enqueuing task: %s, %s for namespace %s", schedule.Description, schedule.CronExpr, namespace) - metadata := map[string]string{directory.NamespaceKey: string(namespace)} + //metadata := map[string]string{directory.NamespaceKey: string(namespace)} message := map[string]interface{}{ "action": schedule.Action, "id": schedule.ID, @@ -339,7 +335,8 @@ func (s *Scheduler) enqueueScheduledTask(namespace directory.NamespaceID, schedu "description": schedule.Description, } messageJson, _ := json.Marshal(message) - err := utils.PublishNewJob(s.tasksPublisher, metadata, sdkUtils.ScheduledTasks, messageJson) + worker, err := directory.Worker(directory.NewContextWithNameSpace(namespace)) + err = worker.Enqueue(sdkUtils.ScheduledTasks, messageJson) if err != nil { log.Error().Msg(err.Error()) } @@ -350,9 +347,12 @@ func (s *Scheduler) enqueueTask(namespace directory.NamespaceID, task string) fu log.Info().Msgf("Registering task: %s for namespace %s", task, namespace) return func() { log.Info().Msgf("Enqueuing task: %s for namespace %s", task, namespace) - metadata := map[string]string{directory.NamespaceKey: string(namespace)} - err := utils.PublishNewJob(s.tasksPublisher, metadata, task, - []byte(strconv.FormatInt(sdkUtils.GetTimestamp(), 10))) + worker, err := directory.Worker(directory.NewContextWithNameSpace(namespace)) + if err != nil { + log.Error().Msg(err.Error()) + return + } + err = worker.Enqueue(task, []byte(strconv.FormatInt(sdkUtils.GetTimestamp(), 10))) if err != nil { log.Error().Msg(err.Error()) } diff --git a/deepfence_worker/go.mod b/deepfence_worker/go.mod index b2419f8f88..c62b42ba20 100644 --- a/deepfence_worker/go.mod +++ b/deepfence_worker/go.mod @@ -32,6 +32,7 @@ require ( github.com/deepfence/YaraHunter v0.0.0-00010101000000-000000000000 github.com/deepfence/agent-plugins-grpc v1.1.0 github.com/deepfence/package-scanner v0.0.0-00010101000000-000000000000 + github.com/hibiken/asynq v0.24.1 github.com/kelseyhightower/envconfig v1.4.0 github.com/minio/minio-go/v7 v7.0.58 github.com/neo4j/neo4j-go-driver/v4 v4.4.7 diff --git a/deepfence_worker/go.sum b/deepfence_worker/go.sum index ec36dc33bd..034b375adf 100644 --- a/deepfence_worker/go.sum +++ b/deepfence_worker/go.sum @@ -135,7 +135,9 @@ github.com/bool64/dev v0.2.29 h1:x+syGyh+0eWtOzQ1ItvLzOGIWyNWnyjXpHIcpF2HvL4= github.com/bool64/shared v0.1.5 h1:fp3eUhBsrSjNCQPcSdQqZxxh9bBwrYiZ+zOKFkM0/2E= github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M= github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= +github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= +github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/casbin/casbin/v2 v2.75.0 h1:vSgtloFgyijYrFAoMKH1u9vdT7R55WDU74hJDDWT+5w= github.com/casbin/casbin/v2 v2.75.0/go.mod h1:mzGx0hYW9/ksOSpw3wNjk3NRAroq5VMFYUQ6G43iGPk= github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= @@ -461,6 +463,8 @@ github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk= github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= +github.com/hibiken/asynq v0.24.1 h1:+5iIEAyA9K/lcSPvx3qoPtsKJeKI5u9aOIvUmSsazEw= +github.com/hibiken/asynq v0.24.1/go.mod h1:u5qVeSbrnfT+vtG5Mq8ZPzQu/BmCKMHvTGb91uy9Tts= github.com/hillu/go-yara/v4 v4.3.2 h1:HGqUN3ORUduWZbb95RQjut4UzavGDbtt/C6SnGB3Amk= github.com/hillu/go-yara/v4 v4.3.2/go.mod h1:AHEs/FXVMQKVVlT6iG9d+q1BRr0gq0WoAWZQaZ0gS7s= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -708,6 +712,7 @@ github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuR github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= @@ -884,6 +889,8 @@ go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZE go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/deepfence_worker/log.go b/deepfence_worker/log.go deleted file mode 100644 index fa062beac9..0000000000 --- a/deepfence_worker/log.go +++ /dev/null @@ -1,42 +0,0 @@ -package main - -import ( - "github.com/ThreeDotsLabs/watermill" - "github.com/deepfence/ThreatMapper/deepfence_utils/log" -) - -type zerologWaterMillAdapter struct { - fields map[string]interface{} - debug, trace bool -} - -func NewZerologWaterMillAdapter(debug, trace bool) *zerologWaterMillAdapter { - return &zerologWaterMillAdapter{ - fields: map[string]interface{}{}, - debug: debug, - trace: trace, - } -} - -func (zerologWaterMillAdapter) Error(msg string, err error, fields watermill.LogFields) { - log.Error().CallerSkipFrame(2).Fields(fields).Err(err).Msg(msg) -} -func (zerologWaterMillAdapter) Info(msg string, fields watermill.LogFields) { - log.Info().CallerSkipFrame(2).Fields(fields).Msg(msg) -} -func (z zerologWaterMillAdapter) Debug(msg string, fields watermill.LogFields) { - if z.debug { - log.Debug().CallerSkipFrame(2).Fields(fields).Msg(msg) - } -} -func (z zerologWaterMillAdapter) Trace(msg string, fields watermill.LogFields) { - if z.trace { - log.Trace().CallerSkipFrame(2).Fields(fields).Msg(msg) - } -} -func (z zerologWaterMillAdapter) With(fields watermill.LogFields) watermill.LoggerAdapter { - for k, v := range fields { - z.fields[k] = v - } - return z -} diff --git a/deepfence_worker/main.go b/deepfence_worker/main.go index c2a1f5ad1e..2f0d31cd0a 100644 --- a/deepfence_worker/main.go +++ b/deepfence_worker/main.go @@ -4,8 +4,6 @@ import ( "os" "time" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" "github.com/deepfence/ThreatMapper/deepfence_worker/controls" @@ -33,6 +31,7 @@ type config struct { KafkaTopicReplicas int16 `default:"1" split_words:"true"` KafkaTopicRetentionMs string `default:"86400000" split_words:"true"` KafkaTopicPartitionsTasks int32 `default:"3" split_words:"true"` + RedisAddr string `default:"deepfence-redis:6379"` } // build info @@ -49,7 +48,6 @@ func main() { var cfg config var err error - var wml watermill.LoggerAdapter err = envconfig.Process("DEEPFENCE", &cfg) if err != nil { log.Fatal().Msg(err.Error()) @@ -59,10 +57,8 @@ func main() { if cfg.Debug { log.Initialize(zerolog.LevelDebugValue) - wml = NewZerologWaterMillAdapter(true, false) } else { log.Initialize(zerolog.LevelInfoValue) - wml = NewZerologWaterMillAdapter(false, false) } // check connection to kafka broker @@ -86,20 +82,6 @@ func main() { }() } - // task publisher - tasksPublisher, err := kafka.NewPublisher( - kafka.PublisherConfig{ - Brokers: cfg.KafkaBrokers, - Marshaler: kafka.DefaultMarshaler{}, - }, - wml, - ) - if err != nil { - log.Error().Msg(err.Error()) - return - } - defer tasksPublisher.Close() - switch cfg.Mode { case "ingester": log.Info().Msg("Starting ingester") @@ -109,11 +91,11 @@ func main() { } case "worker": log.Info().Msg("Starting worker") - if err := controls.ConsoleActionSetup(tasksPublisher); err != nil { + if err := controls.ConsoleActionSetup(); err != nil { log.Error().Msg(err.Error()) return } - err := startWorker(wml, cfg) + err := startWorker(cfg) if err != nil { log.Error().Msg(err.Error()) return @@ -122,7 +104,7 @@ func main() { log.Info().Msg("Starting scheduler") go cs.InitMinioDatabase() time.Sleep(10 * time.Second) - scheduler, err := cs.NewScheduler(tasksPublisher) + scheduler, err := cs.NewScheduler() if err != nil { log.Error().Msg(err.Error()) return diff --git a/deepfence_worker/tasks/malwarescan/malwarescan.go b/deepfence_worker/tasks/malwarescan/malwarescan.go index 4561d96c7a..484f81ef23 100644 --- a/deepfence_worker/tasks/malwarescan/malwarescan.go +++ b/deepfence_worker/tasks/malwarescan/malwarescan.go @@ -8,8 +8,8 @@ import ( "os/exec" "sync" - "github.com/ThreeDotsLabs/watermill/message" "github.com/deepfence/YaraHunter/pkg/output" + "github.com/hibiken/asynq" "github.com/deepfence/ThreatMapper/deepfence_worker/cronjobs" workerUtils "github.com/deepfence/ThreatMapper/deepfence_worker/utils" @@ -49,12 +49,12 @@ func NewMalwareScanner(ingest chan *kgo.Record) MalwareScan { return MalwareScan{ingestC: ingest} } -func (s MalwareScan) StopMalwareScan(msg *message.Message) error { +func (s MalwareScan) StopMalwareScan(ctx context.Context, task *asynq.Task) error { var params utils.MalwareScanParameters - log.Info().Msgf("StopMalwareScan, uuid: %s payload: %s ", msg.UUID, string(msg.Payload)) + log.Info().Msgf("StopMalwareScan, payload: %s ", string(task.Payload())) - if err := json.Unmarshal(msg.Payload, ¶ms); err != nil { + if err := json.Unmarshal(task.Payload(), ¶ms); err != nil { log.Error().Msgf("StopMalwareScan, error in Unmarshal: %s", err.Error()) return nil } @@ -75,10 +75,13 @@ func (s MalwareScan) StopMalwareScan(msg *message.Message) error { } -func (s MalwareScan) StartMalwareScan(msg *message.Message) error { +func (s MalwareScan) StartMalwareScan(ctx context.Context, task *asynq.Task) error { defer cronjobs.ScanWorkloadAllocator.Free() - tenantID := msg.Metadata.Get(directory.NamespaceKey) + tenantID, err := directory.ExtractNamespace(ctx) + if err != nil { + return err + } if len(tenantID) == 0 { log.Error().Msg("tenant-id/namespace is empty") return nil @@ -89,12 +92,11 @@ func (s MalwareScan) StartMalwareScan(msg *message.Message) error { {Key: "namespace", Value: []byte(tenantID)}, } - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(tenantID)) - log.Info().Msgf("uuid: %s payload: %s ", msg.UUID, string(msg.Payload)) + log.Info().Msgf(" payload: %s ", string(task.Payload())) var params utils.MalwareScanParameters - if err := json.Unmarshal(msg.Payload, ¶ms); err != nil { + if err := json.Unmarshal(task.Payload(), ¶ms); err != nil { log.Error().Msg(err.Error()) return nil } diff --git a/deepfence_worker/tasks/reports/reports.go b/deepfence_worker/tasks/reports/reports.go index 11a9fbdd0f..9929a8afeb 100644 --- a/deepfence_worker/tasks/reports/reports.go +++ b/deepfence_worker/tasks/reports/reports.go @@ -10,11 +10,11 @@ import ( "strings" "time" - "github.com/ThreeDotsLabs/watermill/message" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" sdkUtils "github.com/deepfence/ThreatMapper/deepfence_utils/utils" "github.com/deepfence/ThreatMapper/deepfence_worker/utils" + "github.com/hibiken/asynq" "github.com/minio/minio-go/v7" "github.com/neo4j/neo4j-go-driver/v4/neo4j" ) @@ -58,25 +58,26 @@ func generateReport(ctx context.Context, params sdkUtils.ReportParams) (string, return "", ErrUnknownReportType } -func GenerateReport(msg *message.Message) error { +func GenerateReport(ctx context.Context, task *asynq.Task) error { var params sdkUtils.ReportParams - tenantID := msg.Metadata.Get(directory.NamespaceKey) + tenantID, err := directory.ExtractNamespace(ctx) + if err != nil { + return err + } if len(tenantID) == 0 { log.Error().Msg("tenant-id/namespace is empty") return errors.New("tenant-id/namespace is empty") } log.Info().Msgf("message tenant id %s", string(tenantID)) - log.Info().Msgf("uuid: %s payload: %s ", msg.UUID, string(msg.Payload)) + log.Info().Msgf("payload: %s ", string(task.Payload())) - if err := json.Unmarshal(msg.Payload, ¶ms); err != nil { - log.Error().Err(err).Msgf("error decoding report request payload %s", string(msg.Payload)) + if err := json.Unmarshal(task.Payload(), ¶ms); err != nil { + log.Error().Err(err).Msgf("error decoding report request payload %s", string(task.Payload())) } - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(tenantID)) - client, err := directory.Neo4jClient(ctx) if err != nil { log.Error().Msg(err.Error()) diff --git a/deepfence_worker/tasks/sbom/generate_sbom.go b/deepfence_worker/tasks/sbom/generate_sbom.go index 8b63193727..8b5fd65be2 100644 --- a/deepfence_worker/tasks/sbom/generate_sbom.go +++ b/deepfence_worker/tasks/sbom/generate_sbom.go @@ -10,9 +10,6 @@ import ( "strings" "sync" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" @@ -20,6 +17,7 @@ import ( workerUtils "github.com/deepfence/ThreatMapper/deepfence_worker/utils" "github.com/deepfence/package-scanner/sbom/syft" psUtils "github.com/deepfence/package-scanner/utils" + "github.com/hibiken/asynq" "github.com/minio/minio-go/v7" "github.com/twmb/franz-go/pkg/kgo" ) @@ -37,10 +35,10 @@ func NewSbomGenerator(ingest chan *kgo.Record) SbomGenerator { return SbomGenerator{ingestC: ingest} } -func StopVulnerabilityScan(msg *message.Message) error { - log.Info().Msgf("StopVulnerabilityScan, uuid: %s payload: %s ", msg.UUID, string(msg.Payload)) +func StopVulnerabilityScan(ctx context.Context, task *asynq.Task) error { + log.Info().Msgf("StopVulnerabilityScan, payload: %s ", string(task.Payload())) var params utils.SbomParameters - if err := json.Unmarshal(msg.Payload, ¶ms); err != nil { + if err := json.Unmarshal(task.Payload(), ¶ms); err != nil { log.Error().Msgf("StopVulnerabilityScan, error in Unmarshal: %s", err.Error()) return nil } @@ -60,15 +58,18 @@ func StopVulnerabilityScan(msg *message.Message) error { return nil } -func (s SbomGenerator) GenerateSbom(msg *message.Message) ([]*message.Message, error) { +func (s SbomGenerator) GenerateSbom(ctx context.Context, task *asynq.Task) error { defer cronjobs.ScanWorkloadAllocator.Free() var params utils.SbomParameters - tenantID := msg.Metadata.Get(directory.NamespaceKey) + tenantID, err := directory.ExtractNamespace(ctx) + if err != nil { + return err + } if len(tenantID) == 0 { log.Error().Msg("tenant-id/namespace is empty") - return nil, directory.ErrNamespaceNotFound + return directory.ErrNamespaceNotFound } log.Info().Msgf("message tenant id %s", string(tenantID)) @@ -76,21 +77,24 @@ func (s SbomGenerator) GenerateSbom(msg *message.Message) ([]*message.Message, e {Key: "namespace", Value: []byte(tenantID)}, } - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(tenantID)) + worker, err := directory.Worker(ctx) + if err != nil { + return err + } - log.Info().Msgf("uuid: %s payload: %s ", msg.UUID, string(msg.Payload)) + log.Info().Msgf("payload: %s ", string(task.Payload())) - if err := json.Unmarshal(msg.Payload, ¶ms); err != nil { + if err := json.Unmarshal(task.Payload(), ¶ms); err != nil { log.Error().Msg(err.Error()) SendScanStatus(s.ingestC, NewSbomScanStatus(params, utils.SCAN_STATUS_FAILED, err.Error(), nil), rh) - return nil, nil + return nil } if params.RegistryId == "" { log.Error().Msgf("registry id is empty in params %+v", params) SendScanStatus(s.ingestC, NewSbomScanStatus(params, utils.SCAN_STATUS_FAILED, "registry id is empty in params", nil), rh) - return nil, nil + return nil } statusChan := make(chan SbomScanStatus) @@ -107,7 +111,7 @@ func (s SbomGenerator) GenerateSbom(msg *message.Message) ([]*message.Message, e if err != nil { log.Error().Msg(err.Error()) statusChan <- NewSbomScanStatus(params, utils.SCAN_STATUS_FAILED, err.Error(), nil) - return nil, nil + return nil } log.Info().Msgf("Adding scanid to map:%s", params.ScanId) @@ -169,7 +173,7 @@ func (s SbomGenerator) GenerateSbom(msg *message.Message) ([]*message.Message, e log.Error().Msg(err.Error()) statusChan <- NewSbomScanStatus(params, utils.SCAN_STATUS_FAILED, err.Error(), nil) } - return nil, nil + return nil } gzpb64Sbom := bytes.Buffer{} @@ -178,7 +182,7 @@ func (s SbomGenerator) GenerateSbom(msg *message.Message) ([]*message.Message, e if err != nil { log.Error().Msg(err.Error()) statusChan <- NewSbomScanStatus(params, utils.SCAN_STATUS_FAILED, err.Error(), nil) - return nil, nil + return nil } gzipwriter.Close() @@ -187,7 +191,7 @@ func (s SbomGenerator) GenerateSbom(msg *message.Message) ([]*message.Message, e if err != nil { log.Error().Msg(err.Error()) statusChan <- NewSbomScanStatus(params, utils.SCAN_STATUS_FAILED, err.Error(), nil) - return nil, nil + return nil } sbomFile := path.Join("/sbom/", utils.ScanIdReplacer.Replace(params.ScanId)+".json.gz") @@ -223,7 +227,7 @@ func (s SbomGenerator) GenerateSbom(msg *message.Message) ([]*message.Message, e if logError == true { log.Error().Msg(err.Error()) statusChan <- NewSbomScanStatus(params, utils.SCAN_STATUS_FAILED, err.Error(), nil) - return nil, nil + return nil } } @@ -238,12 +242,13 @@ func (s SbomGenerator) GenerateSbom(msg *message.Message) ([]*message.Message, e payload, err := json.Marshal(params) if err != nil { log.Error().Msg(err.Error()) - return nil, nil + return nil } - scanMsg := message.NewMessage(watermill.NewUUID(), payload) - scanMsg.Metadata = map[string]string{directory.NamespaceKey: tenantID} - middleware.SetCorrelationID(watermill.NewShortUUID(), scanMsg) + err = worker.Enqueue(utils.ScanSBOMTask, payload) + if err != nil { + return err + } - return []*message.Message{scanMsg}, nil + return nil } diff --git a/deepfence_worker/tasks/sbom/scan_sbom.go b/deepfence_worker/tasks/sbom/scan_sbom.go index 482ccc73a0..13c68cf75b 100644 --- a/deepfence_worker/tasks/sbom/scan_sbom.go +++ b/deepfence_worker/tasks/sbom/scan_sbom.go @@ -12,7 +12,6 @@ import ( "sync" "time" - "github.com/ThreeDotsLabs/watermill/message" "github.com/anchore/syft/syft/formats" "github.com/anchore/syft/syft/sbom" "github.com/deepfence/ThreatMapper/deepfence_server/model" @@ -23,6 +22,7 @@ import ( ps "github.com/deepfence/package-scanner/scanner" "github.com/deepfence/package-scanner/scanner/grype" psUtils "github.com/deepfence/package-scanner/utils" + "github.com/hibiken/asynq" "github.com/minio/minio-go/v7" "github.com/twmb/franz-go/pkg/kgo" ) @@ -82,9 +82,12 @@ func (b UnzippedFile) Close() error { return b.file.Close() } -func (s SbomParser) ScanSBOM(msg *message.Message) error { +func (s SbomParser) ScanSBOM(ctx context.Context, task *asynq.Task) error { - tenantID := msg.Metadata.Get(directory.NamespaceKey) + tenantID, err := directory.ExtractNamespace(ctx) + if err != nil { + return err + } if len(tenantID) == 0 { log.Error().Msg("tenant-id/namespace is empty") return directory.ErrNamespaceNotFound @@ -96,11 +99,11 @@ func (s SbomParser) ScanSBOM(msg *message.Message) error { {Key: "namespace", Value: []byte(tenantID)}, } - log.Info().Msgf("uuid: %s payload: %s ", msg.UUID, string(msg.Payload)) + log.Info().Msgf("payload: %s ", string(task.Payload())) var params utils.SbomParameters - if err := json.Unmarshal(msg.Payload, ¶ms); err != nil { + if err := json.Unmarshal(task.Payload(), ¶ms); err != nil { log.Error().Msg(err.Error()) SendScanStatus(s.ingestC, NewSbomScanStatus(params, utils.SCAN_STATUS_FAILED, err.Error(), nil), rh) return nil @@ -115,8 +118,6 @@ func (s SbomParser) ScanSBOM(msg *message.Message) error { // send inprogress status statusChan <- NewSbomScanStatus(params, utils.SCAN_STATUS_INPROGRESS, "", nil) - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(tenantID)) - mc, err := directory.MinioClient(ctx) if err != nil { log.Error().Msg(err.Error()) diff --git a/deepfence_worker/tasks/secretscan/secretscan.go b/deepfence_worker/tasks/secretscan/secretscan.go index 551b30ae4e..2b230586e1 100644 --- a/deepfence_worker/tasks/secretscan/secretscan.go +++ b/deepfence_worker/tasks/secretscan/secretscan.go @@ -8,7 +8,6 @@ import ( "os/exec" "sync" - "github.com/ThreeDotsLabs/watermill/message" "github.com/deepfence/SecretScanner/core" "github.com/deepfence/SecretScanner/output" "github.com/deepfence/SecretScanner/scan" @@ -20,6 +19,7 @@ import ( "github.com/deepfence/ThreatMapper/deepfence_worker/cronjobs" workerUtils "github.com/deepfence/ThreatMapper/deepfence_worker/utils" pb "github.com/deepfence/agent-plugins-grpc/srcgo" + "github.com/hibiken/asynq" "github.com/twmb/franz-go/pkg/kgo" ) @@ -37,12 +37,12 @@ func NewSecretScanner(ingest chan *kgo.Record) SecretScan { return SecretScan{ingestC: ingest} } -func (s SecretScan) StopSecretScan(msg *message.Message) error { +func (s SecretScan) StopSecretScan(ctx context.Context, task *asynq.Task) error { var params utils.SecretScanParameters - log.Info().Msgf("StopSecretScan, uuid: %s payload: %s ", msg.UUID, string(msg.Payload)) + log.Info().Msgf("StopSecretScan, payload: %s ", string(task.Payload())) - if err := json.Unmarshal(msg.Payload, ¶ms); err != nil { + if err := json.Unmarshal(task.Payload(), ¶ms); err != nil { log.Error().Msgf("StopSecretScan, error in Unmarshal: %s", err.Error()) return nil } @@ -62,10 +62,13 @@ func (s SecretScan) StopSecretScan(msg *message.Message) error { return nil } -func (s SecretScan) StartSecretScan(msg *message.Message) error { +func (s SecretScan) StartSecretScan(ctx context.Context, task *asynq.Task) error { defer cronjobs.ScanWorkloadAllocator.Free() - tenantID := msg.Metadata.Get(directory.NamespaceKey) + tenantID, err := directory.ExtractNamespace(ctx) + if err != nil { + return err + } if len(tenantID) == 0 { log.Error().Msg("tenant-id/namespace is empty") return nil @@ -76,12 +79,11 @@ func (s SecretScan) StartSecretScan(msg *message.Message) error { {Key: "namespace", Value: []byte(tenantID)}, } - ctx := directory.NewContextWithNameSpace(directory.NamespaceID(tenantID)) - log.Info().Msgf("uuid: %s payload: %s ", msg.UUID, string(msg.Payload)) + log.Info().Msgf("payload: %s ", string(task.Payload())) var params utils.SecretScanParameters - if err := json.Unmarshal(msg.Payload, ¶ms); err != nil { + if err := json.Unmarshal(task.Payload(), ¶ms); err != nil { log.Error().Msg(err.Error()) return nil } diff --git a/deepfence_worker/utils/watermill.go b/deepfence_worker/utils/asynq.go similarity index 69% rename from deepfence_worker/utils/watermill.go rename to deepfence_worker/utils/asynq.go index be6d9191a2..41c045a24a 100644 --- a/deepfence_worker/utils/watermill.go +++ b/deepfence_worker/utils/asynq.go @@ -7,14 +7,15 @@ import ( "context" "time" + "github.com/deepfence/ThreatMapper/deepfence_utils/log" + "github.com/hibiken/asynq" "github.com/pkg/errors" "github.com/cenkalti/backoff/v3" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" ) +type WorkerHandler func(ctx context.Context, t *asynq.Task) error + // RecoveredPanicError holds the recovered panic's error along with the stacktrace. type RecoveredPanicError struct { V interface{} @@ -27,20 +28,17 @@ func (p RecoveredPanicError) Error() string { // Recoverer recovers from any panic in the handler and appends RecoveredPanicError with the stacktrace // to any error returned from the handler. -func Recoverer(h message.HandlerFunc) message.HandlerFunc { - return func(event *message.Message) (events []*message.Message, err error) { - +func Recoverer(h asynq.Handler) asynq.Handler { + return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) (err error) { defer func() { if r := recover(); r != nil { err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())}) - // ack message as we don't want to execute panic message again - event.Ack() } }() - events, err = h(event) - return events, err - } + err = h.ProcessTask(ctx, task) + return err + }) } // Retry provides a middleware that retries the handler if errors are returned. @@ -64,16 +62,14 @@ type Retry struct { // OnRetryHook is an optional function that will be executed on each retry attempt. // The number of the current retry is passed as retryNum, OnRetryHook func(retryNum int, delay time.Duration) - - Logger watermill.LoggerAdapter } // Middleware returns the Retry middleware. -func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc { - return func(msg *message.Message) ([]*message.Message, error) { - producedMessages, err := h(msg) +func (r Retry) Middleware(h asynq.Handler) asynq.Handler { + return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error { + err := h.ProcessTask(ctx, task) if err == nil { - return producedMessages, nil + return nil } expBackoff := backoff.NewExponentialBackOff() @@ -83,7 +79,6 @@ func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc { expBackoff.MaxElapsedTime = r.MaxElapsedTime expBackoff.RandomizationFactor = r.RandomizationFactor - ctx := msg.Context() if r.MaxElapsedTime > 0 { var cancel func() ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime) @@ -97,39 +92,33 @@ func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc { waitTime := expBackoff.NextBackOff() select { case <-ctx.Done(): - return producedMessages, err + return err case <-time.After(waitTime): // go on } - producedMessages, err = h(msg) + err = h.ProcessTask(ctx, task) if err == nil { - return producedMessages, nil + return nil } - if r.Logger != nil { - r.Logger.Error("Error occurred, retrying", err, watermill.LogFields{ - "retry_no": retryNum, - "max_retries": r.MaxRetries, - "wait_time": waitTime, - "elapsed_time": expBackoff.GetElapsedTime(), - }) - } + log.Warn().Msgf("Error occurred, retrying %v: %v", err, map[string]interface{}{ + "retry_no": retryNum, + "max_retries": r.MaxRetries, + "wait_time": waitTime, + "elapsed_time": expBackoff.GetElapsedTime(), + }) if r.OnRetryHook != nil { r.OnRetryHook(retryNum, waitTime) } retryNum++ if retryNum > r.MaxRetries { - if r.Logger != nil { - r.Logger.Error("Error Max retries reached", err, watermill.LogFields{"msg_uuid": msg.UUID}) - } - // ack the message don't want to execute already retried message - msg.Ack() + log.Error().Msgf("Error Max retries reached %v", err) break retryLoop } } - return nil, err - } + return err + }) } diff --git a/deepfence_worker/utils/utils.go b/deepfence_worker/utils/utils.go index 1f1733e16b..f6ffb5c257 100644 --- a/deepfence_worker/utils/utils.go +++ b/deepfence_worker/utils/utils.go @@ -9,10 +9,6 @@ import ( "strings" "time" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/deepfence/ThreatMapper/deepfence_server/reporters" ) @@ -39,18 +35,6 @@ var httpReplacer = strings.NewReplacer( "https://", "", ) -func PublishNewJob(pub *kafka.Publisher, metadata map[string]string, topic string, data []byte) error { - msg := message.NewMessage(watermill.NewUUID(), data) - msg.Metadata = metadata - middleware.SetCorrelationID(watermill.NewShortUUID(), msg) - - err := pub.Publish(topic, msg) - if err != nil { - return err - } - return nil -} - func RunCommand(cmd *exec.Cmd) (*bytes.Buffer, error) { var out bytes.Buffer var stderr bytes.Buffer diff --git a/deepfence_worker/worker.go b/deepfence_worker/worker.go index ab50190a8a..2643b8b916 100644 --- a/deepfence_worker/worker.go +++ b/deepfence_worker/worker.go @@ -7,11 +7,7 @@ import ( "syscall" "time" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" - "github.com/ThreeDotsLabs/watermill/message/router/plugin" + "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_utils/telemetry" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" @@ -21,41 +17,33 @@ import ( "github.com/deepfence/ThreatMapper/deepfence_worker/tasks/sbom" "github.com/deepfence/ThreatMapper/deepfence_worker/tasks/secretscan" workerUtils "github.com/deepfence/ThreatMapper/deepfence_worker/utils" + "github.com/hibiken/asynq" "github.com/twmb/franz-go/pkg/kgo" ) type worker struct { - wml watermill.LoggerAdapter - cfg config - mux *message.Router + cfg config + mux *asynq.ServeMux + srv *asynq.Server + namespace directory.NamespaceID } -type NoPublisherTask struct { - Task string - TaskCallback func(*message.Message) error - Handler *message.Handler - Subscriber message.Subscriber - InactiveCounter int -} - -// For thread safety, Below map should only be accessed: -// - From the main() during the initial startup -// - From the pollHandlers() during runtime -var HandlerMap map[string]*NoPublisherTask - -func NewWorker(wml watermill.LoggerAdapter, cfg config, mux *message.Router) worker { - return worker{wml: wml, cfg: cfg, mux: mux} +func NewWorker(namespace directory.NamespaceID, srv *asynq.Server, cfg config, mux *asynq.ServeMux) worker { + return worker{srv: srv, cfg: cfg, mux: mux, namespace: namespace} } func (w *worker) Run(ctx context.Context) error { - return w.mux.Run(ctx) + if err := w.srv.Run(w.mux); err != nil { + log.Fatal().Msgf("could not run server: %v", err) + } + return nil } -func telemetryCallbackWrapper(task string, taskCallback func(*message.Message) error) func(*message.Message) error { - return func(m *message.Message) error { +func telemetryCallbackWrapper(task string, taskCallback workerUtils.WorkerHandler) workerUtils.WorkerHandler { + return func(ctx context.Context, t *asynq.Task) error { span := telemetry.NewSpan(context.Background(), "workerjobs", task) defer span.End() - err := taskCallback(m) + err := taskCallback(ctx, t) if err != nil { span.EndWithErr(err) } @@ -63,163 +51,25 @@ func telemetryCallbackWrapper(task string, taskCallback func(*message.Message) e } } -func (w *worker) AddNoPublisherHandler(task string, - taskCallback func(*message.Message) error, - shouldPoll bool) error { - - subscriber, err := subscribe(task, w.cfg.KafkaBrokers, w.wml) - if err != nil { - return err - } - hdlr := w.mux.AddNoPublisherHandler( - task, - task, - subscriber, - telemetryCallbackWrapper(task, taskCallback), - ) - if shouldPoll { - HandlerMap[task] = &NoPublisherTask{task, taskCallback, hdlr, subscriber, 0} +func contextInjectorCallbackWrapper(namespace directory.NamespaceID, taskCallback workerUtils.WorkerHandler) workerUtils.WorkerHandler { + return func(ctx context.Context, t *asynq.Task) error { + ctx = context.WithValue(ctx, directory.NamespaceKey, namespace) + return taskCallback(ctx, t) } - - return nil } func (w *worker) AddHandler( task string, - taskCallback func(msg *message.Message) ([]*message.Message, error), - receiverTask string, - publisher *kafka.Publisher, + taskCallback workerUtils.WorkerHandler, ) error { - subscriber, err := subscribe(task, w.cfg.KafkaBrokers, w.wml) - if err != nil { - return err - } - w.mux.AddHandler( - task, + w.mux.HandleFunc( task, - subscriber, - receiverTask, - publisher, - taskCallback, + contextInjectorCallbackWrapper(w.namespace, telemetryCallbackWrapper(task, taskCallback)), ) return nil } -func subscribe(consumerGroup string, brokers []string, logger watermill.LoggerAdapter) (message.Subscriber, error) { - - subscriberConf := kafka.DefaultSaramaSubscriberConfig() - subscriberConf.Consumer.Offsets.AutoCommit.Enable = true - - subscriberConf.Consumer.Group.Session.Timeout = 20 * time.Second - subscriberConf.Consumer.Group.Heartbeat.Interval = 6 * time.Second - subscriberConf.Consumer.MaxProcessingTime = 500 * time.Millisecond - - sub, err := kafka.NewSubscriber( - kafka.SubscriberConfig{ - Brokers: brokers, - Unmarshaler: kafka.DefaultMarshaler{}, - ConsumerGroup: consumerGroup, - OverwriteSaramaConfig: subscriberConf, - }, - logger, - ) - if err != nil { - return nil, err - } - return sub, nil -} - -// Routine to poll the liveliness of the Handlers for topics regsitered with -// HandlerMap -func (w *worker) pollHandlers() { - ticker := time.NewTicker(30 * time.Second) - flag := true - threshold := 30 - for { - select { - case <-ticker.C: - cronjobData := cronjobs.GetTopicData() - var resetTopicList []*NoPublisherTask - for topic, task := range HandlerMap { - var sub *kafka.Subscriber - sub = task.Subscriber.(*kafka.Subscriber) - svrOffset, err := sub.PartitionOffset(task.Task) - if err != nil { - log.Info().Msgf("PartitionOffset error: %v", err) - continue - } - - entry, found := cronjobData[topic] - if !found { - continue - } - - msgOffset := entry.Data - maxDelta := int64(1) - inactiveFlag := false - for id, _ := range svrOffset { - if _, ok := msgOffset[id]; ok { - delta := svrOffset[id] - msgOffset[id] - if delta > maxDelta { - inactiveFlag = true - break - } - } - } - - inactiveFlag = inactiveFlag && (!entry.IsRunning) - if inactiveFlag == true { - task.InactiveCounter++ - log.Info().Msgf("Increasing InactiveCounter for topic: %s, counter: %d", - topic, task.InactiveCounter) - } else { - task.InactiveCounter = 0 - } - - if task.InactiveCounter < threshold { - continue - } - - if flag == true { - resetTopicList = append(resetTopicList, task) - } - } - - for _, task := range resetTopicList { - log.Info().Msgf("Initiating restart of inactive handler for topic: %s", task.Task) - task.Handler.Stop() - //This select is to make sure the handler has actually stopped - select { - case _, ok := <-task.Handler.Stopped(): - if !ok { - log.Info().Msgf("Successfully stopped handler for topic: %s", task.Task) - break - } - } - - //Below check is required as the handler is supposed to be stopped and - //cleaned up from the list when we this channel is closed. - //But actully the channel is first closed and than the handler is removed from the routers list - //and that could result in a condition where we might start the new handler while the old - //one is not yet removed from the routers list - for true { - snapshot := w.mux.Handlers() - if _, ok := snapshot[task.Task]; !ok { - log.Info().Msgf("Successfully deleted handler from router for topic: %s", task.Task) - break - } - time.Sleep(1 * time.Second) - } - - w.AddNoPublisherHandler(task.Task, task.TaskCallback, true) - w.mux.RunHandlers(context.Background()) - log.Info().Msgf("Restarted handler for topic: %s", task.Task) - } - } - } -} - -func startWorker(wml watermill.LoggerAdapter, cfg config) error { +func startWorker(cfg config) error { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) @@ -238,28 +88,19 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error { ingestC := make(chan *kgo.Record, 10000) go utils.StartKafkaProducer(ctx, cfg.KafkaBrokers, ingestC) - // task publisher - publisher, err := kafka.NewPublisher( - kafka.PublisherConfig{ - Brokers: cfg.KafkaBrokers, - Marshaler: kafka.DefaultMarshaler{}, + mux := asynq.NewServeMux() + + srv := asynq.NewServer( + asynq.RedisClientOpt{Addr: cfg.RedisAddr}, + asynq.Config{ + Concurrency: 10, + Queues: map[string]int{ + "critical": 6, + "default": 3, + "low": 1, + }, }, - wml, ) - if err != nil { - cancel() - return err - } - defer publisher.Close() - - // task router - mux, err := message.NewRouter(message.RouterConfig{}, wml) - if err != nil { - cancel() - return err - } - - mux.AddPlugin(plugin.SignalsHandler) retry := workerUtils.Retry{ MaxRetries: 3, @@ -271,84 +112,69 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error { OnRetryHook: func(retryNum int, delay time.Duration) { log.Info().Msgf("retry=%d delay=%s", retryNum, delay) }, - Logger: wml, } - mux.AddMiddleware( - middleware.CorrelationID, - middleware.NewThrottle(20, time.Second).Middleware, + mux.Use( retry.Middleware, workerUtils.Recoverer, ) - HandlerMap = make(map[string]*NoPublisherTask) - - cronjobs.TopicData = make(map[string]cronjobs.TopicDataEntry) - - worker := NewWorker(wml, cfg, mux) + worker := NewWorker(directory.NonSaaSDirKey, srv, cfg, mux) // sbom - worker.AddNoPublisherHandler(utils.ScanSBOMTask, sbom.NewSBOMScanner(ingestC).ScanSBOM, false) - - worker.AddHandler(utils.GenerateSBOMTask, sbom.NewSbomGenerator(ingestC).GenerateSbom, - utils.ScanSBOMTask, publisher) + worker.AddHandler(utils.ScanSBOMTask, sbom.NewSBOMScanner(ingestC).ScanSBOM) - worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, cronjobs.CleanUpDB, true) + worker.AddHandler(utils.GenerateSBOMTask, sbom.NewSbomGenerator(ingestC).GenerateSbom) - worker.AddNoPublisherHandler(utils.ComputeThreatTask, cronjobs.ComputeThreat, true) + worker.AddHandler(utils.CleanUpGraphDBTask, cronjobs.CleanUpDB) - worker.AddNoPublisherHandler(utils.RetryFailedScansTask, cronjobs.RetryScansDB, true) + worker.AddHandler(utils.ComputeThreatTask, cronjobs.ComputeThreat) - worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, cronjobs.RetryUpgradeAgent, false) + worker.AddHandler(utils.RetryFailedScansTask, cronjobs.RetryScansDB) - worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, cronjobs.CleanUpPostgresDB, true) + worker.AddHandler(utils.RetryFailedUpgradesTask, cronjobs.RetryUpgradeAgent) - worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, cronjobs.CleanUpDiagnosisLogs, false) + worker.AddHandler(utils.CleanUpPostgresqlTask, cronjobs.CleanUpPostgresDB) - worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, cronjobs.CheckAgentUpgrade, true) + worker.AddHandler(utils.CleanupDiagnosisLogs, cronjobs.CleanUpDiagnosisLogs) - worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, cronjobs.TriggerConsoleControls, true) + worker.AddHandler(utils.CheckAgentUpgradeTask, cronjobs.CheckAgentUpgrade) - worker.AddNoPublisherHandler(utils.ScheduledTasks, cronjobs.RunScheduledTasks, false) + worker.AddHandler(utils.TriggerConsoleActionsTask, cronjobs.TriggerConsoleControls) - worker.AddNoPublisherHandler(utils.SyncRegistryTask, cronjobs.SyncRegistry, false) + worker.AddHandler(utils.ScheduledTasks, cronjobs.RunScheduledTasks) - worker.AddNoPublisherHandler(utils.SecretScanTask, - secretscan.NewSecretScanner(ingestC).StartSecretScan, false) + worker.AddHandler(utils.SyncRegistryTask, cronjobs.SyncRegistry) - worker.AddNoPublisherHandler(utils.StopSecretScanTask, - secretscan.NewSecretScanner(ingestC).StopSecretScan, false) + worker.AddHandler(utils.SecretScanTask, + secretscan.NewSecretScanner(ingestC).StartSecretScan) - worker.AddNoPublisherHandler(utils.MalwareScanTask, - malwarescan.NewMalwareScanner(ingestC).StartMalwareScan, false) + worker.AddHandler(utils.StopSecretScanTask, + secretscan.NewSecretScanner(ingestC).StopSecretScan) - worker.AddNoPublisherHandler(utils.StopMalwareScanTask, - malwarescan.NewMalwareScanner(ingestC).StopMalwareScan, false) + worker.AddHandler(utils.MalwareScanTask, + malwarescan.NewMalwareScanner(ingestC).StartMalwareScan) - worker.AddNoPublisherHandler(utils.CloudComplianceTask, cronjobs.AddCloudControls, true) + worker.AddHandler(utils.StopMalwareScanTask, + malwarescan.NewMalwareScanner(ingestC).StopMalwareScan) - worker.AddNoPublisherHandler(utils.CachePostureProviders, cronjobs.CachePostureProviders, true) + worker.AddHandler(utils.CloudComplianceTask, cronjobs.AddCloudControls) - worker.AddNoPublisherHandler(utils.SendNotificationTask, cronjobs.SendNotifications, true) + worker.AddHandler(utils.CachePostureProviders, cronjobs.CachePostureProviders) - worker.AddNoPublisherHandler(utils.ReportGeneratorTask, reports.GenerateReport, false) + worker.AddHandler(utils.SendNotificationTask, cronjobs.SendNotifications) - worker.AddNoPublisherHandler(utils.ReportCleanUpTask, cronjobs.CleanUpReports, true) + worker.AddHandler(utils.ReportGeneratorTask, reports.GenerateReport) - worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, cronjobs.LinkCloudResources, true) + worker.AddHandler(utils.ReportCleanUpTask, cronjobs.CleanUpReports) - worker.AddNoPublisherHandler(utils.LinkNodesTask, cronjobs.LinkNodes, true) + worker.AddHandler(utils.LinkCloudResourceTask, cronjobs.LinkCloudResources) - worker.AddNoPublisherHandler(utils.StopVulnerabilityScanTask, - sbom.StopVulnerabilityScan, false) + worker.AddHandler(utils.LinkNodesTask, cronjobs.LinkNodes) - go worker.pollHandlers() + worker.AddHandler(utils.StopVulnerabilityScanTask, sbom.StopVulnerabilityScan) - log.Info().Msg("Starting the consumer") - if err = worker.Run(context.Background()); err != nil { - cancel() - return err - } - cancel() - return nil + log.Info().Msg("Starting the worker") + err = worker.Run(context.Background()) + return err }