Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
GGabriele committed Aug 2, 2023
1 parent b2b24d3 commit f7222bc
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cmd/common_konnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func resetKonnectV2(ctx context.Context) error {
if err != nil {
return err
}
dumpConfig.IsConsumerGroupScopedPluginSupported = true
if dumpConfig.KonnectRuntimeGroup == "" {
dumpConfig.KonnectRuntimeGroup = defaultRuntimeGroupName
dumpConfig.IsConsumerGroupScopedPluginSupported = true
}
currentState, err := fetchCurrentState(ctx, client, dumpConfig)
if err != nil {
Expand Down
124 changes: 75 additions & 49 deletions file/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,46 @@ func (b *stateBuilder) build() (*utils.KongRawState, *utils.KonnectRawState, err
return b.rawState, b.konnectRawState, nil
}

func (b *stateBuilder) ingestConsumerGroupScopedPlugins(cg FConsumerGroupObject) error {
var plugins []FPlugin
for _, plugin := range cg.Plugins {
plugin.ConsumerGroup = utils.GetConsumerGroupReference(cg.ConsumerGroup)
plugins = append(plugins, FPlugin{
Plugin: kong.Plugin{
ID: plugin.ID,
Name: plugin.Name,
Config: plugin.Config,
ConsumerGroup: &kong.ConsumerGroup{
ID: cg.ID,
},
},
})
}
return b.ingestPlugins(plugins)
}

func (b *stateBuilder) addConsumerGroupPlugins(
cg FConsumerGroupObject, cgo *kong.ConsumerGroupObject,
) error {
for _, plugin := range cg.Plugins {
if utils.Empty(plugin.ID) {
current, err := b.currentState.ConsumerGroupPlugins.Get(
*plugin.Name, *cg.ConsumerGroup.ID,
)
if errors.Is(err, state.ErrNotFound) {
plugin.ID = uuid()
} else if err != nil {
return err
} else {
plugin.ID = kong.String(*current.ID)
}
}
b.defaulter.MustSet(plugin)
cgo.Plugins = append(cgo.Plugins, plugin)
}
return nil
}

func (b *stateBuilder) consumerGroups() {
if b.err != nil {
return
Expand Down Expand Up @@ -130,43 +170,22 @@ func (b *stateBuilder) consumerGroups() {
return
}

// Plugins and Consumer Groups can be handled in two ways:
// 1. directly in the ConsumerGroup object
// 2. by scoping the plugin to the ConsumerGroup (Kong >= 3.4.0)
//
// The first method is deprecated and will be removed in the future, but
// we still need to support it for now. The isConsumerGroupScopedPluginSupported
// flag is used to determine which method to use based on the Kong version.
if b.isConsumerGroupScopedPluginSupported {
var plugins []FPlugin
for _, plugin := range cg.Plugins {
plugin.ConsumerGroup = utils.GetConsumerGroupReference(cg.ConsumerGroup)
plugins = append(plugins, FPlugin{
Plugin: kong.Plugin{
ID: plugin.ID,
Name: plugin.Name,
Config: plugin.Config,
ConsumerGroup: &kong.ConsumerGroup{
ID: cg.ID,
},
},
})
}

if err := b.ingestPlugins(plugins); err != nil {
if err := b.ingestConsumerGroupScopedPlugins(cg); err != nil {
b.err = err
return
}
} else {
for _, plugin := range cg.Plugins {
if utils.Empty(plugin.ID) {
current, err := b.currentState.ConsumerGroupPlugins.Get(
*plugin.Name, *cg.ConsumerGroup.ID,
)
if errors.Is(err, state.ErrNotFound) {
plugin.ID = uuid()
} else if err != nil {
b.err = err
return
} else {
plugin.ID = kong.String(*current.ID)
}
}
b.defaulter.MustSet(plugin)
cgo.Plugins = append(cgo.Plugins, plugin)
if err := b.addConsumerGroupPlugins(cg, &cgo); err != nil {
b.err = err
return
}
}
b.rawState.ConsumerGroups = append(b.rawState.ConsumerGroups, &cgo)
Expand Down Expand Up @@ -931,23 +950,9 @@ func (b *stateBuilder) plugins() {
p.ConsumerGroup = utils.GetConsumerGroupReference(cg.ConsumerGroup)
}

if (b.isConsumerGroupScopedPluginSupported && !b.isKonnect) && *p.Name == ratelimitingAdvancedPluginName {
// check if deprecated consumer-groups configuration is present in the config
var consumerGroupsFound bool
if groups, ok := p.Config["consumer_groups"]; ok {
// if groups is an array of length > 0, then consumer_groups is set
if groupsArray, ok := groups.([]interface{}); ok && len(groupsArray) > 0 {
consumerGroupsFound = true
}
}
_, enforceConsumerGroupsFound := p.Config["enforce_consumer_groups"]
if consumerGroupsFound || enforceConsumerGroupsFound {
b.err = errors.New("a rate-limiting-advanced plugin with config.consumer_groups\n" +
"and/or config.enforce_consumer_groups was found. Please use Consumer Groups scoped\n" +
"Plugins when running against Kong Enterprise 3.4.0 and above.\n\n" +
"Check DOC_LINK for more information")
return
}
if err := b.validatePlugin(p); err != nil {
b.err = err
return
}
plugins = append(plugins, p)
}
Expand All @@ -957,6 +962,27 @@ func (b *stateBuilder) plugins() {
}
}

func (b *stateBuilder) validatePlugin(p FPlugin) error {
if (b.isConsumerGroupScopedPluginSupported && !b.isKonnect) && *p.Name == ratelimitingAdvancedPluginName {
// check if deprecated consumer-groups configuration is present in the config
var consumerGroupsFound bool
if groups, ok := p.Config["consumer_groups"]; ok {
// if groups is an array of length > 0, then consumer_groups is set
if groupsArray, ok := groups.([]interface{}); ok && len(groupsArray) > 0 {
consumerGroupsFound = true
}
}
_, enforceConsumerGroupsFound := p.Config["enforce_consumer_groups"]
if consumerGroupsFound || enforceConsumerGroupsFound {
return errors.New("a rate-limiting-advanced plugin with config.consumer_groups\n" +
"and/or config.enforce_consumer_groups was found. Please use Consumer Groups scoped\n" +
"Plugins when running against Kong Enterprise 3.4.0 and above.\n\n" +
"Check DOC_LINK for more information")
}
}
return nil
}

// strip_path schema default value is 'true', but it cannot be set when
// protocols include 'grpc' and/or 'grpcs'. When users explicitly set
// strip_path to 'true' with grpc/s protocols, deck returns a schema violation error.
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ func Test_Dump_SkipConsumers_35x(t *testing.T) {
}

func Test_Dump_SkipConsumers_Konnect(t *testing.T) {
// TODO: remove skip once Konnect support is enabled.
t.Skip()
t.Skip("remove skip once Konnect support is enabled.")

tests := []struct {
name string
Expand Down
6 changes: 2 additions & 4 deletions tests/integration/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3716,8 +3716,7 @@ func Test_Sync_SkipConsumers_34x(t *testing.T) {
// test scope:
// - konnect
func Test_Sync_SkipConsumers_Konnect(t *testing.T) {
// TODO: remove skip once Konnect support is enabled.
t.Skip()
t.Skip("remove skip once Konnect support is enabled.")
runWhenKonnect(t)
// setup stage
client, err := getTestClient()
Expand Down Expand Up @@ -4544,8 +4543,7 @@ func Test_Sync_ConsumerGroupsScopedPlugins_Post340(t *testing.T) {
}

func Test_Sync_ConsumerGroupsScopedPluginsKonnect(t *testing.T) {
// TODO: remove skip once Konnect support is enabled.
t.Skip()
t.Skip("remove skip once Konnect support is enabled.")

client, err := getTestClient()
if err != nil {
Expand Down

0 comments on commit f7222bc

Please sign in to comment.