Skip to content

Commit

Permalink
feat: support scoping plugins to consumer-groups
Browse files Browse the repository at this point in the history
  • Loading branch information
GGabriele committed Aug 4, 2023
1 parent 3e4196d commit af7eaa2
Show file tree
Hide file tree
Showing 30 changed files with 3,487 additions and 1,548 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ setup-kong-ee:

.PHONY: test-integration
test-integration:
go test -v -tags=integration \
go test -v -count=1 -tags=integration \
-race \
./tests/integration/...
4 changes: 4 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func syncMain(ctx context.Context, filenames []string, dry bool, parallelism,
return err
}

if utils.Kong340Version.LTE(parsedKongVersion) {
dumpConfig.IsConsumerGroupScopedPluginSupported = true
}

// read the current state
var currentState *state.KongState
if workspaceExists {
Expand Down
8 changes: 8 additions & 0 deletions cmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,16 @@ By default, this command will ask for confirmation.`,
if err != nil {
return fmt.Errorf("reading Kong version: %w", err)
}
parsedKongVersion, err := utils.ParseKongVersion(kongVersion)
if err != nil {
return fmt.Errorf("parsing Kong version: %w", err)
}
_ = sendAnalytics("reset", kongVersion, mode)

if utils.Kong340Version.LTE(parsedKongVersion) {
dumpConfig.IsConsumerGroupScopedPluginSupported = true
}

var workspaces []string
// Kong OSS or default workspace
if !resetAllWorkspaces && resetWorkspace == "" {
Expand Down
16 changes: 16 additions & 0 deletions dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Config struct {

// KonnectRuntimeGroup
KonnectRuntimeGroup string

// IsConsumerGroupScopedPluginSupported
IsConsumerGroupScopedPluginSupported bool
}

func deduplicate(stringSlice []string) []string {
Expand Down Expand Up @@ -196,6 +199,7 @@ func getProxyConfiguration(ctx context.Context, group *errgroup.Group,
plugins = excludeKonnectManagedPlugins(plugins)
if config.SkipConsumers {
plugins = excludeConsumersPlugins(plugins)
plugins = excludeConsumerGroupsPlugins(plugins)
}
state.Plugins = plugins
return nil
Expand Down Expand Up @@ -870,3 +874,15 @@ func excludeConsumersPlugins(plugins []*kong.Plugin) []*kong.Plugin {
}
return filtered
}

// excludeConsumerGroupsPlugins filter out consumer-groups plugins
func excludeConsumerGroupsPlugins(plugins []*kong.Plugin) []*kong.Plugin {
var filtered []*kong.Plugin
for _, p := range plugins {
if p.ConsumerGroup != nil && !utils.Empty(p.ConsumerGroup.ID) {
continue
}
filtered = append(filtered, p)
}
return filtered
}
134 changes: 116 additions & 18 deletions file/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/kong/go-kong/kong"
)

const ratelimitingAdvancedPluginName = "rate-limiting-advanced"

type stateBuilder struct {
targetContent *Content
rawState *utils.KongRawState
Expand All @@ -35,6 +37,8 @@ type stateBuilder struct {

checkRoutePaths bool

isConsumerGroupScopedPluginSupported bool

err error
}

Expand Down Expand Up @@ -69,6 +73,10 @@ func (b *stateBuilder) build() (*utils.KongRawState, *utils.KonnectRawState, err
b.checkRoutePaths = true
}

if utils.Kong340Version.LTE(b.kongVersion) {
b.isConsumerGroupScopedPluginSupported = true
}

// build
b.certificates()
if !b.skipCACerts {
Expand All @@ -92,6 +100,46 @@ func (b *stateBuilder) build() (*utils.KongRawState, *utils.KonnectRawState, err
return b.rawState, b.konnectRawState, nil
}

func (b *stateBuilder) ingestConsumerGroupScopedPlugins(cg FConsumerGroupObject) error {
var plugins []FPlugin
for _, plugin := range cg.Plugins {
plugin.ConsumerGroup = utils.GetConsumerGroupReference(cg.ConsumerGroup)
plugins = append(plugins, FPlugin{
Plugin: kong.Plugin{
ID: plugin.ID,
Name: plugin.Name,
Config: plugin.Config,
ConsumerGroup: &kong.ConsumerGroup{
ID: cg.ID,
},
},
})
}
return b.ingestPlugins(plugins)
}

func (b *stateBuilder) addConsumerGroupPlugins(
cg FConsumerGroupObject, cgo *kong.ConsumerGroupObject,
) error {
for _, plugin := range cg.Plugins {
if utils.Empty(plugin.ID) {
current, err := b.currentState.ConsumerGroupPlugins.Get(
*plugin.Name, *cg.ConsumerGroup.ID,
)
if errors.Is(err, state.ErrNotFound) {
plugin.ID = uuid()
} else if err != nil {
return err
} else {
plugin.ID = kong.String(*current.ID)
}
}
b.defaulter.MustSet(plugin)
cgo.Plugins = append(cgo.Plugins, plugin)
}
return nil
}

func (b *stateBuilder) consumerGroups() {
if b.err != nil {
return
Expand All @@ -116,22 +164,29 @@ func (b *stateBuilder) consumerGroups() {
ConsumerGroup: &cg.ConsumerGroup,
}

for _, plugin := range cg.Plugins {
if utils.Empty(plugin.ID) {
current, err := b.currentState.ConsumerGroupPlugins.Get(
*plugin.Name, *cg.ConsumerGroup.ID,
)
if errors.Is(err, state.ErrNotFound) {
plugin.ID = uuid()
} else if err != nil {
b.err = err
return
} else {
plugin.ID = kong.String(*current.ID)
}
err := b.intermediate.ConsumerGroups.Add(state.ConsumerGroup{ConsumerGroup: cg.ConsumerGroup})
if err != nil {
b.err = err
return
}

// Plugins and Consumer Groups can be handled in two ways:
// 1. directly in the ConsumerGroup object
// 2. by scoping the plugin to the ConsumerGroup (Kong >= 3.4.0)
//
// The first method is deprecated and will be removed in the future, but
// we still need to support it for now. The isConsumerGroupScopedPluginSupported
// flag is used to determine which method to use based on the Kong version.
if b.isConsumerGroupScopedPluginSupported {
if err := b.ingestConsumerGroupScopedPlugins(cg); err != nil {
b.err = err
return
}
} else {
if err := b.addConsumerGroupPlugins(cg, &cgo); err != nil {
b.err = err
return
}
b.defaulter.MustSet(plugin)
cgo.Plugins = append(cgo.Plugins, plugin)
}
b.rawState.ConsumerGroups = append(b.rawState.ConsumerGroups, &cgo)
}
Expand Down Expand Up @@ -882,6 +937,23 @@ func (b *stateBuilder) plugins() {
}
p.Route = utils.GetRouteReference(r.Route)
}
if p.ConsumerGroup != nil && !utils.Empty(p.ConsumerGroup.ID) {
cg, err := b.intermediate.ConsumerGroups.Get(*p.ConsumerGroup.ID)
if errors.Is(err, state.ErrNotFound) {
b.err = fmt.Errorf("consumer-group %v for plugin %v: %w",
p.ConsumerGroup.FriendlyName(), *p.Name, err)
return
} else if err != nil {
b.err = err
return
}
p.ConsumerGroup = utils.GetConsumerGroupReference(cg.ConsumerGroup)
}

if err := b.validatePlugin(p); err != nil {
b.err = err
return
}
plugins = append(plugins, p)
}
if err := b.ingestPlugins(plugins); err != nil {
Expand All @@ -890,6 +962,29 @@ func (b *stateBuilder) plugins() {
}
}

func (b *stateBuilder) validatePlugin(p FPlugin) error {
if (b.isConsumerGroupScopedPluginSupported && !b.isKonnect) && *p.Name == ratelimitingAdvancedPluginName {
// check if deprecated consumer-groups configuration is present in the config
var consumerGroupsFound bool
if groups, ok := p.Config["consumer_groups"]; ok {
// if groups is an array of length > 0, then consumer_groups is set
if groupsArray, ok := groups.([]interface{}); ok && len(groupsArray) > 0 {
consumerGroupsFound = true
}
}
var enforceConsumerGroupsFound bool
if enforceConsumerGroups, ok := p.Config["enforce_consumer_groups"]; ok {
if enforceConsumerGroupsBool, ok := enforceConsumerGroups.(bool); ok && enforceConsumerGroupsBool {
enforceConsumerGroupsFound = true
}
}
if consumerGroupsFound || enforceConsumerGroupsFound {
return utils.ErrorConsumerGroupUpgrade
}
}
return nil
}

// strip_path schema default value is 'true', but it cannot be set when
// protocols include 'grpc' and/or 'grpcs'. When users explicitly set
// strip_path to 'true' with grpc/s protocols, deck returns a schema violation error.
Expand Down Expand Up @@ -997,9 +1092,9 @@ func (b *stateBuilder) ingestPlugins(plugins []FPlugin) error {
for _, p := range plugins {
p := p
if utils.Empty(p.ID) {
cID, rID, sID := pluginRelations(&p.Plugin)
cID, rID, sID, cgID := pluginRelations(&p.Plugin)
plugin, err := b.currentState.Plugins.GetByProp(*p.Name,
sID, rID, cID)
sID, rID, cID, cgID)
if errors.Is(err, state.ErrNotFound) {
p.ID = uuid()
} else if err != nil {
Expand Down Expand Up @@ -1044,7 +1139,7 @@ func (b *stateBuilder) fillPluginConfig(plugin *FPlugin) error {
return nil
}

func pluginRelations(plugin *kong.Plugin) (cID, rID, sID string) {
func pluginRelations(plugin *kong.Plugin) (cID, rID, sID, cgID string) {
if plugin.Consumer != nil && !utils.Empty(plugin.Consumer.ID) {
cID = *plugin.Consumer.ID
}
Expand All @@ -1054,6 +1149,9 @@ func pluginRelations(plugin *kong.Plugin) (cID, rID, sID string) {
if plugin.Service != nil && !utils.Empty(plugin.Service.ID) {
sID = *plugin.Service.ID
}
if plugin.ConsumerGroup != nil && !utils.Empty(plugin.ConsumerGroup.ID) {
cgID = *plugin.ConsumerGroup.ID
}
return
}

Expand Down
42 changes: 30 additions & 12 deletions file/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ func existingPluginState() *state.KongState {
Route: &kong.Route{
ID: kong.String("700bc504-b2b1-4abd-bd38-cec92779659e"),
},
ConsumerGroup: &kong.ConsumerGroup{
ID: kong.String("69ed4618-a653-4b54-8bb6-dc33bd6fe048"),
},
},
})
return s
Expand Down Expand Up @@ -751,6 +754,9 @@ func Test_stateBuilder_ingestPlugins(t *testing.T) {
Route: &kong.Route{
ID: kong.String("700bc504-b2b1-4abd-bd38-cec92779659e"),
},
ConsumerGroup: &kong.ConsumerGroup{
ID: kong.String("69ed4618-a653-4b54-8bb6-dc33bd6fe048"),
},
},
},
},
Expand Down Expand Up @@ -780,6 +786,9 @@ func Test_stateBuilder_ingestPlugins(t *testing.T) {
Route: &kong.Route{
ID: kong.String("700bc504-b2b1-4abd-bd38-cec92779659e"),
},
ConsumerGroup: &kong.ConsumerGroup{
ID: kong.String("69ed4618-a653-4b54-8bb6-dc33bd6fe048"),
},
Config: kong.Configuration{},
},
},
Expand All @@ -805,21 +814,23 @@ func Test_pluginRelations(t *testing.T) {
plugin *kong.Plugin
}
tests := []struct {
name string
args args
wantCID string
wantRID string
wantSID string
name string
args args
wantCID string
wantRID string
wantSID string
wantCGID string
}{
{
args: args{
plugin: &kong.Plugin{
Name: kong.String("foo"),
},
},
wantCID: "",
wantRID: "",
wantSID: "",
wantCID: "",
wantRID: "",
wantSID: "",
wantCGID: "",
},
{
args: args{
Expand All @@ -834,16 +845,20 @@ func Test_pluginRelations(t *testing.T) {
Service: &kong.Service{
ID: kong.String("sID"),
},
ConsumerGroup: &kong.ConsumerGroup{
ID: kong.String("cgID"),
},
},
},
wantCID: "cID",
wantRID: "rID",
wantSID: "sID",
wantCID: "cID",
wantRID: "rID",
wantSID: "sID",
wantCGID: "cgID",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotCID, gotRID, gotSID := pluginRelations(tt.args.plugin)
gotCID, gotRID, gotSID, gotCGID := pluginRelations(tt.args.plugin)
if gotCID != tt.wantCID {
t.Errorf("pluginRelations() gotCID = %v, want %v", gotCID, tt.wantCID)
}
Expand All @@ -853,6 +868,9 @@ func Test_pluginRelations(t *testing.T) {
if gotSID != tt.wantSID {
t.Errorf("pluginRelations() gotSID = %v, want %v", gotSID, tt.wantSID)
}
if gotCGID != tt.wantCGID {
t.Errorf("pluginRelations() gotCGID = %v, want %v", gotCGID, tt.wantCGID)
}
})
}
}
Expand Down
1 change: 1 addition & 0 deletions file/codegen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func main() {
schema.Definitions["FPlugin"].Properties["consumer"] = stringType
schema.Definitions["FPlugin"].Properties["service"] = stringType
schema.Definitions["FPlugin"].Properties["route"] = stringType
schema.Definitions["FPlugin"].Properties["consumer_group"] = stringType

schema.Definitions["FService"].Properties["client_certificate"] = stringType

Expand Down
4 changes: 3 additions & 1 deletion file/kong_json_schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit af7eaa2

Please sign in to comment.