diff --git a/boilerplate/flyte/golang_support_tools/go.mod b/boilerplate/flyte/golang_support_tools/go.mod index dbf94f411..feab0b2d5 100644 --- a/boilerplate/flyte/golang_support_tools/go.mod +++ b/boilerplate/flyte/golang_support_tools/go.mod @@ -8,6 +8,7 @@ require ( github.com/flyteorg/flytestdlib v0.4.16 github.com/golangci/golangci-lint v1.38.0 github.com/pseudomuto/protoc-gen-doc v1.4.1 + golang.org/x/tools v0.11.0 // indirect ) require ( @@ -163,16 +164,14 @@ require ( github.com/ultraware/whitespace v0.0.4 // indirect github.com/uudashr/gocognit v1.0.1 // indirect go.opencensus.io v0.22.6 // indirect - golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect + golang.org/x/crypto v0.11.0 // indirect golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect - golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect - golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect + golang.org/x/mod v0.12.0 // indirect + golang.org/x/net v0.12.0 // indirect golang.org/x/oauth2 v0.0.0-20210126194326-f9ce19ea3013 // indirect - golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/sys v0.10.0 // indirect + golang.org/x/text v0.11.0 // indirect golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect - golang.org/x/tools v0.1.10 // indirect - golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/api v0.38.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20210126160654-44e461bb6506 // indirect diff --git a/boilerplate/flyte/golang_support_tools/go.sum b/boilerplate/flyte/golang_support_tools/go.sum index 02895fb57..111e7adb0 100644 --- a/boilerplate/flyte/golang_support_tools/go.sum +++ b/boilerplate/flyte/golang_support_tools/go.sum @@ -808,6 +808,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= @@ -842,8 +843,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -878,8 +880,11 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -928,8 +933,12 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -952,6 +961,10 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1011,10 +1024,19 @@ golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1022,8 +1044,11 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1125,8 +1150,11 @@ golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= -golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8= +golang.org/x/tools v0.11.0/go.mod h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/cmd/controller/cmd/root.go b/cmd/controller/cmd/root.go index 2bebc269b..e99a230a6 100644 --- a/cmd/controller/cmd/root.go +++ b/cmd/controller/cmd/root.go @@ -19,6 +19,7 @@ import ( "github.com/flyteorg/flytestdlib/profutils" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" + "github.com/flyteorg/flytestdlib/telemetryutils" "github.com/flyteorg/flytestdlib/version" "github.com/spf13/cobra" @@ -119,6 +120,15 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { labeled.SetMetricKeys(keys...) } + // register opentelementry tracer providers + for _, serviceName := range []string{telemetryutils.AdminClientTracer, telemetryutils.BlobstoreClientTracer, + telemetryutils.DataCatalogClientTracer, telemetryutils.FlytePropellerTracer, telemetryutils.K8sClientTracer} { + if err := telemetryutils.RegisterTracerProvider(serviceName, telemetryutils.GetConfig()); err != nil { + logger.Errorf(ctx, "Failed to create telemetry tracer provider. %v", err) + return err + } + } + // Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics. propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace) limitNamespace := "" diff --git a/events/admin_eventsink.go b/events/admin_eventsink.go index debf10bd5..8aadcf41c 100644 --- a/events/admin_eventsink.go +++ b/events/admin_eventsink.go @@ -13,8 +13,13 @@ import ( "github.com/flyteorg/flytestdlib/fastcheck" "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/telemetryutils" "github.com/golang/protobuf/proto" "golang.org/x/time/rate" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + + "google.golang.org/grpc" ) type adminEventSink struct { @@ -126,7 +131,13 @@ func IDFromMessage(message proto.Message) ([]byte, error) { func initializeAdminClientFromConfig(ctx context.Context) (client service.AdminServiceClient, err error) { cfg := admin2.GetConfig(ctx) - clients, err := admin2.NewClientsetBuilder().WithConfig(cfg).Build(ctx) + tracerProvider := telemetryutils.GetTracerProvider(telemetryutils.AdminClientTracer) + opt := grpc.WithUnaryInterceptor( + otelgrpc.UnaryClientInterceptor( + otelgrpc.WithTracerProvider(tracerProvider), + ), + ) + clients, err := admin2.NewClientsetBuilder().WithDialOptions(opt).WithConfig(cfg).Build(ctx) if err != nil { return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err) } diff --git a/go.mod b/go.mod index 3da1b2d77..06fd3aea0 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,8 @@ require ( github.com/spf13/cobra v1.4.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 + go.opentelemetry.io/otel/trace v1.11.1 golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e golang.org/x/sync v0.1.0 golang.org/x/time v0.1.0 @@ -77,6 +79,7 @@ require ( github.com/flyteorg/stow v0.3.6 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.1 // indirect github.com/go-openapi/swag v0.22.3 // indirect @@ -121,6 +124,10 @@ require ( github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/otel v1.11.1 // indirect + go.opentelemetry.io/otel/exporters/jaeger v1.11.1 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1 // indirect + go.opentelemetry.io/otel/sdk v1.11.1 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/net v0.9.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect @@ -146,3 +153,5 @@ require ( ) replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d + +replace github.com/flyteorg/flytestdlib => github.com/flyteorg/flytestdlib v1.0.22-0.20230721154524-9cbeef9f7b2e diff --git a/go.sum b/go.sum index ebafdba07..24089adf6 100644 --- a/go.sum +++ b/go.sum @@ -176,7 +176,12 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA= @@ -230,7 +235,9 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= github.com/evanphx/json-patch v0.0.0-20200808040245-162e5629780b/go.mod h1:NAJj0yf/KaRKURN6nyi7A9IZydMivZEm9oQLWNjfKDc= github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -246,8 +253,8 @@ github.com/flyteorg/flyteidl v1.5.10 h1:SHeiaWRt8EAVuFsat+BJswtc07HTZ4DqhfTEYSm6 github.com/flyteorg/flyteidl v1.5.10/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.1.8 h1:UVYdqDdcIqz2JIso+m3MsaPSsTZJZyZQ6Eg7nhX9r/Y= github.com/flyteorg/flyteplugins v1.1.8/go.mod h1:sRxeatEOHq1b9bTxTRNcwoIkVTAVN9dTz8toXkfcz2E= -github.com/flyteorg/flytestdlib v1.0.20 h1:BrCQMlpdrFAPlADFJvCyn7gm+37df9WGYqLEB1mOlCQ= -github.com/flyteorg/flytestdlib v1.0.20/go.mod h1:v3ua7HfHDXXTCrAt2yZERGKCuilP5Rh+L8TdAbfVcBg= +github.com/flyteorg/flytestdlib v1.0.22-0.20230721154524-9cbeef9f7b2e h1:KsxTeqYC5aMAEkFMahn3dh6/Vs/4Cfcu9l1UqPXrlvc= +github.com/flyteorg/flytestdlib v1.0.22-0.20230721154524-9cbeef9f7b2e/go.mod h1:Ytuer31/7zIVHb7IdItSEDkCJMskE2eJ1ccP+PGPZ3g= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= github.com/flyteorg/stow v0.3.6/go.mod h1:5dfBitPM004dwaZdoVylVjxFT4GWAgI0ghAndhNUzCo= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -276,8 +283,11 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7 github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.2.1/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v0.1.0/go.mod h1:tabnROwaDl0UNxkVeFRbY8bwB37GwRv0P8lg6aAiEnk= github.com/go-logr/zapr v0.2.0/go.mod h1:qhKdvif7YF5GI9NWEpyxTSSBdGmzkNguibrdCNVPunU= github.com/go-logr/zapr v1.2.0 h1:n4JnPI1T3Qq1SFEi/F8rwLrZERp2bso19PJZDB9dayk= @@ -408,6 +418,8 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -778,15 +790,29 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0/go.mod h1:oVGt1LRbBOBq1A5BQLlUg9UaU/54aiHw8cgjV3aWZ/E= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 h1:WenoaOMNP71oq3KkMZ/jnxI9xU/JSCLw8yZILSI2lfU= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0/go.mod h1:J0dBVrt7dPS/lKJyQoW0xzQiUr4r2Ik1VwPjAUWnofI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= +go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk= +go.opentelemetry.io/otel v1.11.1 h1:4WLLAmcfkmDk2ukNXJyq3/kiz/3UzCaYq6PskJsaou4= +go.opentelemetry.io/otel v1.11.1/go.mod h1:1nNhXBbWSD0nsL38H6btgnFN2k4i0sNLHNNMZMSbUGE= +go.opentelemetry.io/otel/exporters/jaeger v1.11.1 h1:F9Io8lqWdGyIbY3/SOGki34LX/l+7OL0gXNxjqwcbuQ= +go.opentelemetry.io/otel/exporters/jaeger v1.11.1/go.mod h1:lRa2w3bQ4R4QN6zYsDgy7tEezgoKEu7Ow2g35Y75+KI= go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1 h1:3Yvzs7lgOw8MmbxmLRsQGwYdCubFmUHSooKaEhQunFQ= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1/go.mod h1:pyHDt0YlyuENkD2VwHsiRDf+5DfI3EH7pfhUYW6sQUE= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m7hBWC279Yc= +go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs= +go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys= go.opentelemetry.io/otel/sdk/export/metric v0.20.0/go.mod h1:h7RBNMsDJ5pmI1zExLi+bJK+Dr8NQCh0qGhm1KDnNlE= go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4/0TjTXukfxjzSTpHE= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= +go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU= +go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ= +go.opentelemetry.io/otel/trace v1.11.1/go.mod h1:f/Q9G7vzk5u91PhbmKbg1Qn0rzH1LJ4vbPHFGkTPtOk= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -1235,6 +1261,7 @@ google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.56.1 h1:z0dNfjIl0VpaZ9iSVjA6daGatAYwPGstTjt5vkRMFkQ= google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d7b8d9c28..d8d0a5f9b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -549,6 +549,8 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s ctx, cancel := context.WithCancel(ctx) defer cancel() + // TODO @hamersaw wrap mgr in telemetryclient + kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, cfg) if err != nil { return errors.Wrapf(err, "error building Kubernetes Clientset") diff --git a/pkg/controller/executors/kube.go b/pkg/controller/executors/kube.go index a37e7a216..8c15561e4 100644 --- a/pkg/controller/executors/kube.go +++ b/pkg/controller/executors/kube.go @@ -6,6 +6,7 @@ import ( "github.com/flyteorg/flytestdlib/fastcheck" "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/telemetryutils" "k8s.io/client-go/rest" @@ -69,6 +70,7 @@ func (f *FallbackClientBuilder) WithUncached(objs ...client.Object) ClientBuilde } func (f FallbackClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { + // TODO @hamersaw - wrap client and cache in telemetryutils.K8sClientWrapper c, err := client.New(config, options) if err != nil { return nil, err @@ -79,6 +81,8 @@ func (f FallbackClientBuilder) Build(cache cache.Cache, config *rest.Config, opt return nil, err } + c = telemetryutils.WrapK8sClient(c) + cache = telemetryutils.WrapK8sCache(cache) return client.NewDelegatingClient(client.NewDelegatingClientInput{ Client: c, CacheReader: fallbackClientReader{ diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index a16755787..15004dbe0 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -22,8 +22,11 @@ import ( "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" "github.com/flyteorg/flytestdlib/storage" + "github.com/flyteorg/flytestdlib/telemetryutils" "github.com/prometheus/client_golang/prometheus" + + "go.opentelemetry.io/otel/trace" ) // TODO Lets move everything to use controller runtime @@ -178,11 +181,17 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F // // func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { + var span trace.Span + ctx, span = telemetryutils.NewSpan(ctx, telemetryutils.FlytePropellerTracer, "pkg.controller.Propeller/Handle") + defer span.End() + logger.Infof(ctx, "Processing Workflow.") defer logger.Infof(ctx, "Completed processing workflow.") // Get the FlyteWorkflow resource with this namespace/name + _, wfStoreGetSpan := telemetryutils.NewSpan(ctx, telemetryutils.FlytePropellerTracer, "WorkflowStore.Get") w, fetchErr := p.wfStore.Get(ctx, namespace, name) + wfStoreGetSpan.End() if fetchErr != nil { if workflowstore.IsNotFound(fetchErr) { p.metrics.WorkflowNotFound.Inc() @@ -215,25 +224,12 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { // static fields to the blobstore to reduce CRD size. we must read and parse the workflow // closure so that these fields may be temporarily repopulated. var wfClosureCrdFields *k8s.WfClosureCrdFields + var err error if len(w.WorkflowClosureReference) > 0 { - t := p.metrics.WorkflowClosureReadTime.Start(ctx) - - wfClosure := &admin.WorkflowClosure{} - err := p.store.ReadProtobuf(ctx, w.WorkflowClosureReference, wfClosure) - if err != nil { - t.Stop() - logger.Errorf(ctx, "Failed to retrieve workflow closure data from '%s' with error '%s'", w.WorkflowClosureReference, err) - return err - } - - wfClosureCrdFields, err = k8s.BuildWfClosureCrdFields(wfClosure.CompiledWorkflow) + wfClosureCrdFields, err = p.parseWorkflowClosureCrdFields(ctx, w.WorkflowClosureReference) if err != nil { - t.Stop() - logger.Errorf(ctx, "Failed to parse workflow closure data from '%s' with error '%s'", w.WorkflowClosureReference, err) return err } - - t.Stop() } streak := 0 @@ -245,140 +241,178 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { } for streak = 0; streak < maxLength; streak++ { - // if the wfClosureCrdFields struct is not nil then it contains static workflow data which - // has been offloaded to the blobstore. we must set these fields so they're available - // during workflow processing and immediately remove them afterwards so they do not - // accidentally get written to the workflow store once the new state is stored. - if wfClosureCrdFields != nil { - w.WorkflowSpec = wfClosureCrdFields.WorkflowSpec - w.Tasks = wfClosureCrdFields.Tasks - w.SubWorkflows = wfClosureCrdFields.SubWorkflows + w, err = p.streak(ctx, w, wfClosureCrdFields) + if err != nil { + return err + } else if w == nil { + break } - t := p.metrics.RoundTime.Start(ctx) - mutatedWf, err := p.TryMutateWorkflow(ctx, w) + logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak) + } - if wfClosureCrdFields != nil { - // strip data populated from WorkflowClosureReference - w.SubWorkflows, w.Tasks, w.WorkflowSpec = nil, nil, nil - if mutatedWf != nil { - mutatedWf.SubWorkflows, mutatedWf.Tasks, mutatedWf.WorkflowSpec = nil, nil, nil - } + logger.Infof(ctx, "Streak ended at [%d]/Max: [%d]", streak, maxLength) + return nil +} + +// parseWorkflowClosureCrdFields attempts to retrieve offloaded static workflow closure data from the specified +// DataReference. +func (p *Propeller) parseWorkflowClosureCrdFields(ctx context.Context, dataReference storage.DataReference) (*k8s.WfClosureCrdFields, error) { + _, span := telemetryutils.NewSpan(ctx, telemetryutils.FlytePropellerTracer, "pkg.controller.Propeller/parseWorkflowClosureCrdFields") + defer span.End() + + t := p.metrics.WorkflowClosureReadTime.Start(ctx) + defer t.Stop() + + wfClosure := &admin.WorkflowClosure{} + err := p.store.ReadProtobuf(ctx, dataReference, wfClosure) + if err != nil { + logger.Errorf(ctx, "Failed to retrieve workflow closure data from '%s' with error '%s'", dataReference, err) + return nil, err + } + + wfClosureCrdFields, err := k8s.BuildWfClosureCrdFields(wfClosure.CompiledWorkflow) + if err != nil { + logger.Errorf(ctx, "Failed to parse workflow closure data from '%s' with error '%s'", dataReference, err) + return nil, err + } + + return wfClosureCrdFields, nil +} + +// streak performs a single iteration of mutating a workflow returning the newly mutated workflow on success or nil if +// the workflow was not updated. +func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClosureCrdFields *k8s.WfClosureCrdFields) (*v1alpha1.FlyteWorkflow, error) { + ctx, span := telemetryutils.NewSpan(ctx, telemetryutils.FlytePropellerTracer, "pkg.controller.Propeller/streak") + defer span.End() + + t := p.metrics.RoundTime.Start(ctx) + defer t.Stop() + + // if the wfClosureCrdFields struct is not nil then it contains static workflow data which + // has been offloaded to the blobstore. we must set these fields so they're available + // during workflow processing and immediately remove them afterwards so they do not + // accidentally get written to the workflow store once the new state is stored. + if wfClosureCrdFields != nil { + w.WorkflowSpec = wfClosureCrdFields.WorkflowSpec + w.Tasks = wfClosureCrdFields.Tasks + w.SubWorkflows = wfClosureCrdFields.SubWorkflows + } + + mutatedWf, err := p.TryMutateWorkflow(ctx, w) + + if wfClosureCrdFields != nil { + // strip data populated from WorkflowClosureReference + w.SubWorkflows, w.Tasks, w.WorkflowSpec = nil, nil, nil + if mutatedWf != nil { + mutatedWf.SubWorkflows, mutatedWf.Tasks, mutatedWf.WorkflowSpec = nil, nil, nil } + } - if err != nil { - // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations - // We only want to increase failed attempts and discard any other partial changes to the CRD. - mutatedWf = RecordSystemError(w, err) - p.metrics.SystemError.Inc(ctx) - } else if mutatedWf == nil { - logger.Errorf(ctx, "Should not happen! Mutation resulted in a nil workflow!") - return nil - } else { - if !w.GetExecutionStatus().IsTerminated() { - // No updates in the status we detected, we will skip writing to KubeAPI - if mutatedWf.Status.Equals(&w.Status) { - logger.Info(ctx, "WF hasn't been updated in this round.") - t.Stop() - return nil - } - } - if mutatedWf.GetExecutionStatus().IsTerminated() { - // If the end result is a terminated workflow, we remove the labels - // We add a completed label so that we can avoid polling for this workflow - SetCompletedLabel(mutatedWf, time.Now()) - ResetFinalizers(mutatedWf) + if err != nil { + // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations + // We only want to increase failed attempts and discard any other partial changes to the CRD. + mutatedWf = RecordSystemError(w, err) + p.metrics.SystemError.Inc(ctx) + } else if mutatedWf == nil { + logger.Errorf(ctx, "Should not happen! Mutation resulted in a nil workflow!") + return nil, nil + } else { + if !w.GetExecutionStatus().IsTerminated() { + // No updates in the status we detected, we will skip writing to KubeAPI + if mutatedWf.Status.Equals(&w.Status) { + logger.Info(ctx, "WF hasn't been updated in this round.") + return nil, nil } } + if mutatedWf.GetExecutionStatus().IsTerminated() { + // If the end result is a terminated workflow, we remove the labels + // We add a completed label so that we can avoid polling for this workflow + SetCompletedLabel(mutatedWf, time.Now()) + ResetFinalizers(mutatedWf) + } + } - // ExecutionNotFound error is returned when flyteadmin is missing the workflow. This is not - // a valid state unless we are experiencing a race condition where the workflow has not yet - // been inserted into the db (ie. workflow phase is WorkflowPhaseReady). - if err != nil && eventsErr.IsNotFound(err) && w.GetExecutionStatus().GetPhase() != v1alpha1.WorkflowPhaseReady { - t.Stop() - logger.Errorf(ctx, "Failed to process workflow, failing: %s", err) - - // We set the workflow status to failing to abort any active tasks in the next round. - mutableW := w.DeepCopy() - mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution is missing in flyteadmin, aborting", &core.ExecutionError{ - Kind: core.ExecutionError_SYSTEM, - Code: "ExecutionNotFound", - Message: "Workflow execution not found in flyteadmin.", - }) - if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { - logger.Errorf(ctx, "Failed to record an ExecutionNotFound workflow as failed, reason: %s. Retrying...", e) - return e - } - return nil + // ExecutionNotFound error is returned when flyteadmin is missing the workflow. This is not + // a valid state unless we are experiencing a race condition where the workflow has not yet + // been inserted into the db (ie. workflow phase is WorkflowPhaseReady). + if err != nil && eventsErr.IsNotFound(err) && w.GetExecutionStatus().GetPhase() != v1alpha1.WorkflowPhaseReady { + logger.Errorf(ctx, "Failed to process workflow, failing: %s", err) + + // We set the workflow status to failing to abort any active tasks in the next round. + mutableW := w.DeepCopy() + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution is missing in flyteadmin, aborting", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: "ExecutionNotFound", + Message: "Workflow execution not found in flyteadmin.", + }) + if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { + logger.Errorf(ctx, "Failed to record an ExecutionNotFound workflow as failed, reason: %s. Retrying...", e) + return nil, e } + return nil, nil + } - // Incompatible cluster means that another cluster has been designated to handle this workflow execution. - // We should early abort in this case, since any events originating from this cluster for this execution will - // be rejected. - if err != nil && eventsErr.IsEventIncompatibleClusterError(err) { - t.Stop() - logger.Errorf(ctx, "No longer designated to process workflow, failing: %s", err) + // Incompatible cluster means that another cluster has been designated to handle this workflow execution. + // We should early abort in this case, since any events originating from this cluster for this execution will + // be rejected. + if err != nil && eventsErr.IsEventIncompatibleClusterError(err) { + logger.Errorf(ctx, "No longer designated to process workflow, failing: %s", err) + + // We set the workflow status to failing to abort any active tasks in the next round. + mutableW := w.DeepCopy() + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution cluster reassigned, aborting", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: string(eventsErr.EventIncompatibleCusterError), + Message: fmt.Sprintf("Workflow execution cluster reassigned: %v", err), + }) + if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { + logger.Errorf(ctx, "Failed to record an EventIncompatibleClusterError workflow as failed, reason: %s. Retrying...", e) + return nil, e + } + return nil, nil + } - // We set the workflow status to failing to abort any active tasks in the next round. + // TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update + + // update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + _, wfStoreUpdateSpan := telemetryutils.NewSpan(ctx, telemetryutils.FlytePropellerTracer, "WorkflowStore.Update") + newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical) + wfStoreUpdateSpan.End() + if updateErr != nil { + // The update has failed, lets check if this is because the size is too large. If so + if workflowstore.IsWorkflowTooLarge(updateErr) { + logger.Errorf(ctx, "Failed storing workflow to the store, reason: %s", updateErr) + p.metrics.SystemError.Inc(ctx) + // Workflow is too large, we will mark the workflow as failing and record it. This will automatically + // propagate the failure in the next round. mutableW := w.DeepCopy() - mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution cluster reassigned, aborting", &core.ExecutionError{ + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{ Kind: core.ExecutionError_SYSTEM, - Code: string(eventsErr.EventIncompatibleCusterError), - Message: fmt.Sprintf("Workflow execution cluster reassigned: %v", err), + Code: "WorkflowTooLarge", + Message: "Workflow execution state is too large for Flyte to handle.", }) if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { - logger.Errorf(ctx, "Failed to record an EventIncompatibleClusterError workflow as failed, reason: %s. Retrying...", e) - return e + logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. Retrying...", e) + return nil, e } - return nil - } - - // TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update - - // update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not - // allow changes to the Spec of the resource, which is ideal for ensuring - // nothing other than resource status has been updated. - newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical) - if updateErr != nil { - t.Stop() - // The update has failed, lets check if this is because the size is too large. If so - if workflowstore.IsWorkflowTooLarge(updateErr) { - logger.Errorf(ctx, "Failed storing workflow to the store, reason: %s", updateErr) - p.metrics.SystemError.Inc(ctx) - // Workflow is too large, we will mark the workflow as failing and record it. This will automatically - // propagate the failure in the next round. - mutableW := w.DeepCopy() - mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{ - Kind: core.ExecutionError_SYSTEM, - Code: "WorkflowTooLarge", - Message: "Workflow execution state is too large for Flyte to handle.", - }) - if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { - logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. Retrying...", e) - return e - } - return nil - } - return updateErr - } - if err != nil { - t.Stop() - // An error was encountered during the round. Let us return, so that we can back-off gracefully - return err + return nil, nil } - if mutatedWf.GetExecutionStatus().IsTerminated() || newWf.ResourceVersion == mutatedWf.ResourceVersion { - // Workflow is terminated (no need to continue) or no status was changed, we can wait - logger.Infof(ctx, "Will not fast follow, Reason: Wf terminated? %v, Version matched? %v", - mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) - t.Stop() - return nil - } - logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak) - w = newWf - t.Stop() + return nil, updateErr } - logger.Infof(ctx, "Streak ended at [%d]/Max: [%d]", streak, maxLength) - return nil + if err != nil { + // An error was encountered during the round. Let us return, so that we can back-off gracefully + return nil, err + } + if mutatedWf.GetExecutionStatus().IsTerminated() || newWf.ResourceVersion == mutatedWf.ResourceVersion { + // Workflow is terminated (no need to continue) or no status was changed, we can wait + logger.Infof(ctx, "Will not fast follow, Reason: Wf terminated? %v, Version matched? %v", + mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) + return nil, nil + } + return newWf, nil } // NewPropellerHandler creates a new Propeller and initializes metrics diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index c447b779c..8f169e30b 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -47,9 +47,13 @@ import ( "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" "github.com/flyteorg/flytestdlib/storage" + "github.com/flyteorg/flytestdlib/telemetryutils" "github.com/golang/protobuf/ptypes" + //"go.opentelemetry.io/otel/attribute" + //"go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -740,6 +744,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur logger.Debugf(ctx, "Handling Node [%s]", nCtx.NodeID()) defer logger.Debugf(ctx, "Completed node [%s]", nCtx.NodeID()) + ctx, span := telemetryutils.NewSpan(ctx, telemetryutils.FlytePropellerTracer, "pkg.controller.nodes.NodeExecutor/handleNode") + defer span.End() + nodeStatus := nCtx.NodeStatus() currentPhase := nodeStatus.GetPhase() diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index 3009af038..611e7b772 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -8,19 +8,28 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" + + "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/telemetryutils" + + "github.com/golang/protobuf/ptypes" + grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pkg/errors" - "github.com/flyteorg/flytestdlib/logger" - "github.com/golang/protobuf/ptypes" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/util/uuid" ) @@ -472,7 +481,12 @@ func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection boo retryInterceptor := grpcRetry.UnaryClientInterceptor(grpcOptions...) - opts = append(opts, grpc.WithChainUnaryInterceptor(grpcPrometheus.UnaryClientInterceptor, + tracerProvider := telemetryutils.GetTracerProvider(telemetryutils.DataCatalogClientTracer) + opts = append(opts, grpc.WithChainUnaryInterceptor( + grpcPrometheus.UnaryClientInterceptor, + otelgrpc.UnaryClientInterceptor( + otelgrpc.WithTracerProvider(tracerProvider), + ), retryInterceptor)) clientConn, err := grpc.Dial(endpoint, opts...) if err != nil { diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index e0faf8f85..9e198e0b3 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -28,6 +28,7 @@ import ( "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" "github.com/flyteorg/flytestdlib/storage" + "github.com/flyteorg/flytestdlib/telemetryutils" "github.com/golang/protobuf/ptypes" regErrors "github.com/pkg/errors" @@ -535,6 +536,10 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { ttype := nCtx.TaskReader().GetTaskType() ctx = contextutils.WithTaskType(ctx, ttype) + + ctx, span := telemetryutils.NewSpan(ctx, telemetryutils.FlytePropellerTracer, "pkg.controller.nodes.task.Handler/HandleTask") + defer span.End() + p, err := t.ResolvePlugin(ctx, ttype, nCtx.ExecutionContext().GetExecutionConfig()) if err != nil { return handler.UnknownTransition, errors.Wrapf(errors.UnsupportedTaskTypeError, nCtx.NodeID(), err, "unable to resolve plugin") diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 37534dc32..06edd773e 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -209,6 +209,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas key := backoff.ComposeResourceKey(o) pod, casted := o.(*v1.Pod) + k8sCreateTimestamp := time.Now() if e.backOffController != nil && casted { podRequestedResources := e.getPodEffectiveResourceLimits(ctx, pod) @@ -249,7 +250,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.UnknownTransition, errors.Wrapf(stdErrors.ErrorCode(reason), err, "failed to create resource") } - return pluginsCore.DoTransition(pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "task submitted to K8s")), nil + return pluginsCore.DoTransition(pluginsCore.PhaseInfoQueued(k8sCreateTimestamp, pluginsCore.DefaultPhaseVersion, "task submitted to K8s")), nil } func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, k8sPluginState *k8s.PluginState) (pluginsCore.Transition, error) {