diff --git a/go.mod b/go.mod index 167f995d439..5207e4195a0 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/hashicorp/go-multierror v1.1.1 github.com/kelseyhightower/envconfig v1.4.0 + github.com/mitchellh/hashstructure v1.0.0 github.com/onsi/ginkgo/v2 v2.22.1 github.com/onsi/gomega v1.36.2 github.com/pkg/errors v0.9.1 @@ -157,7 +158,6 @@ require ( github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect - github.com/mitchellh/hashstructure v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/locker v1.0.1 // indirect diff --git a/hack/utils/oss_compliance/osa_provided.md b/hack/utils/oss_compliance/osa_provided.md index cde1b025ce8..f3c22b6977b 100644 --- a/hack/utils/oss_compliance/osa_provided.md +++ b/hack/utils/oss_compliance/osa_provided.md @@ -17,6 +17,7 @@ Name|Version|License [google/go-cmp](https://github.com/google/go-cmp)|v0.6.0|BSD 3-clause "New" or "Revised" License [grpc-ecosystem/go-grpc-middleware](https://github.com/grpc-ecosystem/go-grpc-middleware)|v1.4.0|Apache License 2.0 [kelseyhightower/envconfig](https://github.com/kelseyhightower/envconfig)|v1.4.0|MIT License +[mitchellh/hashstructure](https://github.com/mitchellh/hashstructure)|v1.0.0|MIT License [ginkgo/v2](https://github.com/onsi/ginkgo)|v2.22.1|MIT License [onsi/gomega](https://github.com/onsi/gomega)|v1.36.2|MIT License [pkg/errors](https://github.com/pkg/errors)|v0.9.1|BSD 2-clause "Simplified" License diff --git a/internal/kgateway/extensions2/plugins/directresponse/direct_response_plugin.go b/internal/kgateway/extensions2/plugins/directresponse/direct_response_plugin.go index b09fb550c4f..d81ca5250c5 100644 --- a/internal/kgateway/extensions2/plugins/directresponse/direct_response_plugin.go +++ 
b/internal/kgateway/extensions2/plugins/directresponse/direct_response_plugin.go @@ -53,6 +53,11 @@ func (p *directResponsePluginGwPass) ApplyHCM(ctx context.Context, pCtx *ir.HcmC return nil } +func (p *directResponsePluginGwPass) ApplyForBackend(ctx context.Context, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + // no op + return nil +} + func registerTypes(ourCli versioned.Interface) { skubeclient.Register[*v1alpha1.DirectResponse]( v1alpha1.DirectResponseGVK.GroupVersion().WithResource("directresponses"), @@ -158,7 +163,7 @@ func (p *directResponsePluginGwPass) HttpFilters(ctx context.Context, fcc ir.Fil return nil, nil } -func (p *directResponsePluginGwPass) UpstreamHttpFilters(ctx context.Context) ([]plugins.StagedUpstreamHttpFilter, error) { +func (p *directResponsePluginGwPass) UpstreamHttpFilters(ctx context.Context, fcc ir.FilterChainCommon) ([]plugins.StagedUpstreamHttpFilter, error) { return nil, nil } diff --git a/internal/kgateway/extensions2/plugins/httplistenerpolicy/httplistener_plugin.go b/internal/kgateway/extensions2/plugins/httplistenerpolicy/httplistener_plugin.go index cfa2d703d0a..984269707a3 100644 --- a/internal/kgateway/extensions2/plugins/httplistenerpolicy/httplistener_plugin.go +++ b/internal/kgateway/extensions2/plugins/httplistenerpolicy/httplistener_plugin.go @@ -58,6 +58,11 @@ func (d *httpListenerPolicy) Equals(in any) bool { type httpListenerPolicyPluginGwPass struct { } +func (p *httpListenerPolicyPluginGwPass) ApplyForBackend(ctx context.Context, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + // no op + return nil +} + func (p *httpListenerPolicyPluginGwPass) ApplyListenerPlugin(ctx context.Context, pCtx *ir.ListenerContext, out *envoy_config_listener_v3.Listener) { // no op } @@ -164,7 +169,7 @@ func (p *httpListenerPolicyPluginGwPass) HttpFilters(ctx context.Context, fcc ir return nil, nil } -func (p *httpListenerPolicyPluginGwPass) 
UpstreamHttpFilters(ctx context.Context) ([]plugins.StagedUpstreamHttpFilter, error) { +func (p *httpListenerPolicyPluginGwPass) UpstreamHttpFilters(ctx context.Context, fcc ir.FilterChainCommon) ([]plugins.StagedUpstreamHttpFilter, error) { return nil, nil } diff --git a/internal/kgateway/extensions2/plugins/listenerpolicy/listener_policy_plugin.go b/internal/kgateway/extensions2/plugins/listenerpolicy/listener_policy_plugin.go index 72b8e285948..c621db36800 100644 --- a/internal/kgateway/extensions2/plugins/listenerpolicy/listener_policy_plugin.go +++ b/internal/kgateway/extensions2/plugins/listenerpolicy/listener_policy_plugin.go @@ -41,6 +41,11 @@ func (d *listenerPolicy) Equals(in any) bool { type listenerPolicyPluginGwPass struct { } +func (p *listenerPolicyPluginGwPass) ApplyForBackend(ctx context.Context, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + // no op + return nil +} + func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensionplug.Plugin { col := krtutil.SetupCollectionDynamic[v1alpha1.ListenerPolicy]( ctx, @@ -136,7 +141,7 @@ func (p *listenerPolicyPluginGwPass) HttpFilters(ctx context.Context, fcc ir.Fil return nil, nil } -func (p *listenerPolicyPluginGwPass) UpstreamHttpFilters(ctx context.Context) ([]plugins.StagedUpstreamHttpFilter, error) { +func (p *listenerPolicyPluginGwPass) UpstreamHttpFilters(ctx context.Context, fcc ir.FilterChainCommon) ([]plugins.StagedUpstreamHttpFilter, error) { return nil, nil } diff --git a/internal/kgateway/extensions2/plugins/routepolicy/ai_policy.go b/internal/kgateway/extensions2/plugins/routepolicy/ai_policy.go new file mode 100644 index 00000000000..aa44276b915 --- /dev/null +++ b/internal/kgateway/extensions2/plugins/routepolicy/ai_policy.go @@ -0,0 +1,318 @@ +package routepolicy + +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "hash" + "hash/fnv" + "os" + "reflect" + "strings" + + envoy_config_core_v3 
"github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_ext_proc_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + "github.com/mitchellh/hashstructure" + envoytransformation "github.com/solo-io/envoy-gloo/go/config/filter/http/transformation/v2" + "google.golang.org/protobuf/types/known/wrapperspb" + "k8s.io/utils/ptr" + + "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" + aiutils "github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/pluginutils" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown" +) + +const ( + contextString = `{"content":"%s","role":"%s"}` +) + +func (p *routePolicyPluginGwPass) processAIRoutePolicy( + ctx context.Context, + aiConfig *v1alpha1.AIRoutePolicy, + pCtx *ir.RouteBackendContext, + extprocSettings *envoy_ext_proc_v3.ExtProcPerRoute, + aiSecret *ir.Secret, +) error { + // If the route options specify this as a chat streaming route, add a header to the ext-proc request + if aiConfig.RouteType != nil && *aiConfig.RouteType == v1alpha1.CHAT_STREAMING { + // append streaming header if it's a streaming route + extprocSettings.GetOverrides().GrpcInitialMetadata = append(extprocSettings.GetOverrides().GetGrpcInitialMetadata(), &envoy_config_core_v3.HeaderValue{ + Key: "x-chat-streaming", + Value: "true", + }) + p.setAIFilter = true + } + + // Setup initial transformation template. 
This may be modified by further + transformationTemplate := initTransformationTemplate() + err := handleAIRoutePolicy(aiConfig, extprocSettings, transformationTemplate, aiSecret) + if err != nil { + return err + } + + transformation := &envoytransformation.RouteTransformations_RouteTransformation{ + Match: &envoytransformation.RouteTransformations_RouteTransformation_RequestMatch_{ + RequestMatch: &envoytransformation.RouteTransformations_RouteTransformation_RequestMatch{ + RequestTransformation: &envoytransformation.Transformation{ + // Set this env var to true to log the request/response info for each transformation + LogRequestResponseInfo: wrapperspb.Bool(os.Getenv("AI_PLUGIN_DEBUG_TRANSFORMATIONS") == "true"), + TransformationType: &envoytransformation.Transformation_TransformationTemplate{ + TransformationTemplate: transformationTemplate, + }, + }, + }, + }, + } + pCtx.AddTypedConfig(wellknown.AIPolicyTransformationFilterName, transformation) + pCtx.AddTypedConfig(wellknown.AIExtProcFilterName, extprocSettings) + + return nil +} + +func initTransformationTemplate() *envoytransformation.TransformationTemplate { + transformationTemplate := &envoytransformation.TransformationTemplate{ + // We will add the auth token later + Headers: map[string]*envoytransformation.InjaTemplate{}, + } + transformationTemplate.BodyTransformation = &envoytransformation.TransformationTemplate_MergeJsonKeys{ + MergeJsonKeys: &envoytransformation.MergeJsonKeys{ + JsonKeys: map[string]*envoytransformation.MergeJsonKeys_OverridableTemplate{}, + }, + } + return transformationTemplate +} + +func handleAIRoutePolicy( + aiConfig *v1alpha1.AIRoutePolicy, + extProcRouteSettings *envoy_ext_proc_v3.ExtProcPerRoute, + transformation *envoytransformation.TransformationTemplate, + aiSecrets *ir.Secret, +) error { + if err := applyDefaults(aiConfig.Defaults, transformation); err != nil { + return err + } + + if err := applyPromptEnrichment(aiConfig.PromptEnrichment, transformation); err != nil { + 
return err + } + + if err := applyPromptGuard(aiConfig.PromptGuard, extProcRouteSettings, aiSecrets); err != nil { + return err + } + + return nil +} + +func applyDefaults( + defaults []v1alpha1.FieldDefault, + transformation *envoytransformation.TransformationTemplate, +) error { + if len(defaults) == 0 { + return nil + } + for _, field := range defaults { + marshalled, err := json.Marshal(field.Value) + if err != nil { + return err + } + var tmpl string + if field.Override != nil { + // Inja default function will use the default value if the field provided is falsey + tmpl = fmt.Sprintf("{{ default(%s, %s) }}", field.Value, string(marshalled)) + } else { + tmpl = string(marshalled) + } + if transformation.GetMergeJsonKeys().GetJsonKeys() == nil { + transformation.GetMergeJsonKeys().JsonKeys = make(map[string]*envoytransformation.MergeJsonKeys_OverridableTemplate) + } + transformation.GetMergeJsonKeys().GetJsonKeys()[field.Field] = &envoytransformation.MergeJsonKeys_OverridableTemplate{ + Tmpl: &envoytransformation.InjaTemplate{Text: tmpl}, + } + } + return nil +} + +func applyPromptEnrichment( + pe *v1alpha1.AIPromptEnrichment, + transformation *envoytransformation.TransformationTemplate, +) error { + if pe == nil { + return nil + } + // This function does some slightly complex json string work because we're instructing the transformation filter + // to take the existing `messages` field and potentially prepend and append to it. + // JSON is insensitive to new lines, so we don't need to worry about them. 
We simply need to join the + // user added messages with the request messages + // For example: + // messages = [{"content": "welcome ", "role": "user"}] + // prepend = [{"content": "hi", "role": "user"}] + // append = [{"content": "bye", "role": "user"}] + // Would result in: + // [{"content": "hi", "role": "user"}, {"content": "welcome ", "role": "user"}, {"content": "bye", "role": "user"}] + bodyChunk1 := `[` + bodyChunk2 := `{{ join(messages, ", ") }}` + bodyChunk3 := `]` + + prependString := make([]string, 0, len(pe.Prepend)) + for _, toPrepend := range pe.Prepend { + prependString = append( + prependString, + fmt.Sprintf( + contextString, + toPrepend.Content, + strings.ToLower(strings.ToLower(toPrepend.Role)), + )+",", + ) + } + appendString := make([]string, 0, len(pe.Append)) + for idx, toAppend := range pe.Append { + formatted := fmt.Sprintf( + contextString, + toAppend.Content, + strings.ToLower(strings.ToLower(toAppend.Role)), + ) + if idx != len(pe.Append)-1 { + formatted += "," + } + appendString = append(appendString, formatted) + } + builder := &strings.Builder{} + builder.WriteString(bodyChunk1) + builder.WriteString(strings.Join(prependString, "")) + builder.WriteString(bodyChunk2) + if len(appendString) > 0 { + builder.WriteString(",") + builder.WriteString(strings.Join(appendString, "")) + } + builder.WriteString(bodyChunk3) + finalBody := builder.String() + // Overwrite the user messages body key with the templated version + transformation.GetMergeJsonKeys().GetJsonKeys()["messages"] = &envoytransformation.MergeJsonKeys_OverridableTemplate{ + Tmpl: &envoytransformation.InjaTemplate{Text: finalBody}, + } + return nil +} + +func applyPromptGuard(pg *v1alpha1.AIPromptGuard, extProcRouteSettings *envoy_ext_proc_v3.ExtProcPerRoute, secret *ir.Secret) error { + if pg == nil { + return nil + } + if req := pg.Request; req != nil { + if mod := req.Moderation; mod != nil { + if mod.OpenAIModeration != nil { + token, err := 
aiutils.GetAuthToken(mod.OpenAIModeration.AuthToken, secret) + if err != nil { + return err + } + mod.OpenAIModeration.AuthToken = v1alpha1.SingleAuthToken{ + Inline: ptr.To(token), + } + } else { + return fmt.Errorf("OpenAI moderation config must be set for moderation prompt guard") + } + pg.Request.Moderation = mod + } + bin, err := json.Marshal(req) + if err != nil { + return err + } + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-req-guardrails-config", + Value: string(bin), + }, + ) + // Use this in the server to key per-route-config + // Better to do it here because we have generated functions + reqHash, _ := hashUnique(req, nil) + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-req-guardrails-config-hash", + Value: fmt.Sprint(reqHash), + }, + ) + } + + if resp := pg.Response; resp != nil { + // Resp needs to be defined in python ai extensions in the same format + bin, err := json.Marshal(resp) + if err != nil { + return err + } + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-resp-guardrails-config", + Value: string(bin), + }, + ) + // Use this in the server to key per-route-config + // Better to do it here because we have generated functions + respHash, _ := hashUnique(resp, nil) + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-resp-guardrails-config-hash", + Value: fmt.Sprint(respHash), + }, + ) + } + return nil +} + +// hashUnique generates a hash of the struct that is unique to the object by +// hashing field name and value pairs +func hashUnique(obj 
interface{}, hasher hash.Hash64) (uint64, error) { + if obj == nil { + return 0, nil + } + if hasher == nil { + hasher = fnv.New64() + } + + val := reflect.ValueOf(obj) + if val.Kind() == reflect.Ptr { + val = val.Elem() + } + typ := val.Type() + + // Write type name for consistency with proto implementation + _, err := hasher.Write([]byte(typ.PkgPath() + "/" + typ.Name())) + if err != nil { + return 0, err + } + + // Iterate through fields + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // Write field name + if _, err := hasher.Write([]byte(fieldType.Name)); err != nil { + return 0, err + } + + // Handle nil pointer fields + if field.Kind() == reflect.Ptr && field.IsNil() { + continue + } + + // Get the actual value if it's a pointer + if field.Kind() == reflect.Ptr { + field = field.Elem() + } + + // Hash the field value + fieldValue, err := hashstructure.Hash(field.Interface(), nil) + if err != nil { + return 0, err + } + + // Write the hash to our hasher + if err := binary.Write(hasher, binary.LittleEndian, fieldValue); err != nil { + return 0, err + } + } + + return hasher.Sum64(), nil +} diff --git a/internal/kgateway/extensions2/plugins/routepolicy/route_policy_plugin.go b/internal/kgateway/extensions2/plugins/routepolicy/route_policy_plugin.go index 224311fcd98..351dd5ebab8 100644 --- a/internal/kgateway/extensions2/plugins/routepolicy/route_policy_plugin.go +++ b/internal/kgateway/extensions2/plugins/routepolicy/route_policy_plugin.go @@ -4,13 +4,20 @@ import ( "context" "time" + envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + common_set_filter_state_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/common/set_filter_state/v3" + envoy_ext_proc_v3 
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + http_set_filter_state_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/set_filter_state/v3" envoyhttp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + "github.com/solo-io/go-utils/contextutils" "google.golang.org/protobuf/types/known/durationpb" + "istio.io/istio/pkg/kube/krt" "k8s.io/apimachinery/pkg/runtime/schema" - envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" - "istio.io/istio/pkg/kube/krt" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/pluginutils" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections" "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/common" @@ -19,11 +26,13 @@ import ( "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/plugins" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils/krtutil" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown" ) type routePolicy struct { - ct time.Time - spec v1alpha1.RoutePolicySpec + ct time.Time + spec v1alpha1.RoutePolicySpec + AISecret *ir.Secret } func (d *routePolicy) CreationTime() time.Time { @@ -39,6 +48,7 @@ func (d *routePolicy) Equals(in any) bool { } type routePolicyPluginGwPass struct { + setAIFilter bool } func (p *routePolicyPluginGwPass) ApplyHCM(ctx context.Context, pCtx *ir.HcmContext, out *envoyhttp.HttpConnectionManager) error { @@ -54,6 +64,8 @@ func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensi commoncol.KrtOpts.ToOptions("RoutePolicy")..., ) gk := v1alpha1.RoutePolicyGVK.GroupKind() + translate := buildTranslateFunc(ctx, commoncol.Secrets) + // RoutePolicy IR will have 
TypedConfig -> implement backendroute method to add prompt guard, etc. policyCol := krt.NewCollection(col, func(krtctx krt.HandlerContext, policyCR *v1alpha1.RoutePolicy) *ir.PolicyWrapper { var pol = &ir.PolicyWrapper{ ObjectSource: ir.ObjectSource{ @@ -63,7 +75,7 @@ func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensi Name: policyCR.Name, }, Policy: policyCR, - PolicyIR: &routePolicy{ct: policyCR.CreationTimestamp.Time, spec: policyCR.Spec}, + PolicyIR: translate(krtctx, policyCR), TargetRefs: convert(policyCR.Spec.TargetRef), } return pol @@ -113,6 +125,18 @@ func (p *routePolicyPluginGwPass) ApplyForRoute(ctx context.Context, pCtx *ir.Ro outputRoute.GetRoute().Timeout = durationpb.New(time.Second * time.Duration(policy.spec.Timeout)) } + // TODO: err/warn/ignore if targetRef is set on non-AI Upstream + + return nil +} + +// ApplyForBackend applies regardless if policy is attached +func (p *routePolicyPluginGwPass) ApplyForBackend( + ctx context.Context, + pCtx *ir.RouteBackendContext, + in ir.HttpBackend, + out *envoy_config_route_v3.Route, +) error { return nil } @@ -121,6 +145,28 @@ func (p *routePolicyPluginGwPass) ApplyForRouteBackend( policy ir.PolicyIR, pCtx *ir.RouteBackendContext, ) error { + extprocSettingsProto := pCtx.GetConfig(wellknown.AIExtProcFilterName) + if extprocSettingsProto == nil { + return nil + } + extprocSettings, ok := extprocSettingsProto.(*envoy_ext_proc_v3.ExtProcPerRoute) + if !ok { + // TODO: internal error + return nil + } + + rtPolicy, ok := policy.(*routePolicy) + if !ok { + return nil + } + + err := p.processAIRoutePolicy(ctx, rtPolicy.spec.AI, pCtx, extprocSettings, rtPolicy.AISecret) + if err != nil { + // TODO: report error on status + return err + } + pCtx.AddTypedConfig(wellknown.AIExtProcFilterName, extprocSettings) + return nil } @@ -128,10 +174,46 @@ func (p *routePolicyPluginGwPass) ApplyForRouteBackend( // if a plugin emits new filters, they must be with a plugin unique name. 
// any filter returned from route config must be disabled, so it doesnt impact other routes. func (p *routePolicyPluginGwPass) HttpFilters(ctx context.Context, fcc ir.FilterChainCommon) ([]plugins.StagedHttpFilter, error) { - return nil, nil + var filters []plugins.StagedHttpFilter + + if p.setAIFilter { + // handle route policy RouteType by setting it in the dynamic metadata + fsConfig := &http_set_filter_state_v3.Config{ + OnRequestHeaders: []*common_set_filter_state_v3.FilterStateValue{ + { + Key: &common_set_filter_state_v3.FilterStateValue_ObjectKey{ + ObjectKey: "envoy.route_type", + }, + FactoryKey: "envoy.route_type", + Value: &common_set_filter_state_v3.FilterStateValue_FormatString{ + FormatString: &envoy_config_core_v3.SubstitutionFormatString{ + Format: &envoy_config_core_v3.SubstitutionFormatString_TextFormatSource{ + TextFormatSource: &envoy_config_core_v3.DataSource{ + Specifier: &envoy_config_core_v3.DataSource_InlineString{ + InlineString: "%DYNAMIC_METADATA(envoy.route_type)%", + }, + }, + }, + }, + }, + }, + }, + } + stagedFilter, err := plugins.NewStagedFilter( + wellknown.SetMetadataFilterName, + fsConfig, + plugins.BeforeStage(plugins.RouteStage), + ) + if err != nil { + return nil, err + } + filters = append(filters, stagedFilter) + } + + return filters, nil } -func (p *routePolicyPluginGwPass) UpstreamHttpFilters(ctx context.Context) ([]plugins.StagedUpstreamHttpFilter, error) { +func (p *routePolicyPluginGwPass) UpstreamHttpFilters(ctx context.Context, fcc ir.FilterChainCommon) ([]plugins.StagedUpstreamHttpFilter, error) { return nil, nil } @@ -143,3 +225,30 @@ func (p *routePolicyPluginGwPass) NetworkFilters(ctx context.Context) ([]plugins func (p *routePolicyPluginGwPass) ResourcesToAdd(ctx context.Context) ir.Resources { return ir.Resources{} } + +func buildTranslateFunc(ctx context.Context, secrets *krtcollections.SecretIndex) func(krtctx krt.HandlerContext, i *v1alpha1.RoutePolicy) *routePolicy { + return func(krtctx 
krt.HandlerContext, policyCR *v1alpha1.RoutePolicy) *routePolicy { + policyIr := routePolicy{ct: policyCR.CreationTimestamp.Time, spec: policyCR.Spec} + + // Check for the presence of the OpenAI Moderation which may require a secret reference + if policyCR.Spec.AI == nil || policyCR.Spec.AI.PromptGuard == nil || policyCR.Spec.AI.PromptGuard.Request.Moderation == nil { + return &policyIr + } + + secretRef := policyCR.Spec.AI.PromptGuard.Request.Moderation.OpenAIModeration.AuthToken.SecretRef + if secretRef == nil { + // no secret ref is set + return &policyIr + } + + // Retrieve and assign the secret + secret, err := pluginutils.GetSecretIr(secrets, krtctx, secretRef.Name, policyCR.GetNamespace()) + if err != nil { + contextutils.LoggerFrom(ctx).Error(err) + return &policyIr + } + + policyIr.AISecret = secret + return &policyIr + } +} diff --git a/internal/kgateway/extensions2/plugins/upstream/ai/ai_backend.go b/internal/kgateway/extensions2/plugins/upstream/ai/ai_backend.go new file mode 100644 index 00000000000..63c2a51e6df --- /dev/null +++ b/internal/kgateway/extensions2/plugins/upstream/ai/ai_backend.go @@ -0,0 +1,148 @@ +package ai + +import ( + "context" + "os" + + envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + envoy_ext_proc_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + "github.com/rotisserie/eris" + envoytransformation "github.com/solo-io/envoy-gloo/go/config/filter/http/transformation/v2" + "google.golang.org/protobuf/types/known/wrapperspb" + + "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown" +) + +func ApplyAIBackend(ctx context.Context, aiUpstream *v1alpha1.AIUpstream, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + 
// Setup ext-proc route filter config, we will conditionally modify it based on certain route options. + // A heavily used part of this config is the `GrpcInitialMetadata`. + // This is used to add headers to the ext-proc request. + // These headers are used to configure the AI server on a per-request basis. + // This was the best available way to pass per-route configuration to the AI server. + extProcRouteSettingsProto := pCtx.GetConfig(wellknown.AIExtProcFilterName) + var extProcRouteSettings *envoy_ext_proc_v3.ExtProcPerRoute + if extProcRouteSettingsProto == nil { + extProcRouteSettings = &envoy_ext_proc_v3.ExtProcPerRoute{ + Override: &envoy_ext_proc_v3.ExtProcPerRoute_Overrides{ + Overrides: &envoy_ext_proc_v3.ExtProcOverrides{}, + }, + } + } else { + extProcRouteSettings = extProcRouteSettingsProto.(*envoy_ext_proc_v3.ExtProcPerRoute) + } + + var llmModel string + byType := map[string]struct{}{} + if aiUpstream.LLM != nil { + llmModel = getUpstreamModel(aiUpstream.LLM, byType) + } else if aiUpstream.MultiPool != nil { + for _, priority := range aiUpstream.MultiPool.Priorities { + for _, pool := range priority.Pool { + llmModel = getUpstreamModel(&pool, byType) + } + } + } + + if len(byType) != 1 { + return eris.Errorf("multiple AI backend types found for single ai route %+v", byType) + } + + // This is only len(1) + var llmProvider string + for k := range byType { + llmProvider = k + } + + // Add things which require basic AI upstream. + if out == nil { + panic("!!") + } + if out.GetRoute() == nil { + // initialize route action if not set + out.Action = &envoy_config_route_v3.Route_Route{ + Route: &envoy_config_route_v3.RouteAction{}, + } + } + out.GetRoute().HostRewriteSpecifier = &envoy_config_route_v3.RouteAction_AutoHostRewrite{ + AutoHostRewrite: wrapperspb.Bool(true), + } + + //We only want to add the transformation filter if we have a single AI backend + //Otherwise we already have the transformation filter added by the weighted destination. 
+ transformation := createTransformationTemplate(ctx, aiUpstream) + routeTransformation := &envoytransformation.RouteTransformations_RouteTransformation{ + Match: &envoytransformation.RouteTransformations_RouteTransformation_RequestMatch_{ + RequestMatch: &envoytransformation.RouteTransformations_RouteTransformation_RequestMatch{ + RequestTransformation: &envoytransformation.Transformation{ + // Set this env var to true to log the request/response info for each transformation + LogRequestResponseInfo: wrapperspb.Bool(os.Getenv("AI_PLUGIN_DEBUG_TRANSFORMATIONS") == "true"), + TransformationType: &envoytransformation.Transformation_TransformationTemplate{ + TransformationTemplate: transformation, + }, + }, + }, + }, + } + // Sets the transformation for the Upstream. Can be updated in a route policy is attached. + transformations := &envoytransformation.RouteTransformations{ + Transformations: []*envoytransformation.RouteTransformations_RouteTransformation{routeTransformation}, + } + pCtx.AddTypedConfig(wellknown.AIUpstreamTransformationFilterName, transformations) + + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-llm-provider", + Value: llmProvider, + }, + ) + // If the Upstream specifies a model, add a header to the ext-proc request + if llmModel != "" { + extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-llm-model", + Value: llmModel, + }) + } + + // Add the x-request-id header to the ext-proc request. + // This is an optimization to allow us to not have to wait for the headers request to + // Initialize our logger/handler classes. 
+ extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(), + &envoy_config_core_v3.HeaderValue{ + Key: "x-request-id", + Value: "%REQ(X-REQUEST-ID)%", + }, + ) + + pCtx.AddTypedConfig(wellknown.AIExtProcFilterName, extProcRouteSettings) + return nil +} + +func getUpstreamModel(llm *v1alpha1.LLMProvider, byType map[string]struct{}) string { + llmModel := "" + provider := llm.Provider + if provider.OpenAI != nil { + byType["openai"] = struct{}{} + if provider.OpenAI.Model != nil { + llmModel = *provider.OpenAI.Model + } + } else if provider.Anthropic != nil { + byType["anthropic"] = struct{}{} + if provider.Anthropic.Model != nil { + llmModel = *provider.Anthropic.Model + } + } else if provider.AzureOpenAI != nil { + byType["azure_openai"] = struct{}{} + llmModel = provider.AzureOpenAI.DeploymentName + } else if provider.Gemini != nil { + byType["gemini"] = struct{}{} + llmModel = provider.Gemini.Model + } else if provider.VertexAI != nil { + byType["vertex-ai"] = struct{}{} + llmModel = provider.VertexAI.Model + } + return llmModel +} diff --git a/internal/kgateway/extensions2/plugins/upstream/ai/ai_httpfilters.go b/internal/kgateway/extensions2/plugins/upstream/ai/ai_httpfilters.go new file mode 100644 index 00000000000..c26a04ae4af --- /dev/null +++ b/internal/kgateway/extensions2/plugins/upstream/ai/ai_httpfilters.go @@ -0,0 +1,124 @@ +package ai + +import ( + "time" + + envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_ext_proc_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + envoy_hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + envoytransformation "github.com/solo-io/envoy-gloo/go/config/filter/http/transformation/v2" + upstream_wait "github.com/solo-io/envoy-gloo/go/config/filter/http/upstream_wait/v2" + "google.golang.org/protobuf/types/known/durationpb" 
+ "google.golang.org/protobuf/types/known/wrapperspb" + + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/plugins" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown" +) + +func AddUpstreamHttpFilters() ([]plugins.StagedUpstreamHttpFilter, error) { + transformationMsg, err := utils.MessageToAny(&envoytransformation.FilterTransformations{}) + if err != nil { + return nil, err + } + + upstreamWaitMsg, err := utils.MessageToAny(&upstream_wait.UpstreamWaitFilterConfig{}) + if err != nil { + return nil, err + } + + filters := []plugins.StagedUpstreamHttpFilter{ + // The wait filter essentially blocks filter iteration until a host has been selected. + // This is important because running as an upstream filter allows access to host + // metadata iff the host has already been selected, and that's a + // major benefit of running the filter at this stage. + { + Filter: &envoy_hcm.HttpFilter{ + Name: waitFilterName, + ConfigType: &envoy_hcm.HttpFilter_TypedConfig{ + TypedConfig: upstreamWaitMsg, + }, + }, + Stage: plugins.UpstreamHTTPFilterStage{ + RelativeTo: plugins.TransformationStage, + Weight: -1, + }, + }, + { + Filter: &envoy_hcm.HttpFilter{ + Name: wellknown.AIUpstreamTransformationFilterName, + ConfigType: &envoy_hcm.HttpFilter_TypedConfig{ + TypedConfig: transformationMsg, + }, + }, + Stage: plugins.UpstreamHTTPFilterStage{ + RelativeTo: plugins.TransformationStage, + Weight: 0, + }, + }, + { + Filter: &envoy_hcm.HttpFilter{ + Name: wellknown.AIPolicyTransformationFilterName, + ConfigType: &envoy_hcm.HttpFilter_TypedConfig{ + TypedConfig: transformationMsg, + }, + }, + Stage: plugins.UpstreamHTTPFilterStage{ + RelativeTo: plugins.TransformationStage, + Weight: 0, + }, + }, + } + return filters, nil +} + +func AddExtprocHTTPFilter() ([]plugins.StagedHttpFilter, error) { + result := []plugins.StagedHttpFilter{} + + // TODO: add ratelimit and jwt_authn if AI Upstream is configured + 
extProcSettings := &envoy_ext_proc_v3.ExternalProcessor{ + GrpcService: &envoy_config_core_v3.GrpcService{ + Timeout: durationpb.New(5 * time.Second), + RetryPolicy: &envoy_config_core_v3.RetryPolicy{ + NumRetries: wrapperspb.UInt32(3), + }, + TargetSpecifier: &envoy_config_core_v3.GrpcService_EnvoyGrpc_{ + EnvoyGrpc: &envoy_config_core_v3.GrpcService_EnvoyGrpc{ + ClusterName: extProcUDSClusterName, + }, + }, + }, + ProcessingMode: &envoy_ext_proc_v3.ProcessingMode{ + RequestHeaderMode: envoy_ext_proc_v3.ProcessingMode_SEND, + RequestBodyMode: envoy_ext_proc_v3.ProcessingMode_STREAMED, + RequestTrailerMode: envoy_ext_proc_v3.ProcessingMode_SKIP, + ResponseHeaderMode: envoy_ext_proc_v3.ProcessingMode_SEND, + ResponseBodyMode: envoy_ext_proc_v3.ProcessingMode_STREAMED, + ResponseTrailerMode: envoy_ext_proc_v3.ProcessingMode_SKIP, + }, + MessageTimeout: durationpb.New(5 * time.Second), + MetadataOptions: &envoy_ext_proc_v3.MetadataOptions{ + ForwardingNamespaces: &envoy_ext_proc_v3.MetadataOptions_MetadataNamespaces{ + Untyped: []string{"io.solo.transformation", "envoy.filters.ai.solo.io"}, + Typed: []string{"envoy.filters.ai.solo.io"}, + }, + ReceivingNamespaces: &envoy_ext_proc_v3.MetadataOptions_MetadataNamespaces{ + Untyped: []string{"ai.kgateway.io"}, + }, + }, + } + // Run before rate limiting + stagedFilter, err := plugins.NewStagedFilter( + wellknown.AIExtProcFilterName, + extProcSettings, + plugins.FilterStage[plugins.WellKnownFilterStage]{ + RelativeTo: plugins.RateLimitStage, + Weight: -2, + }, + ) + if err != nil { + return nil, err + } + result = append(result, stagedFilter) + return result, nil +} diff --git a/internal/kgateway/extensions2/plugins/upstream/ai/ai_model_cluster.go b/internal/kgateway/extensions2/plugins/upstream/ai/ai_model_cluster.go new file mode 100644 index 00000000000..a90725b18e1 --- /dev/null +++ b/internal/kgateway/extensions2/plugins/upstream/ai/ai_model_cluster.go @@ -0,0 +1,487 @@ +package ai + +import ( + "context" + "fmt" + 
"maps" + "slices" + "strings" + + envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/rotisserie/eris" + envoytransformation "github.com/solo-io/envoy-gloo/go/config/filter/http/transformation/v2" + "github.com/solo-io/go-utils/contextutils" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" + + aiutils "github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/pluginutils" + + "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils" +) + +const ( + tlsPort = 443 +) + +func ProcessAIUpstream(ctx context.Context, in *v1alpha1.AIUpstream, aiSecrets *ir.Secret, out *envoy_config_cluster_v3.Cluster) error { + if in == nil { + return nil + } + + if err := buildModelCluster(ctx, in, aiSecrets, out); err != nil { + return err + } + + return nil +} + +// buildModelCluster builds a cluster for the given AI upstream. +// This function is used by the `ProcessUpstream` function to build the cluster for the AI upstream. +// It is ALSO used by `ProcessRoute` to create the cluster in the event of backup models being used +// and fallbacks being required. 
+func buildModelCluster(ctx context.Context, aiUs *v1alpha1.AIUpstream, aiSecrets *ir.Secret, out *envoy_config_cluster_v3.Cluster) error { + // set the type to strict dns + out.ClusterDiscoveryType = &envoy_config_cluster_v3.Cluster_Type{ + Type: envoy_config_cluster_v3.Cluster_STRICT_DNS, + } + + // fix issue where ipv6 addr cannot bind + out.DnsLookupFamily = envoy_config_cluster_v3.Cluster_V4_ONLY + + // We are reliant on https://github.com/envoyproxy/envoy/pull/34154 to merge + // before we can do OutlierDetection on 429s here + // out.OutlierDetection = getOutlierDetectionConfig(aiUs) + + var prioritized []*envoy_config_endpoint_v3.LocalityLbEndpoints + var matches []*envoy_config_cluster_v3.Cluster_TransportSocketMatch + var err error + + if aiUs.MultiPool != nil { + epByType := map[string]struct{}{} + tsmByHost := make(map[string]*envoy_config_cluster_v3.Cluster_TransportSocketMatch) + prioritized = make([]*envoy_config_endpoint_v3.LocalityLbEndpoints, 0, len(aiUs.MultiPool.Priorities)) + for idx, pool := range aiUs.MultiPool.Priorities { + eps := make([]*envoy_config_endpoint_v3.LbEndpoint, 0, len(pool.Pool)) + for _, ep := range pool.Pool { + var result *envoy_config_endpoint_v3.LbEndpoint + var tlsContext *envoy_tls_v3.UpstreamTlsContext + var err error + epByType[fmt.Sprintf("%T", ep)] = struct{}{} + if ep.Provider.OpenAI != nil { + result, tlsContext, err = buildOpenAIEndpoint(ep.Provider.OpenAI, ep.HostOverride, aiSecrets) + } else if ep.Provider.Anthropic != nil { + result, tlsContext, err = buildAnthropicEndpoint(ep.Provider.Anthropic, ep.HostOverride, aiSecrets) + } else if ep.Provider.AzureOpenAI != nil { + result, tlsContext, err = buildAzureOpenAIEndpoint(ep.Provider.AzureOpenAI, ep.HostOverride, aiSecrets) + } else if ep.Provider.Gemini != nil { + result, tlsContext, err = buildGeminiEndpoint(ep.Provider.Gemini, ep.HostOverride, aiSecrets) + } else if ep.Provider.VertexAI != nil { + result, tlsContext, err = buildVertexAIEndpoint(ctx, 
ep.Provider.VertexAI, ep.HostOverride, aiSecrets) + } + if err != nil { + return err + } + eps = append(eps, result) + if tlsContext == nil { + continue + } + if _, ok := tsmByHost[tlsContext.GetSni()]; !ok { + tsm, err := buildTsm(tlsContext) + if err != nil { + return err + } + tsmByHost[tlsContext.GetSni()] = tsm + } + } + priority := idx + prioritized = append(prioritized, &envoy_config_endpoint_v3.LocalityLbEndpoints{ + Priority: uint32(priority), + LbEndpoints: eps, + }) + } + if len(epByType) > 1 { + return eris.Errorf("multi backend pools must all be of the same type, got %v", epByType) + } + slice := slices.Collect(maps.Values(tsmByHost)) + slices.SortStableFunc(slice, func(a, b *envoy_config_cluster_v3.Cluster_TransportSocketMatch) int { + return strings.Compare(a.GetName(), b.GetName()) + }) + out.TransportSocketMatches = append(out.GetTransportSocketMatches(), slice...) + } else if aiUs.LLM != nil { + matches, prioritized, err = buildLLMEndpoint(ctx, aiUs, aiSecrets) + if err != nil { + return err + } + out.TransportSocketMatches = matches + } + // Default match on plaintext if nothing else is added + out.TransportSocketMatches = append(out.GetTransportSocketMatches(), &envoy_config_cluster_v3.Cluster_TransportSocketMatch{ + Name: "plaintext", + TransportSocket: &envoy_config_core_v3.TransportSocket{ + Name: wellknown.TransportSocketRawBuffer, + ConfigType: &envoy_config_core_v3.TransportSocket_TypedConfig{ + TypedConfig: &anypb.Any{ + TypeUrl: "type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer", + }, + }, + }, + Match: &structpb.Struct{}, + }) + out.LoadAssignment = &envoy_config_endpoint_v3.ClusterLoadAssignment{ + ClusterName: out.GetName(), + Endpoints: prioritized, + } + + return nil +} + +func buildLLMEndpoint(ctx context.Context, aiUs *v1alpha1.AIUpstream, aiSecrets *ir.Secret) ([]*envoy_config_cluster_v3.Cluster_TransportSocketMatch, []*envoy_config_endpoint_v3.LocalityLbEndpoints, error) { + var tsms 
[]*envoy_config_cluster_v3.Cluster_TransportSocketMatch + var prioritized []*envoy_config_endpoint_v3.LocalityLbEndpoints + provider := aiUs.LLM.Provider + if provider.OpenAI != nil { + host, tlsContext, err := buildOpenAIEndpoint(provider.OpenAI, aiUs.LLM.HostOverride, aiSecrets) + if err != nil { + return nil, nil, err + } + prioritized = []*envoy_config_endpoint_v3.LocalityLbEndpoints{ + {LbEndpoints: []*envoy_config_endpoint_v3.LbEndpoint{host}}, + } + if tlsContext != nil { + tsm, err := buildTsm(tlsContext) + if err != nil { + return nil, nil, err + } + tsms = append(tsms, tsm) + } + } else if provider.Anthropic != nil { + host, tlsContext, err := buildAnthropicEndpoint(provider.Anthropic, aiUs.LLM.HostOverride, aiSecrets) + if err != nil { + return nil, nil, err + } + prioritized = []*envoy_config_endpoint_v3.LocalityLbEndpoints{ + {LbEndpoints: []*envoy_config_endpoint_v3.LbEndpoint{host}}, + } + if tlsContext != nil { + tsm, err := buildTsm(tlsContext) + if err != nil { + return nil, nil, err + } + tsms = append(tsms, tsm) + } + } else if provider.AzureOpenAI != nil { + host, tlsContext, err := buildAzureOpenAIEndpoint(provider.AzureOpenAI, aiUs.LLM.HostOverride, aiSecrets) + if err != nil { + return nil, nil, err + } + prioritized = []*envoy_config_endpoint_v3.LocalityLbEndpoints{ + {LbEndpoints: []*envoy_config_endpoint_v3.LbEndpoint{host}}, + } + if tlsContext != nil { + tsm, err := buildTsm(tlsContext) + if err != nil { + return nil, nil, err + } + tsms = append(tsms, tsm) + } + } else if provider.Gemini != nil { + host, tlsContext, err := buildGeminiEndpoint(provider.Gemini, aiUs.LLM.HostOverride, aiSecrets) + if err != nil { + return nil, nil, err + } + prioritized = []*envoy_config_endpoint_v3.LocalityLbEndpoints{ + {LbEndpoints: []*envoy_config_endpoint_v3.LbEndpoint{host}}, + } + if tlsContext != nil { + tsm, err := buildTsm(tlsContext) + if err != nil { + return nil, nil, err + } + tsms = append(tsms, tsm) + } + } else if provider.VertexAI != nil 
{ + host, tlsContext, err := buildVertexAIEndpoint(ctx, provider.VertexAI, aiUs.LLM.HostOverride, aiSecrets) + if err != nil { + return nil, nil, err + } + prioritized = []*envoy_config_endpoint_v3.LocalityLbEndpoints{ + {LbEndpoints: []*envoy_config_endpoint_v3.LbEndpoint{host}}, + } + if tlsContext != nil { + tsm, err := buildTsm(tlsContext) + if err != nil { + return nil, nil, err + } + tsms = append(tsms, tsm) + } + } + return tsms, prioritized, nil +} + +// Build a TransoprtSocketMatch for the given UpstreamTlsContext. +func buildTsm(tlsContext *envoy_tls_v3.UpstreamTlsContext) (*envoy_config_cluster_v3.Cluster_TransportSocketMatch, error) { + typedConfig, err := utils.MessageToAny(tlsContext) + if err != nil { + return nil, err + } + return &envoy_config_cluster_v3.Cluster_TransportSocketMatch{ + Name: "tls_" + tlsContext.GetSni(), + Match: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "tls": structpb.NewStringValue(tlsContext.GetSni()), + }, + }, + TransportSocket: &envoy_config_core_v3.TransportSocket{ + Name: wellknown.TransportSocketTls, + ConfigType: &envoy_config_core_v3.TransportSocket_TypedConfig{TypedConfig: typedConfig}, + }, + }, nil +} + +func buildOpenAIEndpoint(data *v1alpha1.OpenAIConfig, hostOverride *v1alpha1.Host, aiSecrets *ir.Secret) (*envoy_config_endpoint_v3.LbEndpoint, *envoy_tls_v3.UpstreamTlsContext, error) { + token, err := aiutils.GetAuthToken(data.AuthToken, aiSecrets) + if err != nil { + return nil, nil, err + } + model := "" + if data.Model != nil { + model = *data.Model + } + ep, host := buildLocalityLbEndpoint( + "api.openai.com", + tlsPort, + hostOverride, + buildEndpointMeta(token, model, nil), + ) + return ep, host, nil +} +func buildAnthropicEndpoint(data *v1alpha1.AnthropicConfig, hostOverride *v1alpha1.Host, aiSecrets *ir.Secret) (*envoy_config_endpoint_v3.LbEndpoint, *envoy_tls_v3.UpstreamTlsContext, error) { + token, err := aiutils.GetAuthToken(data.AuthToken, aiSecrets) + if err != nil { + return nil, nil, 
err + } + model := "" + if data.Model != nil { + model = *data.Model + } + ep, host := buildLocalityLbEndpoint( + "api.anthropic.com", + tlsPort, + hostOverride, + buildEndpointMeta(token, model, nil), + ) + return ep, host, nil +} +func buildAzureOpenAIEndpoint(data *v1alpha1.AzureOpenAIConfig, hostOverride *v1alpha1.Host, aiSecrets *ir.Secret) (*envoy_config_endpoint_v3.LbEndpoint, *envoy_tls_v3.UpstreamTlsContext, error) { + token, err := aiutils.GetAuthToken(data.AuthToken, aiSecrets) + if err != nil { + return nil, nil, err + } + ep, host := buildLocalityLbEndpoint( + data.Endpoint, + tlsPort, + hostOverride, + buildEndpointMeta(token, data.DeploymentName, map[string]string{"api_version": data.ApiVersion}), + ) + return ep, host, nil +} +func buildGeminiEndpoint(data *v1alpha1.GeminiConfig, hostOverride *v1alpha1.Host, aiSecrets *ir.Secret) (*envoy_config_endpoint_v3.LbEndpoint, *envoy_tls_v3.UpstreamTlsContext, error) { + token, err := aiutils.GetAuthToken(data.AuthToken, aiSecrets) + if err != nil { + return nil, nil, err + } + ep, host := buildLocalityLbEndpoint( + "generativelanguage.googleapis.com", + tlsPort, + hostOverride, + buildEndpointMeta(token, data.Model, map[string]string{"api_version": data.ApiVersion}), + ) + return ep, host, nil +} +func buildVertexAIEndpoint(ctx context.Context, data *v1alpha1.VertexAIConfig, hostOverride *v1alpha1.Host, aiSecrets *ir.Secret) (*envoy_config_endpoint_v3.LbEndpoint, *envoy_tls_v3.UpstreamTlsContext, error) { + token, err := aiutils.GetAuthToken(data.AuthToken, aiSecrets) + if err != nil { + return nil, nil, err + } + var publisher string + switch data.Publisher { + case v1alpha1.GOOGLE: + publisher = "google" + default: + // TODO(npolshak): add support for other publishers + contextutils.LoggerFrom(ctx).Warnf("unsupported Vertex AI publisher: %v. 
Defaulting to Google.", data.Publisher) + publisher = "google" + } + ep, host := buildLocalityLbEndpoint( + fmt.Sprintf("%s-aiplatform.googleapis.com", data.Location), + tlsPort, + hostOverride, + buildEndpointMeta(token, data.Model, map[string]string{"api_version": data.ApiVersion, "location": data.Location, "project": data.ProjectId, "publisher": publisher}), + ) + return ep, host, nil +} +func buildLocalityLbEndpoint( + host string, + port int32, + customHost *v1alpha1.Host, + metadata *envoy_config_core_v3.Metadata, +) (*envoy_config_endpoint_v3.LbEndpoint, *envoy_tls_v3.UpstreamTlsContext) { + if customHost != nil { + if customHost.Host != "" { + host = customHost.Host + } + if customHost.Port != 0 { + port = int32(customHost.Port) + } + } + var tlsContext *envoy_tls_v3.UpstreamTlsContext + if port == tlsPort { + // Used for transport socket matching + metadata.GetFilterMetadata()["envoy.transport_socket_match"] = &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "tls": structpb.NewStringValue(host), + }, + } + tlsContext = &envoy_tls_v3.UpstreamTlsContext{ + CommonTlsContext: &envoy_tls_v3.CommonTlsContext{}, + Sni: host, + } + } + return &envoy_config_endpoint_v3.LbEndpoint{ + Metadata: metadata, + HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{ + Endpoint: &envoy_config_endpoint_v3.Endpoint{ + Hostname: host, + Address: &envoy_config_core_v3.Address{ + Address: &envoy_config_core_v3.Address_SocketAddress{ + SocketAddress: &envoy_config_core_v3.SocketAddress{ + Protocol: envoy_config_core_v3.SocketAddress_TCP, + Address: host, + PortSpecifier: &envoy_config_core_v3.SocketAddress_PortValue{ + PortValue: uint32(port), + }, + }, + }, + }, + }, + }, + }, tlsContext +} + +// `buildEndpointMeta` builds the metadata for the endpoint. +// This metadata is used by the post routing transformation filter to modify the request body. 
+func buildEndpointMeta(token, model string, additionalFields map[string]string) *envoy_config_core_v3.Metadata { + fields := map[string]*structpb.Value{ + "auth_token": structpb.NewStringValue(token), + } + if model != "" { + fields["model"] = structpb.NewStringValue(model) + } + for k, v := range additionalFields { + fields[k] = structpb.NewStringValue(v) + } + return &envoy_config_core_v3.Metadata{ + FilterMetadata: map[string]*structpb.Struct{ + "io.solo.transformation": { + Fields: fields, + }, + }, + } +} + +func createTransformationTemplate(ctx context.Context, aiUpstream *v1alpha1.AIUpstream) *envoytransformation.TransformationTemplate { + // Setup initial transformation template. This may be modified by further + transformationTemplate := &envoytransformation.TransformationTemplate{ + // We will add the auth token later + Headers: map[string]*envoytransformation.InjaTemplate{}, + } + + var headerName, prefix, path string + var bodyTransformation *envoytransformation.TransformationTemplate_MergeJsonKeys + if aiUpstream.LLM != nil { + headerName, prefix, path, bodyTransformation = getTransformation(ctx, aiUpstream.LLM) + } else if aiUpstream.MultiPool != nil { + // We already know that all the backends are the same type so we can just take the first one + llmMultiPool := aiUpstream.MultiPool.Priorities[0].Pool[0] + headerName, prefix, path, bodyTransformation = getTransformation(ctx, &llmMultiPool) + } + transformationTemplate.GetHeaders()[headerName] = &envoytransformation.InjaTemplate{ + Text: prefix + `{% if host_metadata("auth_token") != "" %}{{host_metadata("auth_token")}}{% else %}{{dynamic_metadata("auth_token","ai.kgateway.io")}}{% endif %}`, + } + transformationTemplate.GetHeaders()[":path"] = &envoytransformation.InjaTemplate{ + Text: path, + } + transformationTemplate.BodyTransformation = bodyTransformation + return transformationTemplate +} + +func getTransformation(ctx context.Context, llm *v1alpha1.LLMProvider) (string, string, string, 
*envoytransformation.TransformationTemplate_MergeJsonKeys) { + headerName := "Authorization" + var prefix, path string + var bodyTransformation *envoytransformation.TransformationTemplate_MergeJsonKeys + provider := llm.Provider + if provider.OpenAI != nil { + prefix = "Bearer " + path = "/v1/chat/completions" + bodyTransformation = defaultBodyTransformation() + } else if provider.Anthropic != nil { + headerName = "x-api-key" + path = "/v1/messages" + bodyTransformation = defaultBodyTransformation() + } else if provider.AzureOpenAI != nil { + headerName = "api-key" + path = `/openai/deployments/{{ host_metadata("model") }}/chat/completions?api-version={{ host_metadata("api_version" )}}` + } else if provider.Gemini != nil { + headerName = "key" + path = getGeminiPath() + } else if provider.VertexAI != nil { + prefix = "Bearer " + var modelPath string + modelCall := provider.VertexAI.ModelPath + if modelCall == nil { + switch provider.VertexAI.Publisher { + case v1alpha1.GOOGLE: + modelPath = getVertexAIGeminiModelPath() + default: + // TODO(npolshak): add support for other publishers + contextutils.LoggerFrom(ctx).Warnf("Unsupported Vertex AI publisher: %v. 
Defaulting to Google", provider.VertexAI.Publisher) + modelPath = getVertexAIGeminiModelPath() + } + } else { + // Use user provided model path + modelPath = fmt.Sprintf(`models/{{host_metadata("model")}}:%s`, *modelCall) + } + // https://${LOCATION}-aiplatform.googleapis.com/{VERSION}/projects/${PROJECT_ID}/locations/${LOCATION}/ + path = fmt.Sprintf(`/{{host_metadata("api_version")}}/projects/{{host_metadata("project")}}/locations/{{host_metadata("location")}}/publishers/{{host_metadata("publisher")}}/%s`, modelPath) + } + return headerName, prefix, path, bodyTransformation +} + +func getGeminiPath() string { + return `/{{host_metadata("api_version")}}/models/{{host_metadata("model")}}:{% if host_metadata("route_type") == "CHAT_STREAMING" %}streamGenerateContent?key={{host_metadata("auth_token")}}&alt=sse{% else %}generateContent?key={{host_metadata("auth_token")}}{% endif %}` +} + +func getVertexAIGeminiModelPath() string { + return `models/{{host_metadata("model")}}:{% if host_metadata("route_type") == "CHAT_STREAMING" %}streamGenerateContent?alt=sse{% else %}generateContent{% endif %}` +} + +func defaultBodyTransformation() *envoytransformation.TransformationTemplate_MergeJsonKeys { + return &envoytransformation.TransformationTemplate_MergeJsonKeys{ + MergeJsonKeys: &envoytransformation.MergeJsonKeys{ + JsonKeys: map[string]*envoytransformation.MergeJsonKeys_OverridableTemplate{ + "model": { + Tmpl: &envoytransformation.InjaTemplate{ + // Merge the model into the body + Text: `{% if host_metadata("model") != "" %}"{{host_metadata("model")}}"{% else %}"{{model}}"{% endif %}`, + }, + }, + }, + }, + } +} diff --git a/internal/kgateway/extensions2/plugins/upstream/ai/ai_resources.go b/internal/kgateway/extensions2/plugins/upstream/ai/ai_resources.go new file mode 100644 index 00000000000..38336cc37fd --- /dev/null +++ b/internal/kgateway/extensions2/plugins/upstream/ai/ai_resources.go @@ -0,0 +1,81 @@ +package ai + +import ( + "os" + "strconv" + "strings" + + 
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" +) + +const ( + extProcUDSClusterName = "ai_ext_proc_uds_cluster" + extProcUDSSocketPath = "@kgateway-ai-sock" + waitFilterName = "io.kgateway.wait" +) + +func GetAIAdditionalResources() []*envoy_config_cluster_v3.Cluster { + // This env var can be used to test the ext-proc filter locally. + // On linux this should be set to `172.17.0.1` and on mac to `host.docker.internal` + // Note: Mac doesn't work yet because it needs to be a DNS cluster + // The port can be whatever you want. + // When running the ext-proc filter locally, you also need to set + // `LISTEN_ADDR` to `0.0.0.0:PORT`. Where port is the same port as above. + listenAddr := strings.Split(os.Getenv("AI_PLUGIN_LISTEN_ADDR"), ":") + + var ep *envoy_config_endpoint_v3.LbEndpoint + if len(listenAddr) == 2 { + port, _ := strconv.Atoi(listenAddr[1]) + ep = &envoy_config_endpoint_v3.LbEndpoint{ + HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{ + Endpoint: &envoy_config_endpoint_v3.Endpoint{ + Address: &envoy_config_core_v3.Address{ + Address: &envoy_config_core_v3.Address_SocketAddress{ + SocketAddress: &envoy_config_core_v3.SocketAddress{ + Address: listenAddr[0], + PortSpecifier: &envoy_config_core_v3.SocketAddress_PortValue{ + PortValue: uint32(port), + }, + }, + }, + }, + }, + }, + } + } else { + ep = &envoy_config_endpoint_v3.LbEndpoint{ + HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{ + Endpoint: &envoy_config_endpoint_v3.Endpoint{ + Address: &envoy_config_core_v3.Address{ + Address: &envoy_config_core_v3.Address_Pipe{ + Pipe: &envoy_config_core_v3.Pipe{ + Path: extProcUDSSocketPath, + }, + }, + }, + }, + }, + } + } + udsCluster := &envoy_config_cluster_v3.Cluster{ + Name: extProcUDSClusterName, + 
ClusterDiscoveryType: &envoy_config_cluster_v3.Cluster_Type{ + Type: envoy_config_cluster_v3.Cluster_STATIC, + }, + Http2ProtocolOptions: &envoy_config_core_v3.Http2ProtocolOptions{}, + LoadAssignment: &envoy_config_endpoint_v3.ClusterLoadAssignment{ + ClusterName: extProcUDSClusterName, + Endpoints: []*envoy_config_endpoint_v3.LocalityLbEndpoints{ + { + LbEndpoints: []*envoy_config_endpoint_v3.LbEndpoint{ + ep, + }, + }, + }, + }, + } + // Add UDS cluster for the ext-proc filter + return []*envoy_config_cluster_v3.Cluster{udsCluster} +} diff --git a/internal/kgateway/extensions2/plugins/upstream/aws.go b/internal/kgateway/extensions2/plugins/upstream/aws.go index 37499b8c8ab..458c9e858b6 100644 --- a/internal/kgateway/extensions2/plugins/upstream/aws.go +++ b/internal/kgateway/extensions2/plugins/upstream/aws.go @@ -122,11 +122,7 @@ func (p *upstreamPlugin) processBackendAws( //UnwrapAsAlb: destination.GetUnwrapAsAlb(), //TransformerConfig: transformerConfig, } - lambdaRouteFuncAny, err := anypb.New(lambdaRouteFunc) - if err != nil { - return err - } - pCtx.AddTypedConfig(FilterName, lambdaRouteFuncAny) + pCtx.AddTypedConfig(FilterName, lambdaRouteFunc) return nil } diff --git a/internal/kgateway/extensions2/plugins/upstream/plugin.go b/internal/kgateway/extensions2/plugins/upstream/plugin.go index 38121a11349..1bd3fa11943 100644 --- a/internal/kgateway/extensions2/plugins/upstream/plugin.go +++ b/internal/kgateway/extensions2/plugins/upstream/plugin.go @@ -3,40 +3,43 @@ package upstream import ( "bytes" "context" + "fmt" "maps" "time" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/watch" - envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + envoy_ext_proc_v3 
"github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" envoy_hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" awspb "github.com/solo-io/envoy-gloo/go/config/filter/http/aws_lambda/v2" + "github.com/solo-io/go-utils/contextutils" skubeclient "istio.io/istio/pkg/config/schema/kubeclient" "istio.io/istio/pkg/kube/kclient" "istio.io/istio/pkg/kube/krt" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - gwv1 "sigs.k8s.io/gateway-api/apis/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/common" extensionsplug "github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/plugin" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/plugins/upstream/ai" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/pluginutils" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/plugins" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown" "github.com/kgateway-dev/kgateway/v2/pkg/client/clientset/versioned" ) const ( - ParameterGroup = "gloo.solo.io" + ParameterGroup = "kgateway.io" ParameterKind = "Parameter" -) -const ( - ExtensionName = "Upstream" - FilterName = "io.solo.aws_lambda" + + FilterName = "io.solo.aws_lambda" ) var ( @@ -63,14 +66,16 @@ func (d *upstreamDestination) Equals(in any) bool { } type UpstreamIr struct { - AwsSecret *ir.Secret + AwsSecret *ir.Secret + AISecret *ir.Secret + AIMultiSecret map[string]*ir.Secret } -func (u *UpstreamIr) data() map[string][]byte { - if u.AwsSecret == nil { +func data(s *ir.Secret) map[string][]byte { + if s == nil { return nil } - return u.AwsSecret.Data + 
return s.Data } func (u *UpstreamIr) Equals(other any) bool { @@ -78,13 +83,30 @@ func (u *UpstreamIr) Equals(other any) bool { if !ok { return false } - return maps.EqualFunc(u.data(), otherUpstream.data(), func(a, b []byte) bool { + if !maps.EqualFunc(data(u.AwsSecret), data(otherUpstream.AwsSecret), func(a, b []byte) bool { return bytes.Equal(a, b) - }) + }) { + return false + } + if !maps.EqualFunc(data(u.AISecret), data(otherUpstream.AISecret), func(a, b []byte) bool { + return bytes.Equal(a, b) + }) { + return false + } + if !maps.EqualFunc(u.AIMultiSecret, otherUpstream.AIMultiSecret, func(a, b *ir.Secret) bool { + return maps.EqualFunc(data(a), data(b), func(a, b []byte) bool { + return bytes.Equal(a, b) + }) + }) { + return false + } + + return true } type upstreamPlugin struct { - needFilter map[string]bool + needFilter map[string]bool + aiGatewayEnabled map[string]bool } func registerTypes(ourCli versioned.Interface) { @@ -106,7 +128,7 @@ func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensi col := krt.WrapClient(kclient.New[*v1alpha1.Upstream](commoncol.Client), commoncol.KrtOpts.ToOptions("Upstreams")...) gk := v1alpha1.UpstreamGVK.GroupKind() - translate := buildTranslateFunc(commoncol.Secrets) + translate := buildTranslateFunc(ctx, commoncol.Secrets) ucol := krt.NewCollection(col, func(krtctx krt.HandlerContext, i *v1alpha1.Upstream) *ir.Upstream { // resolve secrets return &ir.Upstream{ @@ -137,6 +159,7 @@ func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensi }, }, ContributesPolicies: map[schema.GroupKind]extensionsplug.PolicyPlugin{ + // TODO: remove Parameters? 
ParameterGK: { Name: "upstream", NewGatewayTranslationPass: newPlug, @@ -148,29 +171,81 @@ func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensi } }, }, + v1alpha1.UpstreamGVK.GroupKind(): { + Name: "upstream", + NewGatewayTranslationPass: newPlug, + // AttachmentPoints: []ir.AttachmentPoints{ir.HttpBackendRefAttachmentPoint}, + PoliciesFetch: func(n, ns string) ir.PolicyIR { + // virtual policy - we don't have a real policy object + return &upstreamDestination{ + FunctionName: n, + } + }, + }, }, } } -func buildTranslateFunc(secrets *krtcollections.SecretIndex) func(krtctx krt.HandlerContext, i *v1alpha1.Upstream) *UpstreamIr { +func buildTranslateFunc(ctx context.Context, secrets *krtcollections.SecretIndex) func(krtctx krt.HandlerContext, i *v1alpha1.Upstream) *UpstreamIr { return func(krtctx krt.HandlerContext, i *v1alpha1.Upstream) *UpstreamIr { // resolve secrets - var ir UpstreamIr + var upstreamIr UpstreamIr if i.Spec.Aws != nil { ns := i.GetNamespace() - secretRef := gwv1.SecretObjectReference{ - Name: gwv1.ObjectName(i.Spec.Aws.SecretRef.Name), + secret, err := pluginutils.GetSecretIr(secrets, krtctx, i.Spec.Aws.SecretRef.Name, ns) + if err != nil { + contextutils.LoggerFrom(ctx).Error(err) } - secret, _ := secrets.GetSecret(krtctx, krtcollections.From{GroupKind: v1alpha1.UpstreamGVK.GroupKind(), Namespace: ns}, secretRef) - if secret != nil { - ir.AwsSecret = secret - } else { - // TODO: handle error and write it to status - // return error + upstreamIr.AwsSecret = secret + } + if i.Spec.AI != nil { + ns := i.GetNamespace() + if i.Spec.AI.LLM != nil { + secretRef := getAISecretRef(i.Spec.AI.LLM.Provider) + // if secretRef is used, set the secret on the upstream ir + if secretRef != nil { + secret, err := pluginutils.GetSecretIr(secrets, krtctx, secretRef.Name, ns) + if err != nil { + contextutils.LoggerFrom(ctx).Error(err) + } + upstreamIr.AISecret = secret + } + } else if i.Spec.AI.MultiPool != nil { + upstreamIr.AIMultiSecret 
= map[string]*ir.Secret{} + for idx, priority := range i.Spec.AI.MultiPool.Priorities { + for jdx, pool := range priority.Pool { + secretRef := getAISecretRef(pool.Provider) + // if secretRef is used, set the secret on the upstream ir + if secretRef != nil { + secret, err := pluginutils.GetSecretIr(secrets, krtctx, secretRef.Name, ns) + if err != nil { + contextutils.LoggerFrom(ctx).Error(err) + } + upstreamIr.AIMultiSecret[getMultiPoolSecretKey(idx, jdx, secretRef.Name)] = secret + } + } + } } } - return &ir + return &upstreamIr + } +} + +func getAISecretRef(llm v1alpha1.SupportedLLMProvider) *corev1.LocalObjectReference { + var secretRef *corev1.LocalObjectReference + if llm.OpenAI != nil { + secretRef = llm.OpenAI.AuthToken.SecretRef + } else if llm.Anthropic != nil { + secretRef = llm.Anthropic.AuthToken.SecretRef + } else if llm.AzureOpenAI != nil { + secretRef = llm.AzureOpenAI.AuthToken.SecretRef + } else if llm.Gemini != nil { + secretRef = llm.Gemini.AuthToken.SecretRef + } else if llm.VertexAI != nil { + secretRef = llm.VertexAI.AuthToken.SecretRef } + + return secretRef } func processUpstream(ctx context.Context, in ir.Upstream, out *envoy_config_cluster_v3.Cluster) { @@ -193,13 +268,19 @@ func processUpstream(ctx context.Context, in ir.Upstream, out *envoy_config_clus processStatic(ctx, spec.Static, out) case spec.Aws != nil: processAws(ctx, spec.Aws, ir, out) + case spec.AI != nil: + err := ai.ProcessAIUpstream(ctx, spec.AI, ir.AISecret, out) + if err != nil { + // TODO: report error on status + contextutils.LoggerFrom(ctx).Error(err) + } } } func hostname(in *v1alpha1.Upstream) string { if in.Spec.Static != nil { if len(in.Spec.Static.Hosts) > 0 { - return string(in.Spec.Static.Hosts[0].Host) + return in.Spec.Static.Hosts[0].Host } } return "" @@ -242,6 +323,37 @@ func (p *upstreamPlugin) ApplyForRoute(ctx context.Context, pCtx *ir.RouteContex return nil } +// Run on upstream, regardless of policy (based on upstream gvk) +// share route proto message 
+func (p *upstreamPlugin) ApplyForBackend(ctx context.Context, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + upstream := pCtx.Upstream.Obj.(*v1alpha1.Upstream) + if upstream.Spec.AI != nil { + err := ai.ApplyAIBackend(ctx, upstream.Spec.AI, pCtx, in, out) + if err != nil { + return err + } + + if p.aiGatewayEnabled == nil { + p.aiGatewayEnabled = make(map[string]bool) + } + p.aiGatewayEnabled[pCtx.FilterChainName] = true + } else { + // If it's not an AI route we want to disable our ext-proc filter just in case. + // This will have no effect if we don't add the listener filter + disabledExtprocSettings := &envoy_ext_proc_v3.ExtProcPerRoute{ + Override: &envoy_ext_proc_v3.ExtProcPerRoute_Disabled{ + Disabled: true, + }, + } + pCtx.AddTypedConfig(wellknown.AIExtProcFilterName, disabledExtprocSettings) + } + + return nil +} + +// Only called if policy attached (extension ref) +// Can implement in route policy for ai (prompt guard, etc.) +// Alt. apply regardless if policy is present...? func (p *upstreamPlugin) ApplyForRouteBackend( ctx context.Context, policy ir.PolicyIR, pCtx *ir.RouteBackendContext, @@ -251,6 +363,9 @@ func (p *upstreamPlugin) ApplyForRouteBackend( return nil // todo: should we return fmt.Errorf("internal error: policy is not a upstreamDestination") } + + // TODO: AI config for ApplyToRouteBackend + return p.processBackendAws(ctx, pCtx, pol) } @@ -258,20 +373,36 @@ func (p *upstreamPlugin) ApplyForRouteBackend( // if a plugin emits new filters, they must be with a plugin unique name. // any filter returned from route config must be disabled, so it doesnt impact other routes. 
func (p *upstreamPlugin) HttpFilters(ctx context.Context, fc ir.FilterChainCommon) ([]plugins.StagedHttpFilter, error) { - if !p.needFilter[fc.FilterChainName] { - return nil, nil + result := []plugins.StagedHttpFilter{} + + if p.aiGatewayEnabled[fc.FilterChainName] { + aiFilters, err := ai.AddExtprocHTTPFilter() + if err != nil { + return nil, err + } + result = append(result, aiFilters...) } - filterConfig := &awspb.AWSLambdaConfig{} - pluginStage := plugins.DuringStage(plugins.OutAuthStage) - f, _ := plugins.NewStagedFilter(FilterName, filterConfig, pluginStage) + if p.needFilter[fc.FilterChainName] { + filterConfig := &awspb.AWSLambdaConfig{} + pluginStage := plugins.DuringStage(plugins.OutAuthStage) + f, _ := plugins.NewStagedFilter(FilterName, filterConfig, pluginStage) - return []plugins.StagedHttpFilter{ - f, - }, nil + result = append(result, f) + } + return result, nil } -func (p *upstreamPlugin) UpstreamHttpFilters(ctx context.Context) ([]plugins.StagedUpstreamHttpFilter, error) { - return nil, nil +func (p *upstreamPlugin) UpstreamHttpFilters(ctx context.Context, fcc ir.FilterChainCommon) ([]plugins.StagedUpstreamHttpFilter, error) { + filters := []plugins.StagedUpstreamHttpFilter{} + if p.aiGatewayEnabled[fcc.FilterChainName] { + aiFilters, err := ai.AddUpstreamHttpFilters() + if err != nil { + return nil, err + } + filters = append(filters, aiFilters...) + } + + return filters, nil } func (p *upstreamPlugin) NetworkFilters(ctx context.Context) ([]plugins.StagedNetworkFilter, error) { @@ -280,5 +411,18 @@ func (p *upstreamPlugin) NetworkFilters(ctx context.Context) ([]plugins.StagedNe // called 1 time (per envoy proxy). replaces GeneratedResources func (p *upstreamPlugin) ResourcesToAdd(ctx context.Context) ir.Resources { - return ir.Resources{} + var additionalClusters []*envoy_config_cluster_v3.Cluster + + if len(p.aiGatewayEnabled) > 0 { + aiClusters := ai.GetAIAdditionalResources() + + additionalClusters = append(additionalClusters, aiClusters...) 
+ } + return ir.Resources{ + Clusters: additionalClusters, + } +} + +func getMultiPoolSecretKey(priorityIdx, poolIdx int, secretName string) string { + return fmt.Sprintf("%d-%d-%s", priorityIdx, poolIdx, secretName) } diff --git a/internal/kgateway/extensions2/pluginutils/secrets.go b/internal/kgateway/extensions2/pluginutils/secrets.go new file mode 100644 index 00000000000..e3a4d6f620f --- /dev/null +++ b/internal/kgateway/extensions2/pluginutils/secrets.go @@ -0,0 +1,25 @@ +package pluginutils + +import ( + "fmt" + + "github.com/rotisserie/eris" + "istio.io/istio/pkg/kube/krt" + gwv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections" +) + +func GetSecretIr(secrets *krtcollections.SecretIndex, krtctx krt.HandlerContext, secretName, ns string) (*ir.Secret, error) { + secretRef := gwv1.SecretObjectReference{ + Name: gwv1.ObjectName(secretName), + } + secret, err := secrets.GetSecret(krtctx, krtcollections.From{GroupKind: v1alpha1.UpstreamGVK.GroupKind(), Namespace: ns}, secretRef) + if secret != nil { + return secret, nil + } else { + return nil, eris.Wrapf(err, fmt.Sprintf("unable to find the secret %s", secretRef.Name)) + } +} diff --git a/internal/kgateway/extensions2/pluginutils/utils.go b/internal/kgateway/extensions2/pluginutils/utils.go new file mode 100644 index 00000000000..b8977d913c5 --- /dev/null +++ b/internal/kgateway/extensions2/pluginutils/utils.go @@ -0,0 +1,62 @@ +package pluginutils + +import ( + "errors" + "fmt" + "strings" + "unicode/utf8" + + "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" + "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" +) + +const ( + AuthKey = "Authorization" +) + +func GetAuthToken(in v1alpha1.SingleAuthToken, aiSecrets *ir.Secret) (token string, err error) { + switch in.Kind { + case v1alpha1.Inline: + if in.Inline == nil { + return "", 
errors.New("inline auth token must be set if kind is type Inline") + } + token = *in.Inline + case v1alpha1.SecretRef: + if aiSecrets == nil { + return "", fmt.Errorf("secret not found for %s", in.SecretRef.Name) + } + secret, err := deriveHeaderSecret(aiSecrets) + if err != nil { + return "", err + } + token = getTokenFromHeaderSecret(secret) + } + return token, err +} + +type headerSecretDerivation struct { + authorization string +} + +// deriveHeaderSecret derives the header contents from the secret when we are using a kubernetes secretref. +// It returns the derived string contents, or an error if the Authorization value is empty or not valid UTF-8. +func deriveHeaderSecret(aiSecrets *ir.Secret) (headerSecretDerivation, error) { + var errs []error + derived := headerSecretDerivation{ + authorization: string(aiSecrets.Data[AuthKey]), + } + if derived.authorization == "" || !utf8.Valid([]byte(derived.authorization)) { + // an empty or non-UTF-8 value cannot be used as an auth token + errs = append(errs, errors.New("access_key is not a valid string")) + } + return derived, errors.Join(errs...) +} + +// `getTokenFromHeaderSecret` retrieves the auth token from the derived header secret. +// It cannot fail: the derived secret carries a single authorization value, so there is no +// ambiguity about which header to select. +// In addition, this function will strip the "Bearer " prefix from the token as it will get conditionally +// added later depending on the provider. 
+func getTokenFromHeaderSecret(secret headerSecretDerivation) string { + return strings.TrimPrefix(secret.authorization, "Bearer ") +} diff --git a/internal/kgateway/ir/iface.go b/internal/kgateway/ir/iface.go index f18d2635642..0def27fb098 100644 --- a/internal/kgateway/ir/iface.go +++ b/internal/kgateway/ir/iface.go @@ -9,7 +9,7 @@ import ( envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" envoy_hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" - anypb "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/proto" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/plugins" @@ -25,14 +25,21 @@ type RouteBackendContext struct { FilterChainName string Upstream *Upstream // todo: make this not public - TypedFiledConfig *map[string]*anypb.Any + TypedFilterConfig *map[string]proto.Message } -func (r *RouteBackendContext) AddTypedConfig(key string, v *anypb.Any) { - if *r.TypedFiledConfig == nil { - *r.TypedFiledConfig = make(map[string]*anypb.Any) +func (r *RouteBackendContext) AddTypedConfig(key string, v proto.Message) { + if *r.TypedFilterConfig == nil { + *r.TypedFilterConfig = make(map[string]proto.Message) } - (*r.TypedFiledConfig)[key] = v + (*r.TypedFilterConfig)[key] = v +} + +func (r *RouteBackendContext) GetConfig(key string) proto.Message { + if *r.TypedFilterConfig == nil { + return nil + } + return (*r.TypedFilterConfig)[key] } type RouteContext struct { @@ -67,16 +74,25 @@ type ProxyTranslationPass interface { ctx context.Context, pCtx *RouteContext, out *envoy_config_route_v3.Route) error + // runs for policy applied ApplyForRouteBackend( ctx context.Context, policy PolicyIR, pCtx *RouteBackendContext, ) error + // no policy applied + ApplyForBackend( + ctx context.Context, + pCtx *RouteBackendContext, + 
in HttpBackend, + out *envoy_config_route_v3.Route, + ) error + // called 1 time per listener // if a plugin emits new filters, they must be with a plugin unique name. // any filter returned from route config must be disabled, so it doesnt impact other routes. HttpFilters(ctx context.Context, fc FilterChainCommon) ([]plugins.StagedHttpFilter, error) - UpstreamHttpFilters(ctx context.Context) ([]plugins.StagedUpstreamHttpFilter, error) + UpstreamHttpFilters(ctx context.Context, fc FilterChainCommon) ([]plugins.StagedUpstreamHttpFilter, error) NetworkFilters(ctx context.Context) ([]plugins.StagedNetworkFilter, error) // called 1 time (per envoy proxy). replaces GeneratedResources @@ -84,7 +100,7 @@ type ProxyTranslationPass interface { } type Resources struct { - Clusters []envoy_config_cluster_v3.Cluster + Clusters []*envoy_config_cluster_v3.Cluster } type GwTranslationCtx struct { diff --git a/internal/kgateway/ir/routes.go b/internal/kgateway/ir/routes.go index c1bc1e6a4be..afde33c14a1 100644 --- a/internal/kgateway/ir/routes.go +++ b/internal/kgateway/ir/routes.go @@ -63,11 +63,18 @@ func (c HttpRouteIR) backendsEqual(in HttpRouteIR) bool { return false } for j, backend := range backendsa { - if backend.Backend == nil && backendsb[j].Backend == nil { + otherbackend := backendsb[j] + if backend.Backend == nil && otherbackend.Backend == nil { continue } - if backend.Backend != nil && backendsb[j].Backend != nil { - if backend.Backend.ClusterName != backendsb[j].Backend.ClusterName { + if backend.Backend != nil && otherbackend.Backend != nil { + if backend.Backend.ClusterName != otherbackend.Backend.ClusterName { + return false + } + if backend.Backend.Weight != otherbackend.Backend.Weight { + return false + } + if !backend.AttachedPolicies.Equals(otherbackend.AttachedPolicies) { return false } } else { diff --git a/internal/kgateway/ir/upstream.go b/internal/kgateway/ir/upstream.go index c8c91bf9668..b630e6d2dee 100644 --- a/internal/kgateway/ir/upstream.go +++ 
b/internal/kgateway/ir/upstream.go @@ -54,7 +54,7 @@ type Upstream struct { // optional port for if ObjectSource is a service that can have multiple ports. Port int32 - // prefix the cluster name with this string to distringuish it from other GVKs. + // prefix the cluster name with this string to distinguish it from other GVKs. // here explicitly as it shows up in stats. each (group, kind) pair should have a unique prefix. GvPrefix string // for things that integrate with destination rule, we need to know what hostname to use. @@ -78,7 +78,7 @@ func UpstreamResourceName(objSource ObjectSource, port int32) string { } func (c Upstream) Equals(in Upstream) bool { - return c.ObjectSource.Equals(in.ObjectSource) && versionEquals(c.Obj, in.Obj) && c.AttachedPolicies.Equals(in.AttachedPolicies) + return c.ObjectSource.Equals(in.ObjectSource) && versionEquals(c.Obj, in.Obj) && c.ObjIr.Equals(in.ObjIr) && c.AttachedPolicies.Equals(in.AttachedPolicies) } func (c Upstream) ClusterName() string { diff --git a/internal/kgateway/krtcollections/builtin.go b/internal/kgateway/krtcollections/builtin.go index d9341456dca..b0e61fcb53d 100644 --- a/internal/kgateway/krtcollections/builtin.go +++ b/internal/kgateway/krtcollections/builtin.go @@ -58,6 +58,11 @@ func (d *builtinPlugin) Equals(in any) bool { type builtinPluginGwPass struct { } +func (p *builtinPluginGwPass) ApplyForBackend(ctx context.Context, pCtx *ir.RouteBackendContext, in ir.HttpBackend, out *envoy_config_route_v3.Route) error { + // no op + return nil +} + func (p *builtinPluginGwPass) ApplyHCM(ctx context.Context, pCtx *ir.HcmContext, out *envoyhttp.HttpConnectionManager) error { // no-op return nil @@ -427,7 +432,7 @@ func (p *builtinPluginGwPass) HttpFilters(ctx context.Context, fcc ir.FilterChai return nil, nil } -func (p *builtinPluginGwPass) UpstreamHttpFilters(ctx context.Context) ([]plugins.StagedUpstreamHttpFilter, error) { +func (p *builtinPluginGwPass) UpstreamHttpFilters(ctx context.Context, fcc 
ir.FilterChainCommon) ([]plugins.StagedUpstreamHttpFilter, error) { return nil, nil } diff --git a/internal/kgateway/krtcollections/secrets.go b/internal/kgateway/krtcollections/secrets.go index 095234cf9e8..1aaa35b88d4 100644 --- a/internal/kgateway/krtcollections/secrets.go +++ b/internal/kgateway/krtcollections/secrets.go @@ -61,9 +61,9 @@ func (s *SecretIndex) GetSecret(kctx krt.HandlerContext, from From, secretRef gw if !s.refgrants.ReferenceAllowed(kctx, from.GroupKind, from.Namespace, to) { return nil, ErrMissingReferenceGrant } - up := krt.FetchOne(kctx, col, krt.FilterKey(to.ResourceName())) - if up == nil { + secret := krt.FetchOne(kctx, col, krt.FilterKey(to.ResourceName())) + if secret == nil { return nil, &NotFoundError{NotFoundObj: to} } - return up, nil + return secret, nil } diff --git a/internal/kgateway/setup/ggv2setup_test.go b/internal/kgateway/setup/ggv2setup_test.go index cae3321da02..aac4bef7afe 100644 --- a/internal/kgateway/setup/ggv2setup_test.go +++ b/internal/kgateway/setup/ggv2setup_test.go @@ -21,13 +21,6 @@ import ( envoylistener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" envoyhttp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" - jsonpb "google.golang.org/protobuf/encoding/protojson" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/go-logr/zapr" "github.com/solo-io/go-utils/contextutils" @@ -36,11 +29,17 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/grpclog" + jsonpb "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" 
"google.golang.org/protobuf/types/known/structpb" istiokube "istio.io/istio/pkg/kube" "istio.io/istio/pkg/kube/krt" "istio.io/istio/pkg/slices" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/yaml" @@ -207,6 +206,10 @@ func TestScenarios(t *testing.T) { t.Fatalf("failed to read dir: %v", err) } for _, f := range files { + if !strings.Contains(f.Name(), "ai-deepseek") { + continue + } + // run tests with the yaml files (but not -out.yaml files)/s parentT := t if strings.HasSuffix(f.Name(), ".yaml") && !strings.HasSuffix(f.Name(), "-out.yaml") { diff --git a/internal/kgateway/setup/testdata/ai-anthropic-passthrough-out.yaml b/internal/kgateway/setup/testdata/ai-anthropic-passthrough-out.yaml new file mode 100644 index 00000000000..6f3f67e2f31 --- /dev/null +++ b/internal/kgateway/setup/testdata/ai-anthropic-passthrough-out.yaml @@ -0,0 +1,242 @@ +clusters: +- http2ProtocolOptions: {} + loadAssignment: + clusterName: ai_ext_proc_uds_cluster + endpoints: + - lbEndpoints: + - endpoint: + address: + pipe: + path: '@kgateway-ai-sock' + name: ai_ext_proc_uds_cluster + type: STATIC +- connectTimeout: 5s + edsClusterConfig: + edsConfig: + ads: {} + resourceApiVersion: V3 + metadata: {} + name: kube_default_kubernetes_443 + transportSocketMatches: + - match: + tlsMode: istio + name: tlsMode-istio + transportSocket: + name: envoy.transport_sockets.tls + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + commonTlsContext: + alpnProtocols: + - istio + tlsCertificateSdsSecretConfigs: + - name: istio_server_cert + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + 
resourceApiVersion: V3 + tlsParams: {} + validationContextSdsSecretConfig: + name: istio_validation_context + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + sni: outbound_.443_._.kubernetes.default.svc.cluster.local + - match: {} + name: tlsMode-disabled + transportSocket: + name: envoy.transport_sockets.raw_buffer + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer + type: EDS +- connectTimeout: 5s + dnsLookupFamily: V4_ONLY + loadAssignment: + clusterName: upstream_gwtest_anthropic_0 + endpoints: + - lbEndpoints: + - endpoint: + address: + socketAddress: + address: api.anthropic.com + portValue: 443 + hostname: api.anthropic.com + metadata: + filterMetadata: + envoy.transport_socket_match: + tls: api.anthropic.com + io.solo.transformation: + auth_token: "" + metadata: {} + name: upstream_gwtest_anthropic_0 + transportSocketMatches: + - match: + tlsMode: istio + name: tlsMode-istio + transportSocket: + name: envoy.transport_sockets.tls + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + commonTlsContext: + alpnProtocols: + - istio + tlsCertificateSdsSecretConfigs: + - name: istio_server_cert + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + tlsParams: {} + validationContextSdsSecretConfig: + name: istio_validation_context + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + - match: {} + name: tlsMode-disabled + transportSocket: + name: envoy.transport_sockets.raw_buffer + typedConfig: + '@type': 
type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer + type: STRICT_DNS +listeners: +- address: + socketAddress: + address: '::' + ipv4Compat: true + portValue: 8080 + filterChains: + - filters: + - name: envoy.filters.network.http_connection_manager + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + httpFilters: + - name: ai.extproc.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor + grpcService: + envoyGrpc: + clusterName: ai_ext_proc_uds_cluster + retryPolicy: + numRetries: 3 + timeout: 5s + messageTimeout: 5s + metadataOptions: + forwardingNamespaces: + typed: + - envoy.filters.ai.solo.io + untyped: + - io.solo.transformation + - envoy.filters.ai.solo.io + receivingNamespaces: + untyped: + - ai.kgateway.io + processingMode: + requestBodyMode: STREAMED + requestHeaderMode: SEND + requestTrailerMode: SKIP + responseBodyMode: STREAMED + responseHeaderMode: SEND + responseTrailerMode: SKIP + - name: envoy.filters.http.router + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + upstreamHttpFilters: + - name: io.kgateway.wait + typedConfig: + '@type': type.googleapis.com/envoy.config.filter.http.upstream_wait.v2.UpstreamWaitFilterConfig + - name: ai.policy.transformation.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.api.v2.filter.http.FilterTransformations + - name: ai.upstream.transformation.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.api.v2.filter.http.FilterTransformations + - name: envoy.filters.http.upstream_codec + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.upstream_codec.v3.UpstreamCodec + mergeSlashes: true + normalizePath: true + rds: + configSource: + ads: {} + resourceApiVersion: V3 + routeConfigName: http + statPrefix: http + useRemoteAddress: true + name: http + name: http 
+routes: +- ignorePortInHostMatching: true + name: http + virtualHosts: + - domains: + - test + name: http~test + routes: + - match: + prefix: / + name: http~test-route-1-httproute-route-to-upstream-gwtest-1-0-matcher-0 + route: + autoHostRewrite: true + cluster: upstream_gwtest_anthropic_0 + typedPerFilterConfig: + ai.extproc.kgateway.io: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute + overrides: + grpcInitialMetadata: + - key: x-llm-provider + value: anthropic + - key: x-request-id + value: '%REQ(X-REQUEST-ID)%' + ai.policy.transformation.kgateway.io: + '@type': type.googleapis.com/envoy.api.v2.filter.http.RouteTransformations.RouteTransformation + requestMatch: + requestTransformation: + logRequestResponseInfo: false + transformationTemplate: + mergeJsonKeys: + jsonKeys: + temperature: + tmpl: + text: '{{ default(0.5, "0.5") }}' + ai.upstream.transformation.kgateway.io: + '@type': type.googleapis.com/envoy.api.v2.filter.http.RouteTransformations + transformations: + - requestMatch: + requestTransformation: + logRequestResponseInfo: false + transformationTemplate: + headers: + :path: + text: /v1/messages + x-api-key: + text: '{% if host_metadata("auth_token") != "" %}{{host_metadata("auth_token")}}{% + else %}{{dynamic_metadata("auth_token","ai.kgateway.io")}}{% + endif %}' + mergeJsonKeys: + jsonKeys: + model: + tmpl: + text: '{% if host_metadata("model") != "" %}"{{host_metadata("model")}}"{% + else %}"{{model}}"{% endif %}' diff --git a/internal/kgateway/setup/testdata/ai-anthropic-passthrough.yaml b/internal/kgateway/setup/testdata/ai-anthropic-passthrough.yaml new file mode 100644 index 00000000000..62527b7b5ee --- /dev/null +++ b/internal/kgateway/setup/testdata/ai-anthropic-passthrough.yaml @@ -0,0 +1,66 @@ +kind: Gateway +apiVersion: gateway.networking.k8s.io/v1 +metadata: + name: http-gw-for-test + namespace: gwtest +spec: + gatewayClassName: kgateway + listeners: + - protocol: HTTP + port: 8080 + name: http + 
allowedRoutes: + namespaces: + from: All +--- +apiVersion: gateway.networking.k8s.io/v1beta1 +kind: HTTPRoute +metadata: + name: route-to-upstream + namespace: gwtest +spec: + parentRefs: + - name: http-gw-for-test + hostnames: + - "test" + rules: + - matches: + - path: + type: Exact + value: /anthropic + backendRefs: + - name: anthropic + kind: Upstream + group: gateway.kgateway.dev + filters: + - type: ExtensionRef + extensionRef: + group: gateway.kgateway.dev + kind: RoutePolicy + name: route-test +--- +apiVersion: gateway.kgateway.dev/v1alpha1 +kind: RoutePolicy +metadata: + name: route-test + namespace: gwtest +spec: + ai: + defaults: + - field: "temperature" + value: "0.5" +--- +apiVersion: gateway.kgateway.dev/v1alpha1 +kind: Upstream +metadata: + labels: + app: kgateway + name: anthropic + namespace: gwtest +spec: + ai: + llm: + provider: + anthropic: + authToken: + kind: "Passthrough" \ No newline at end of file diff --git a/internal/kgateway/setup/testdata/ai-deepseek-prompt-guard-out.yaml b/internal/kgateway/setup/testdata/ai-deepseek-prompt-guard-out.yaml new file mode 100644 index 00000000000..fa209e3d262 --- /dev/null +++ b/internal/kgateway/setup/testdata/ai-deepseek-prompt-guard-out.yaml @@ -0,0 +1,252 @@ +clusters: +- http2ProtocolOptions: {} + loadAssignment: + clusterName: ai_ext_proc_uds_cluster + endpoints: + - lbEndpoints: + - endpoint: + address: + pipe: + path: '@kgateway-ai-sock' + name: ai_ext_proc_uds_cluster + type: STATIC +- connectTimeout: 5s + edsClusterConfig: + edsConfig: + ads: {} + resourceApiVersion: V3 + metadata: {} + name: kube_default_kubernetes_443 + transportSocketMatches: + - match: + tlsMode: istio + name: tlsMode-istio + transportSocket: + name: envoy.transport_sockets.tls + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + commonTlsContext: + alpnProtocols: + - istio + tlsCertificateSdsSecretConfigs: + - name: istio_server_cert + sdsConfig: + apiConfigSource: + 
apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + tlsParams: {} + validationContextSdsSecretConfig: + name: istio_validation_context + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + sni: outbound_.443_._.kubernetes.default.svc.cluster.local + - match: {} + name: tlsMode-disabled + transportSocket: + name: envoy.transport_sockets.raw_buffer + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer + type: EDS +- connectTimeout: 5s + dnsLookupFamily: V4_ONLY + loadAssignment: + clusterName: upstream_gwtest_deepseek_0 + endpoints: + - lbEndpoints: + - endpoint: + address: + socketAddress: + address: ollama-deepseek-r1.ollama.svc.cluster.local + portValue: 11434 + hostname: ollama-deepseek-r1.ollama.svc.cluster.local + metadata: + filterMetadata: + io.solo.transformation: + auth_token: mysecretkey + metadata: {} + name: upstream_gwtest_deepseek_0 + transportSocketMatches: + - match: + tlsMode: istio + name: tlsMode-istio + transportSocket: + name: envoy.transport_sockets.tls + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + commonTlsContext: + alpnProtocols: + - istio + tlsCertificateSdsSecretConfigs: + - name: istio_server_cert + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + tlsParams: {} + validationContextSdsSecretConfig: + name: istio_validation_context + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + - 
match: {} + name: tlsMode-disabled + transportSocket: + name: envoy.transport_sockets.raw_buffer + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer + type: STRICT_DNS +listeners: +- address: + socketAddress: + address: '::' + ipv4Compat: true + portValue: 8080 + filterChains: + - filters: + - name: envoy.filters.network.http_connection_manager + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + httpFilters: + - name: ai.extproc.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor + grpcService: + envoyGrpc: + clusterName: ai_ext_proc_uds_cluster + retryPolicy: + numRetries: 3 + timeout: 5s + messageTimeout: 5s + metadataOptions: + forwardingNamespaces: + typed: + - envoy.filters.ai.solo.io + untyped: + - io.solo.transformation + - envoy.filters.ai.solo.io + receivingNamespaces: + untyped: + - ai.kgateway.io + processingMode: + requestBodyMode: STREAMED + requestHeaderMode: SEND + requestTrailerMode: SKIP + responseBodyMode: STREAMED + responseHeaderMode: SEND + responseTrailerMode: SKIP + - name: envoy.filters.http.router + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + upstreamHttpFilters: + - name: io.kgateway.wait + typedConfig: + '@type': type.googleapis.com/envoy.config.filter.http.upstream_wait.v2.UpstreamWaitFilterConfig + - name: ai.policy.transformation.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.api.v2.filter.http.FilterTransformations + - name: ai.upstream.transformation.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.api.v2.filter.http.FilterTransformations + - name: envoy.filters.http.upstream_codec + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.upstream_codec.v3.UpstreamCodec + mergeSlashes: true + normalizePath: true + rds: + configSource: + ads: 
{} + resourceApiVersion: V3 + routeConfigName: http + statPrefix: http + useRemoteAddress: true + name: http + name: http +routes: +- ignorePortInHostMatching: true + name: http + virtualHosts: + - domains: + - test + name: http~test + routes: + - match: + prefix: / + name: http~test-route-1-httproute-route-to-upstream-gwtest-1-0-matcher-0 + route: + autoHostRewrite: true + cluster: upstream_gwtest_deepseek_0 + typedPerFilterConfig: + ai.extproc.kgateway.io: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute + overrides: + grpcInitialMetadata: + - key: x-llm-provider + value: openai + - key: x-request-id + value: '%REQ(X-REQUEST-ID)%' + - key: x-req-guardrails-config + value: '{"customResponse":{"message":"Rejected due to inappropriate + content","statusCode":400},"regex":{"matches":[{"pattern":"credit + card"}],"action":"MASK"}}' + - key: x-req-guardrails-config-hash + value: "10507036172039765021" + - key: x-resp-guardrails-config + value: '{"regex":{"builtins":["PHONE_NUMBER","EMAIL","SSN","CREDIT_CARD"],"action":"MASK"}}' + - key: x-resp-guardrails-config-hash + value: "15744476576820868899" + ai.policy.transformation.kgateway.io: + '@type': type.googleapis.com/envoy.api.v2.filter.http.RouteTransformations.RouteTransformation + requestMatch: + requestTransformation: + logRequestResponseInfo: false + transformationTemplate: + mergeJsonKeys: + jsonKeys: + messages: + tmpl: + text: '[{"content":"respond to all questions in French","role":"system"},{"content":"Say + hello before each response","role":"system"},{{ join(messages, + ", ") }}]' + ai.upstream.transformation.kgateway.io: + '@type': type.googleapis.com/envoy.api.v2.filter.http.RouteTransformations + transformations: + - requestMatch: + requestTransformation: + logRequestResponseInfo: false + transformationTemplate: + headers: + :path: + text: /v1/chat/completions + Authorization: + text: Bearer {% if host_metadata("auth_token") != "" 
%}{{host_metadata("auth_token")}}{% + else %}{{dynamic_metadata("auth_token","ai.kgateway.io")}}{% + endif %} + mergeJsonKeys: + jsonKeys: + model: + tmpl: + text: '{% if host_metadata("model") != "" %}"{{host_metadata("model")}}"{% + else %}"{{model}}"{% endif %}' diff --git a/internal/kgateway/setup/testdata/ai-deepseek-prompt-guard.yaml b/internal/kgateway/setup/testdata/ai-deepseek-prompt-guard.yaml new file mode 100644 index 00000000000..3c9002f68a9 --- /dev/null +++ b/internal/kgateway/setup/testdata/ai-deepseek-prompt-guard.yaml @@ -0,0 +1,99 @@ +kind: Gateway +apiVersion: gateway.networking.k8s.io/v1 +metadata: + name: http-gw-for-test + namespace: gwtest +spec: + gatewayClassName: kgateway + listeners: + - protocol: HTTP + port: 8080 + name: http + allowedRoutes: + namespaces: + from: All +--- +apiVersion: gateway.kgateway.dev/v1alpha1 +kind: Upstream +metadata: + labels: + app: kgateway + name: deepseek + namespace: gwtest +spec: + ai: + llm: + hostOverride: + host: ollama-deepseek-r1.ollama.svc.cluster.local + port: 11434 + provider: + openai: + authToken: + kind: "SecretRef" + secretRef: + name: deepseek-secret +--- +apiVersion: v1 +kind: Secret +metadata: + name: deepseek-secret + namespace: gwtest +type: Opaque +data: + Authorization: bXlzZWNyZXRrZXk= +--- +apiVersion: gateway.networking.k8s.io/v1beta1 +kind: HTTPRoute +metadata: + name: route-to-upstream + namespace: gwtest +spec: + parentRefs: + - name: http-gw-for-test + hostnames: + - "test" + rules: + - matches: + - path: + type: Exact + value: /v1/chat/completions + backendRefs: + - name: deepseek + kind: Upstream + group: gateway.kgateway.dev + filters: + - type: ExtensionRef + extensionRef: + group: gateway.kgateway.dev + kind: RoutePolicy + name: route-test +--- +apiVersion: gateway.kgateway.dev/v1alpha1 +kind: RoutePolicy +metadata: + name: route-test + namespace: gwtest +spec: + ai: + promptEnrichment: + prepend: + - role: SYSTEM + content: "respond to all questions in French" + - role: 
SYSTEM + content: "Say hello before each response" + promptGuard: + request: + customResponse: + message: "Rejected due to inappropriate content" + statusCode: 400 + regex: + matches: + - pattern: "credit card" + response: + regex: + builtins: + - PHONE_NUMBER + - EMAIL + - SSN + - CREDIT_CARD +--- \ No newline at end of file diff --git a/internal/kgateway/setup/testdata/ai-openai-moderation-promptguard-out.yaml b/internal/kgateway/setup/testdata/ai-openai-moderation-promptguard-out.yaml new file mode 100644 index 00000000000..b3d0bf32901 --- /dev/null +++ b/internal/kgateway/setup/testdata/ai-openai-moderation-promptguard-out.yaml @@ -0,0 +1,247 @@ +clusters: +- http2ProtocolOptions: {} + loadAssignment: + clusterName: ai_ext_proc_uds_cluster + endpoints: + - lbEndpoints: + - endpoint: + address: + pipe: + path: '@kgateway-ai-sock' + name: ai_ext_proc_uds_cluster + type: STATIC +- connectTimeout: 5s + edsClusterConfig: + edsConfig: + ads: {} + resourceApiVersion: V3 + metadata: {} + name: kube_default_kubernetes_443 + transportSocketMatches: + - match: + tlsMode: istio + name: tlsMode-istio + transportSocket: + name: envoy.transport_sockets.tls + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + commonTlsContext: + alpnProtocols: + - istio + tlsCertificateSdsSecretConfigs: + - name: istio_server_cert + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + tlsParams: {} + validationContextSdsSecretConfig: + name: istio_validation_context + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + sni: outbound_.443_._.kubernetes.default.svc.cluster.local + - match: {} + name: tlsMode-disabled + transportSocket: + name: 
envoy.transport_sockets.raw_buffer + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer + type: EDS +- connectTimeout: 5s + dnsLookupFamily: V4_ONLY + loadAssignment: + clusterName: upstream_gwtest_openai_0 + endpoints: + - lbEndpoints: + - endpoint: + address: + socketAddress: + address: api.openai.com + portValue: 443 + hostname: api.openai.com + metadata: + filterMetadata: + envoy.transport_socket_match: + tls: api.openai.com + io.solo.transformation: + auth_token: mysecretkey + metadata: {} + name: upstream_gwtest_openai_0 + transportSocketMatches: + - match: + tlsMode: istio + name: tlsMode-istio + transportSocket: + name: envoy.transport_sockets.tls + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + commonTlsContext: + alpnProtocols: + - istio + tlsCertificateSdsSecretConfigs: + - name: istio_server_cert + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + tlsParams: {} + validationContextSdsSecretConfig: + name: istio_validation_context + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + - match: {} + name: tlsMode-disabled + transportSocket: + name: envoy.transport_sockets.raw_buffer + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer + type: STRICT_DNS +listeners: +- address: + socketAddress: + address: '::' + ipv4Compat: true + portValue: 8080 + filterChains: + - filters: + - name: envoy.filters.network.http_connection_manager + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + httpFilters: + - name: ai.extproc.kgateway.io + 
typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor + grpcService: + envoyGrpc: + clusterName: ai_ext_proc_uds_cluster + retryPolicy: + numRetries: 3 + timeout: 5s + messageTimeout: 5s + metadataOptions: + forwardingNamespaces: + typed: + - envoy.filters.ai.solo.io + untyped: + - io.solo.transformation + - envoy.filters.ai.solo.io + receivingNamespaces: + untyped: + - ai.kgateway.io + processingMode: + requestBodyMode: STREAMED + requestHeaderMode: SEND + requestTrailerMode: SKIP + responseBodyMode: STREAMED + responseHeaderMode: SEND + responseTrailerMode: SKIP + - name: envoy.filters.http.router + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + upstreamHttpFilters: + - name: io.kgateway.wait + typedConfig: + '@type': type.googleapis.com/envoy.config.filter.http.upstream_wait.v2.UpstreamWaitFilterConfig + - name: ai.policy.transformation.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.api.v2.filter.http.FilterTransformations + - name: ai.upstream.transformation.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.api.v2.filter.http.FilterTransformations + - name: envoy.filters.http.upstream_codec + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.upstream_codec.v3.UpstreamCodec + mergeSlashes: true + normalizePath: true + rds: + configSource: + ads: {} + resourceApiVersion: V3 + routeConfigName: http + statPrefix: http + useRemoteAddress: true + name: http + name: http +routes: +- ignorePortInHostMatching: true + name: http + virtualHosts: + - domains: + - test + name: http~test + routes: + - match: + prefix: / + name: http~test-route-1-httproute-route-to-upstream-gwtest-1-0-matcher-0 + route: + autoHostRewrite: true + cluster: upstream_gwtest_openai_0 + typedPerFilterConfig: + ai.extproc.kgateway.io: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute + overrides: + 
grpcInitialMetadata: + - key: x-llm-provider + value: openai + - key: x-request-id + value: '%REQ(X-REQUEST-ID)%' + - key: x-req-guardrails-config + value: '{"moderation":{"openAIModeration":{"authToken":{"kind":"","inline":"mysecretkey"}}}}' + - key: x-req-guardrails-config-hash + value: "7996329749952833575" + ai.policy.transformation.kgateway.io: + '@type': type.googleapis.com/envoy.api.v2.filter.http.RouteTransformations.RouteTransformation + requestMatch: + requestTransformation: + logRequestResponseInfo: false + transformationTemplate: + mergeJsonKeys: + jsonKeys: + messages: + tmpl: + text: '[{{ join(messages, ", ") }},{"content":"Make sure the + tone is friendly and professional.","role":"system"}]' + ai.upstream.transformation.kgateway.io: + '@type': type.googleapis.com/envoy.api.v2.filter.http.RouteTransformations + transformations: + - requestMatch: + requestTransformation: + logRequestResponseInfo: false + transformationTemplate: + headers: + :path: + text: /v1/chat/completions + Authorization: + text: Bearer {% if host_metadata("auth_token") != "" %}{{host_metadata("auth_token")}}{% + else %}{{dynamic_metadata("auth_token","ai.kgateway.io")}}{% + endif %} + mergeJsonKeys: + jsonKeys: + model: + tmpl: + text: '{% if host_metadata("model") != "" %}"{{host_metadata("model")}}"{% + else %}"{{model}}"{% endif %}' diff --git a/internal/kgateway/setup/testdata/ai-openai-moderation-promptguard.yaml b/internal/kgateway/setup/testdata/ai-openai-moderation-promptguard.yaml new file mode 100644 index 00000000000..ac63a8464cf --- /dev/null +++ b/internal/kgateway/setup/testdata/ai-openai-moderation-promptguard.yaml @@ -0,0 +1,86 @@ +kind: Gateway +apiVersion: gateway.networking.k8s.io/v1 +metadata: + name: http-gw-for-test + namespace: gwtest +spec: + gatewayClassName: kgateway + listeners: + - protocol: HTTP + port: 8080 + name: http + allowedRoutes: + namespaces: + from: All +--- +apiVersion: v1 +kind: Secret +metadata: + name: openai-secret + namespace: gwtest 
+type: Opaque +data: + Authorization: bXlzZWNyZXRrZXk= +--- +apiVersion: gateway.networking.k8s.io/v1beta1 +kind: HTTPRoute +metadata: + name: route-to-upstream + namespace: gwtest +spec: + parentRefs: + - name: http-gw-for-test + hostnames: + - "test" + rules: + - matches: + - path: + type: Exact + value: /v1/chat/completions + backendRefs: + - name: openai + kind: Upstream + group: gateway.kgateway.dev + filters: + - type: ExtensionRef + extensionRef: + group: gateway.kgateway.dev + kind: RoutePolicy + name: route-test +--- +apiVersion: gateway.kgateway.dev/v1alpha1 +kind: RoutePolicy +metadata: + name: route-test + namespace: gwtest +spec: + ai: + promptEnrichment: + append: + - role: SYSTEM + content: "Make sure the tone is friendly and professional." + promptGuard: + request: + moderation: + openAIModeration: + authToken: + kind: "SecretRef" + secretRef: + name: openai-secret +--- +apiVersion: gateway.kgateway.dev/v1alpha1 +kind: Upstream +metadata: + labels: + app: kgateway + name: openai + namespace: gwtest +spec: + ai: + llm: + provider: + openai: + authToken: + kind: "SecretRef" + secretRef: + name: openai-secret \ No newline at end of file diff --git a/internal/kgateway/setup/testdata/ai-vertex-ai-streaming-out.yaml b/internal/kgateway/setup/testdata/ai-vertex-ai-streaming-out.yaml new file mode 100644 index 00000000000..06980bc12f9 --- /dev/null +++ b/internal/kgateway/setup/testdata/ai-vertex-ai-streaming-out.yaml @@ -0,0 +1,252 @@ +clusters: +- http2ProtocolOptions: {} + loadAssignment: + clusterName: ai_ext_proc_uds_cluster + endpoints: + - lbEndpoints: + - endpoint: + address: + pipe: + path: '@kgateway-ai-sock' + name: ai_ext_proc_uds_cluster + type: STATIC +- connectTimeout: 5s + edsClusterConfig: + edsConfig: + ads: {} + resourceApiVersion: V3 + metadata: {} + name: kube_default_kubernetes_443 + transportSocketMatches: + - match: + tlsMode: istio + name: tlsMode-istio + transportSocket: + name: envoy.transport_sockets.tls + typedConfig: + '@type': 
type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + commonTlsContext: + alpnProtocols: + - istio + tlsCertificateSdsSecretConfigs: + - name: istio_server_cert + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + tlsParams: {} + validationContextSdsSecretConfig: + name: istio_validation_context + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + sni: outbound_.443_._.kubernetes.default.svc.cluster.local + - match: {} + name: tlsMode-disabled + transportSocket: + name: envoy.transport_sockets.raw_buffer + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer + type: EDS +- connectTimeout: 5s + dnsLookupFamily: V4_ONLY + loadAssignment: + clusterName: upstream_gwtest_vertexai_0 + endpoints: + - lbEndpoints: + - endpoint: + address: + socketAddress: + address: us-central1-aiplatform.googleapis.com + portValue: 443 + hostname: us-central1-aiplatform.googleapis.com + metadata: + filterMetadata: + envoy.transport_socket_match: + tls: us-central1-aiplatform.googleapis.com + io.solo.transformation: + api_version: v1 + auth_token: mysecretkey + location: us-central1 + model: gemini-1.5-flash-001 + project: gloo-ee + publisher: google + metadata: {} + name: upstream_gwtest_vertexai_0 + transportSocketMatches: + - match: + tlsMode: istio + name: tlsMode-istio + transportSocket: + name: envoy.transport_sockets.tls + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + commonTlsContext: + alpnProtocols: + - istio + tlsCertificateSdsSecretConfigs: + - name: istio_server_cert + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + 
clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + tlsParams: {} + validationContextSdsSecretConfig: + name: istio_validation_context + sdsConfig: + apiConfigSource: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: gateway_proxy_sds + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + resourceApiVersion: V3 + - match: {} + name: tlsMode-disabled + transportSocket: + name: envoy.transport_sockets.raw_buffer + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer + type: STRICT_DNS +listeners: +- address: + socketAddress: + address: '::' + ipv4Compat: true + portValue: 8080 + filterChains: + - filters: + - name: envoy.filters.network.http_connection_manager + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + httpFilters: + - name: ai.extproc.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor + grpcService: + envoyGrpc: + clusterName: ai_ext_proc_uds_cluster + retryPolicy: + numRetries: 3 + timeout: 5s + messageTimeout: 5s + metadataOptions: + forwardingNamespaces: + typed: + - envoy.filters.ai.solo.io + untyped: + - io.solo.transformation + - envoy.filters.ai.solo.io + receivingNamespaces: + untyped: + - ai.kgateway.io + processingMode: + requestBodyMode: STREAMED + requestHeaderMode: SEND + requestTrailerMode: SKIP + responseBodyMode: STREAMED + responseHeaderMode: SEND + responseTrailerMode: SKIP + - name: envoy.filters.http.set_filter_state + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.set_filter_state.v3.Config + onRequestHeaders: + - factoryKey: envoy.route_type + formatString: + textFormatSource: + inlineString: '%DYNAMIC_METADATA(envoy.route_type)%' + objectKey: envoy.route_type + - name: envoy.filters.http.router + typedConfig: + '@type': 
type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + upstreamHttpFilters: + - name: io.kgateway.wait + typedConfig: + '@type': type.googleapis.com/envoy.config.filter.http.upstream_wait.v2.UpstreamWaitFilterConfig + - name: ai.policy.transformation.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.api.v2.filter.http.FilterTransformations + - name: ai.upstream.transformation.kgateway.io + typedConfig: + '@type': type.googleapis.com/envoy.api.v2.filter.http.FilterTransformations + - name: envoy.filters.http.upstream_codec + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.upstream_codec.v3.UpstreamCodec + mergeSlashes: true + normalizePath: true + rds: + configSource: + ads: {} + resourceApiVersion: V3 + routeConfigName: http + statPrefix: http + useRemoteAddress: true + name: http + name: http +routes: +- ignorePortInHostMatching: true + name: http + virtualHosts: + - domains: + - test + name: http~test + routes: + - match: + prefix: / + name: http~test-route-1-httproute-route-to-upstream-gwtest-1-0-matcher-0 + route: + autoHostRewrite: true + cluster: upstream_gwtest_vertexai_0 + typedPerFilterConfig: + ai.extproc.kgateway.io: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute + overrides: + grpcInitialMetadata: + - key: x-llm-provider + value: vertex-ai + - key: x-llm-model + value: gemini-1.5-flash-001 + - key: x-request-id + value: '%REQ(X-REQUEST-ID)%' + - key: x-chat-streaming + value: "true" + ai.policy.transformation.kgateway.io: + '@type': type.googleapis.com/envoy.api.v2.filter.http.RouteTransformations.RouteTransformation + requestMatch: + requestTransformation: + logRequestResponseInfo: false + transformationTemplate: + mergeJsonKeys: {} + ai.upstream.transformation.kgateway.io: + '@type': type.googleapis.com/envoy.api.v2.filter.http.RouteTransformations + transformations: + - requestMatch: + requestTransformation: + logRequestResponseInfo: false + 
transformationTemplate: + headers: + :path: + text: /{{host_metadata("api_version")}}/projects/{{host_metadata("project")}}/locations/{{host_metadata("location")}}/publishers/{{host_metadata("publisher")}}/models/{{host_metadata("model")}}:{% + if host_metadata("route_type") == "CHAT_STREAMING" %}streamGenerateContent?alt=sse{% + else %}generateContent{% endif %} + Authorization: + text: Bearer {% if host_metadata("auth_token") != "" %}{{host_metadata("auth_token")}}{% + else %}{{dynamic_metadata("auth_token","ai.kgateway.io")}}{% + endif %} diff --git a/internal/kgateway/setup/testdata/ai-vertex-ai-streaming.yaml b/internal/kgateway/setup/testdata/ai-vertex-ai-streaming.yaml new file mode 100644 index 00000000000..9c326cb4297 --- /dev/null +++ b/internal/kgateway/setup/testdata/ai-vertex-ai-streaming.yaml @@ -0,0 +1,71 @@ +kind: Gateway +apiVersion: gateway.networking.k8s.io/v1 +metadata: + name: http-gw-for-test + namespace: gwtest +spec: + gatewayClassName: kgateway + listeners: + - protocol: HTTP + port: 8080 + name: http + allowedRoutes: + namespaces: + from: All +--- +apiVersion: gateway.networking.k8s.io/v1beta1 +kind: HTTPRoute +metadata: + name: route-to-upstream + namespace: gwtest +spec: + parentRefs: + - name: http-gw-for-test + hostnames: + - "test" + rules: + - matches: + - path: + type: Exact + value: /vertexai + backendRefs: + - name: vertexai + kind: Upstream + group: gateway.kgateway.dev + filters: + - type: ExtensionRef + extensionRef: + group: gateway.kgateway.dev + kind: RoutePolicy + name: route-test +--- +apiVersion: gateway.kgateway.dev/v1alpha1 +kind: RoutePolicy +metadata: + name: route-test + namespace: gwtest +spec: + ai: + routeType: CHAT_STREAMING +--- +apiVersion: gateway.kgateway.dev/v1alpha1 +kind: Upstream +metadata: + labels: + app: kgateway + name: vertexai + namespace: gwtest +spec: + ai: + llm: + provider: + vertexai: + model: gemini-1.5-flash-001 + apiVersion: v1 + location: us-central1 + projectId: gloo-ee + publisher: GOOGLE 
+ authToken: + kind: "Inline" + inline: mysecretkey +--- \ No newline at end of file diff --git a/internal/kgateway/translator/irtranslator/fc.go b/internal/kgateway/translator/irtranslator/fc.go index ce4df6b29fd..cd12bba0abb 100644 --- a/internal/kgateway/translator/irtranslator/fc.go +++ b/internal/kgateway/translator/irtranslator/fc.go @@ -339,7 +339,7 @@ func (h *hcmNetworkFilterTranslator) computeUpstreamHTTPFilters(ctx context.Cont upstreamHttpFilters := plugins.StagedUpstreamHttpFilterList{} log := contextutils.LoggerFrom(ctx).Desugar() for _, plug := range h.PluginPass { - stagedFilters, err := plug.UpstreamHttpFilters(ctx) + stagedFilters, err := plug.UpstreamHttpFilters(ctx, l.FilterChainCommon) if err != nil { // what to do with errors here? ignore the listener?? h.reporter.SetCondition(reports.ListenerCondition{ diff --git a/internal/kgateway/translator/irtranslator/gateway.go b/internal/kgateway/translator/irtranslator/gateway.go index 3280e2c5a49..75d5fb215b9 100644 --- a/internal/kgateway/translator/irtranslator/gateway.go +++ b/internal/kgateway/translator/irtranslator/gateway.go @@ -36,6 +36,13 @@ func (t *Translator) Translate(gw ir.GatewayIR, reporter reports.Reporter) Trans res.Routes = append(res.Routes, routes...) } + for _, c := range pass { + if c != nil { + r := c.ResourcesToAdd(context.TODO()) + res.ExtraClusters = append(res.ExtraClusters, r.Clusters...) 
+ } + } + return res } diff --git a/internal/kgateway/translator/irtranslator/route.go b/internal/kgateway/translator/irtranslator/route.go index 81a9747929c..ef867ce85bd 100644 --- a/internal/kgateway/translator/irtranslator/route.go +++ b/internal/kgateway/translator/irtranslator/route.go @@ -11,10 +11,13 @@ import ( envoy_type_matcher_v3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" "github.com/solo-io/go-utils/contextutils" "go.uber.org/zap" - wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/wrapperspb" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" gwv1 "sigs.k8s.io/gateway-api/apis/v1" + "github.com/kgateway-dev/kgateway/v2/api/v1alpha1" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/reports" "github.com/kgateway-dev/kgateway/v2/internal/kgateway/translator/routeutils" @@ -119,10 +122,9 @@ func (h *httpRouteConfigurationTranslator) envoyRoutes(ctx context.Context, ) *envoy_config_route_v3.Route { out := h.initRoutes(in, generatedName) if len(in.Backends) > 0 { - out.Action = h.translateRouteAction(in, out) + out.Action = h.translateRouteAction(ctx, in, out) } - - // run plugins here that may set actoin + // run plugins here that may set action err := h.runRoutePlugins(ctx, routeReport, in, out) if err == nil { err = validateEnvoyRoute(out) @@ -240,6 +242,7 @@ func (h *httpRouteConfigurationTranslator) runBackendPolicies(ctx context.Contex continue } for _, pol := range pols { + // Policy on extension ref err := pass.ApplyForRouteBackend(ctx, pol.PolicyIr, pCtx) if err != nil { errs = append(errs, err) @@ -250,7 +253,22 @@ func (h *httpRouteConfigurationTranslator) runBackendPolicies(ctx context.Contex return errors.Join(errs...) 
} +func (h *httpRouteConfigurationTranslator) runBackend(ctx context.Context, in ir.HttpBackend, pCtx *ir.RouteBackendContext, outRoute *envoy_config_route_v3.Route) error { + var errs []error + if in.Backend.Upstream != nil { + if in.Backend.Upstream.GetGroupKind().Kind == v1alpha1.UpstreamGVK.Kind { + err := h.PluginPass[in.Backend.Upstream.GetGroupKind()].ApplyForBackend(ctx, pCtx, in, outRoute) + if err != nil { + errs = append(errs, err) + } + } + } + // TODO: check return value, if error returned, log error and report condition + return errors.Join(errs...) +} + func (h *httpRouteConfigurationTranslator) translateRouteAction( + ctx context.Context, in ir.HttpRouteRuleMatchIR, outRoute *envoy_config_route_v3.Route, ) *envoy_config_route_v3.Route_Route { @@ -265,24 +283,57 @@ func (h *httpRouteConfigurationTranslator) translateRouteAction( Name: clusterName, Weight: wrapperspb.UInt32(backend.Backend.Weight), } + + typedPerFilterConfig := map[string]proto.Message{} + pCtx := ir.RouteBackendContext{ - FilterChainName: h.fc.FilterChainName, - Upstream: backend.Backend.Upstream, - TypedFiledConfig: &cw.TypedPerFilterConfig, + FilterChainName: h.fc.FilterChainName, + Upstream: backend.Backend.Upstream, + TypedFilterConfig: &typedPerFilterConfig, } - h.runBackendPolicies( - context.TODO(), + // non attached policy translation + err := h.runBackend( + ctx, backend, &pCtx, + outRoute, ) + if err != nil { + // TODO: error on status + contextutils.LoggerFrom(ctx).Error(err) + } + + err = h.runBackendPolicies( + ctx, + backend, + &pCtx, + ) + if err != nil { + // TODO: error on status + contextutils.LoggerFrom(ctx).Error(err) + } + + typedPerFilterConfigAny := map[string]*anypb.Any{} + for k, v := range typedPerFilterConfig { + config, err := utils.MessageToAny(v) + if err != nil { + // TODO: error on status + contextutils.LoggerFrom(ctx).Error(err) + continue + } + typedPerFilterConfigAny[k] = config + } + cw.TypedPerFilterConfig = typedPerFilterConfigAny clusters = 
append(clusters, cw) } // TODO: i think envoy nacks if all weights are 0, we should error on that. - - action := &envoy_config_route_v3.RouteAction{ - ClusterNotFoundResponseCode: envoy_config_route_v3.RouteAction_INTERNAL_SERVER_ERROR, + action := outRoute.GetRoute() + if action == nil { + action = &envoy_config_route_v3.RouteAction{ + ClusterNotFoundResponseCode: envoy_config_route_v3.RouteAction_INTERNAL_SERVER_ERROR, + } } routeAction := &envoy_config_route_v3.Route_Route{ Route: action, diff --git a/internal/kgateway/wellknown/constants.go b/internal/kgateway/wellknown/constants.go index 79480cdae9c..d5e667d7ea5 100644 --- a/internal/kgateway/wellknown/constants.go +++ b/internal/kgateway/wellknown/constants.go @@ -26,3 +26,11 @@ const ( SdsClusterName = "gateway_proxy_sds" SdsTargetURI = "127.0.0.1:8234" ) + +const ( + // TODO: create a policy and upstream + AIUpstreamTransformationFilterName = "ai.upstream.transformation.kgateway.io" + AIPolicyTransformationFilterName = "ai.policy.transformation.kgateway.io" + AIExtProcFilterName = "ai.extproc.kgateway.io" + SetMetadataFilterName = "envoy.filters.http.set_filter_state" +)