Skip to content

Commit

Permalink
feat: add support to consumer_groups
Browse files Browse the repository at this point in the history
  • Loading branch information
GGabriele committed Nov 18, 2022
1 parent d5e4563 commit 562c595
Show file tree
Hide file tree
Showing 28 changed files with 2,223 additions and 21 deletions.
1 change: 1 addition & 0 deletions diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (sc *Syncer) init() error {
types.Upstream, types.Target,

types.Consumer,
types.ConsumerGroup, types.ConsumerGroupConsumer, types.ConsumerGroupPlugin,
types.ACLGroup, types.BasicAuth, types.KeyAuth,
types.HMACAuth, types.JWTAuth, types.OAuth2Cred,
types.MTLSAuth,
Expand Down
3 changes: 3 additions & 0 deletions diff/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var dependencyOrder = [][]types.EntityType{
types.Vault,
},
{
types.ConsumerGroup,
types.RBACEndpointPermission,
types.SNI,
types.Service,
Expand All @@ -51,6 +52,8 @@ var dependencyOrder = [][]types.EntityType{
types.ServiceVersion,
types.Route,
types.Target,
types.ConsumerGroupConsumer,
types.ConsumerGroupPlugin,
},
{
types.Plugin,
Expand Down
48 changes: 48 additions & 0 deletions dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ func getProxyConfiguration(ctx context.Context, group *errgroup.Group,
state.Vaults = vaults
return nil
})

group.Go(func() error {
consumerGroups, err := GetAllConsumerGroups(ctx, client, config.SelectorTags)
if err != nil {
return fmt.Errorf("consumer_groups: %w", err)
}
state.ConsumerGroups = consumerGroups
return nil
})
}

func getEnterpriseRBACConfiguration(ctx context.Context, group *errgroup.Group,
Expand Down Expand Up @@ -501,6 +510,45 @@ func GetAllUpstreams(ctx context.Context,
return upstreams, nil
}

// GetAllConsumerGroups queries Kong for all the ConsumerGroups using client.
func GetAllConsumerGroups(ctx context.Context,
client *kong.Client, tags []string,
) ([]*kong.ConsumerGroupObject, error) {
var consumerGroupObjects []*kong.ConsumerGroupObject
opt := newOpt(tags)

for {
cgs, nextopt, err := client.ConsumerGroups.List(ctx, opt)
if err != nil {
return nil, err
}
if err := ctx.Err(); err != nil {
return nil, err
}

for _, cg := range cgs {
r, err := client.ConsumerGroups.Get(ctx, cg.Name)
if err != nil {
return nil, err
}
if err := ctx.Err(); err != nil {
return nil, err
}
group := &kong.ConsumerGroupObject{
ConsumerGroup: r.ConsumerGroup,
Consumers: r.Consumers,
Plugins: r.Plugins,
}
consumerGroupObjects = append(consumerGroupObjects, group)
}
if nextopt == nil {
break
}
opt = nextopt
}
return consumerGroupObjects, nil
}

// GetAllTargets queries Kong for all the Targets of upstreams using client.
// Targets are queries per upstream as there exists no endpoint in Kong
// to list all targets of all upstreams.
Expand Down
60 changes: 60 additions & 0 deletions file/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (b *stateBuilder) build() (*utils.KongRawState, *utils.KonnectRawState, err
b.routes()
b.upstreams()
b.consumers()
b.consumerGroups()
b.plugins()
b.enterprise()

Expand All @@ -91,6 +92,65 @@ func (b *stateBuilder) build() (*utils.KongRawState, *utils.KonnectRawState, err
return b.rawState, b.konnectRawState, nil
}

func (b *stateBuilder) consumerGroups() {
if b.err != nil {
return
}

for _, cg := range b.targetContent.ConsumerGroups {
cg := cg
if utils.Empty(cg.ID) {
current, err := b.currentState.ConsumerGroups.Get(*cg.Name)
if err == state.ErrNotFound {
cg.ID = uuid()
} else if err != nil {
b.err = err
return
} else {
cg.ID = kong.String(*current.ID)
}
}
utils.MustMergeTags(&cg.ConsumerGroup, b.selectTags)

cgo := kong.ConsumerGroupObject{
ConsumerGroup: &cg.ConsumerGroup,
}

for _, consumer := range cg.Consumers {
if utils.Empty(consumer.ID) {
current, err := b.currentState.Consumers.Get(*consumer.Username)
if err == state.ErrNotFound {
consumer.ID = uuid()
} else if err != nil {
b.err = err
return
} else {
consumer.ID = kong.String(*current.ID)
}
}
cgo.Consumers = append(cgo.Consumers, consumer)
}

for _, plugin := range cg.Plugins {
if utils.Empty(plugin.ID) {
current, err := b.currentState.ConsumerGroupPlugins.Get(
*plugin.Name, *cg.ConsumerGroup.ID,
)
if err == state.ErrNotFound {
plugin.ID = uuid()
} else if err != nil {
b.err = err
return
} else {
plugin.ID = kong.String(*current.ID)
}
}
cgo.Plugins = append(cgo.Plugins, plugin)
}
b.rawState.ConsumerGroups = append(b.rawState.ConsumerGroups, &cgo)
}
}

func (b *stateBuilder) certificates() {
if b.err != nil {
return
Expand Down
73 changes: 73 additions & 0 deletions file/kong_json_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
},
"type": "array"
},
"consumer_groups": {
"items": {
"$schema": "http://json-schema.org/draft-04/schema#",
"$ref": "#/definitions/FConsumerGroupObject"
},
"type": "array"
},
"consumers": {
"items": {
"$schema": "http://json-schema.org/draft-04/schema#",
Expand Down Expand Up @@ -285,6 +292,44 @@
"additionalProperties": false,
"type": "object"
},
"ConsumerGroup": {
"properties": {
"created_at": {
"type": "integer"
},
"id": {
"type": "string"
},
"name": {
"type": "string"
}
},
"additionalProperties": false,
"type": "object"
},
"ConsumerGroupPlugin": {
"properties": {
"config": {
"additionalProperties": true,
"type": "object"
},
"consumer_group": {
"$schema": "http://json-schema.org/draft-04/schema#",
"$ref": "#/definitions/ConsumerGroup"
},
"created_at": {
"type": "integer"
},
"id": {
"type": "string"
},
"name": {
"type": "string"
}
},
"additionalProperties": false,
"type": "object"
},
"FCACertificate": {
"required": [
"cert"
Expand Down Expand Up @@ -443,6 +488,34 @@
}
]
},
"FConsumerGroupObject": {
"properties": {
"consumers": {
"items": {
"$ref": "#/definitions/Consumer"
},
"type": "array"
},
"created_at": {
"type": "integer"
},
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"plugins": {
"items": {
"$schema": "http://json-schema.org/draft-04/schema#",
"$ref": "#/definitions/ConsumerGroupPlugin"
},
"type": "array"
}
},
"additionalProperties": false,
"type": "object"
},
"FDocument": {
"properties": {
"id": {
Expand Down
34 changes: 27 additions & 7 deletions file/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,25 @@ func (c FConsumer) sortKey() string {
return ""
}

// FConsumerGroupObject represents a Kong ConsumerGroup and its associated consumers and plugins.
// +k8s:deepcopy-gen=true
type FConsumerGroupObject struct {
kong.ConsumerGroup `yaml:",inline,omitempty"`
Consumers []*kong.Consumer `json:"consumers,omitempty" yaml:",omitempty"`
Plugins []*kong.ConsumerGroupPlugin `json:"plugins,omitempty" yaml:",omitempty"`
}

// sortKey is used for sorting.
func (u FConsumerGroupObject) sortKey() string {
if u.Name != nil {
return *u.Name
}
if u.ID != nil {
return *u.ID
}
return ""
}

// FRBACRole represents an RBACRole in Kong
// +k8s:deepcopy-gen=true
type FRBACRole struct {
Expand Down Expand Up @@ -664,13 +683,14 @@ type Content struct {
Workspace string `json:"_workspace,omitempty" yaml:"_workspace,omitempty"`
Konnect *Konnect `json:"_konnect,omitempty" yaml:"_konnect,omitempty"`

Services []FService `json:"services,omitempty" yaml:",omitempty"`
Routes []FRoute `json:"routes,omitempty" yaml:",omitempty"`
Consumers []FConsumer `json:"consumers,omitempty" yaml:",omitempty"`
Plugins []FPlugin `json:"plugins,omitempty" yaml:",omitempty"`
Upstreams []FUpstream `json:"upstreams,omitempty" yaml:",omitempty"`
Certificates []FCertificate `json:"certificates,omitempty" yaml:",omitempty"`
CACertificates []FCACertificate `json:"ca_certificates,omitempty" yaml:"ca_certificates,omitempty"`
Services []FService `json:"services,omitempty" yaml:",omitempty"`
Routes []FRoute `json:"routes,omitempty" yaml:",omitempty"`
Consumers []FConsumer `json:"consumers,omitempty" yaml:",omitempty"`
ConsumerGroups []FConsumerGroupObject `json:"consumer_groups,omitempty" yaml:",omitempty"`
Plugins []FPlugin `json:"plugins,omitempty" yaml:",omitempty"`
Upstreams []FUpstream `json:"upstreams,omitempty" yaml:",omitempty"`
Certificates []FCertificate `json:"certificates,omitempty" yaml:",omitempty"`
CACertificates []FCACertificate `json:"ca_certificates,omitempty" yaml:"ca_certificates,omitempty"`

RBACRoles []FRBACRole `json:"rbac_roles,omitempty" yaml:"rbac_roles,omitempty"`

Expand Down
51 changes: 51 additions & 0 deletions file/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func KongStateToFile(kongState *state.KongState, config WriteConfig) error {
return err
}

err = populateConsumerGroups(kongState, file, config)
if err != nil {
return err
}

return WriteContentToFile(file, config.Filename, config.FileFormat)
}

Expand Down Expand Up @@ -668,6 +673,52 @@ func populateConsumers(kongState *state.KongState, file *Content,
return nil
}

func populateConsumerGroups(kongState *state.KongState, file *Content,
config WriteConfig,
) error {
consumerGroups, err := kongState.ConsumerGroups.GetAll()
if err != nil {
return err
}
consumers, err := kongState.ConsumerGroupConsumers.GetAll()
if err != nil {
return err
}
plugins, err := kongState.ConsumerGroupPlugins.GetAll()
if err != nil {
return err
}
for _, cg := range consumerGroups {
group := FConsumerGroupObject{ConsumerGroup: cg.ConsumerGroup}
for _, consumer := range consumers {
if consumer.ConsumerGroup.ID != nil && cg.ID != nil {
if *consumer.ConsumerGroup.ID == *cg.ID {
utils.ZeroOutID(consumer.Consumer, consumer.Consumer.Username, config.WithID)
utils.ZeroOutTimestamps(consumer.Consumer)
group.Consumers = append(group.Consumers, consumer.Consumer)
}
}
}
for _, plugin := range plugins {
if plugin.ID != nil && cg.ID != nil {
if *plugin.ConsumerGroup.ID == *cg.ID {
utils.ZeroOutID(plugin, plugin.Name, config.WithID)
utils.ZeroOutID(plugin.ConsumerGroup, plugin.ConsumerGroup.Name, config.WithID)
utils.ZeroOutTimestamps(plugin.ConsumerGroupPlugin.ConsumerGroup)
group.Plugins = append(group.Plugins, &plugin.ConsumerGroupPlugin)
}
}
}
utils.ZeroOutID(&group, group.Name, config.WithID)
utils.ZeroOutTimestamps(&group)
file.ConsumerGroups = append(file.ConsumerGroups, group)
}
sort.SliceStable(file.ConsumerGroups, func(i, j int) bool {
return compareOrder(file.ConsumerGroups[i], file.ConsumerGroups[j])
})
return nil
}

func WriteContentToFile(content *Content, filename string, format Format) error {
var c []byte
var err error
Expand Down
Loading

0 comments on commit 562c595

Please sign in to comment.