Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NET-9467, NET-9468] Add partitions + exportedServices funcs #1940

Merged
merged 11 commits into from
May 28, 2024
Merged
2 changes: 1 addition & 1 deletion dependency/catalog_datacenters.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (

// CatalogDatacentersQuerySleepTime is the amount of time to sleep between
// queries, since the endpoint does not support blocking queries.
CatalogDatacentersQuerySleepTime = 15 * time.Second
CatalogDatacentersQuerySleepTime = DefaultNonBlockingQuerySleepTime
)

// CatalogDatacentersQuery is the dependency to query all datacenters
Expand Down
2 changes: 1 addition & 1 deletion dependency/catalog_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
// Ensure implements
_ Dependency = (*CatalogServicesQuery)(nil)

// CatalogServicesQueryRe is the regular expression to use for CatalogNodesQuery.
// CatalogServicesQueryRe is the regular expression to use for CatalogServicesQuery.
CatalogServicesQueryRe = regexp.MustCompile(`\A` + queryRe + dcRe + `\z`)
)

Expand Down
141 changes: 141 additions & 0 deletions dependency/consul_exported_services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package dependency

import (
"fmt"
"log"
"net/url"
"slices"

capi "github.com/hashicorp/consul/api"
)

const exportedServicesEndpointLabel = "list.exportedServices"

// Ensure implements
var _ Dependency = (*ListExportedServicesQuery)(nil)

// ListExportedServicesQuery is the representation of a requested exported services
// dependency from inside a template.
type ListExportedServicesQuery struct {
stopCh chan struct{}
partition string
}

type ExportedService struct {
// Name of the service
Service string

// Partition of the service
Partition string

// Namespace of the service
Namespace string

// Consumers is a list of downstream consumers of the service.
Consumers ResolvedConsumers
}

type ResolvedConsumers struct {
Peers []string
Partitions []string
SamenessGroups []string
}

func fromConsulExportedService(svc capi.ExportedService) ExportedService {
peers := make([]string, 0, len(svc.Consumers))
partitions := make([]string, 0, len(svc.Consumers))
samenessGroups := make([]string, 0, len(svc.Consumers))
for _, consumer := range svc.Consumers {
if consumer.Peer != "" {
peers = append(peers, consumer.Peer)
}
if consumer.Partition != "" {
partitions = append(partitions, consumer.Partition)
}
if consumer.SamenessGroup != "" {
samenessGroups = append(samenessGroups, consumer.SamenessGroup)
}
}

return ExportedService{
Service: svc.Name,
Consumers: ResolvedConsumers{
Peers: peers,
Partitions: partitions,
SamenessGroups: samenessGroups,
},
}
}

// NewListExportedServicesQuery parses a string of the format @dc.
func NewListExportedServicesQuery(s string) (*ListExportedServicesQuery, error) {
return &ListExportedServicesQuery{
stopCh: make(chan struct{}),
partition: s,
}, nil
}

func (c *ListExportedServicesQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
select {
case <-c.stopCh:
return nil, nil, ErrStopped
default:
}

opts = opts.Merge(&QueryOptions{
ConsulPartition: c.partition,
})

log.Printf("[TRACE] %s: GET %s", c, &url.URL{
Path: "/v1/config/exported-services",
RawQuery: opts.String(),
})

consulExportedServices, qm, err := clients.Consul().ConfigEntries().List(capi.ExportedServices, opts.ToConsulOpts())
if err != nil {
return nil, nil, fmt.Errorf("%s: %w", c.String(), err)
}

exportedServices := make([]ExportedService, 0, len(consulExportedServices))
for _, cfgEntry := range consulExportedServices {
svc := cfgEntry.(*capi.ExportedServicesConfigEntry)
for _, svc := range svc.Services {
exportedServices = append(exportedServices, fromConsulExportedService(svc))
}
}

log.Printf("[TRACE] %s: returned %d results", c, len(exportedServices))

slices.SortStableFunc(exportedServices, func(i, j ExportedService) int {
if i.Service < j.Service {
return -1
} else if i.Service > j.Service {
return 1
}
return 0
})

rm := &ResponseMetadata{
LastContact: qm.LastContact,
LastIndex: qm.LastIndex,
}

return exportedServices, rm, nil
}

// CanShare returns if this dependency is shareable when consul-template is running in de-duplication mode.
func (c *ListExportedServicesQuery) CanShare() bool {
return true
}

func (c *ListExportedServicesQuery) String() string {
return exportedServicesEndpointLabel
}

func (c *ListExportedServicesQuery) Stop() {
close(c.stopCh)
}

func (c *ListExportedServicesQuery) Type() Type {
return TypeConsul
}
Loading