Skip to content

Commit

Permalink
PR Review: cleaning up tests, fix tests for ent/ce, reuse value for
Browse files Browse the repository at this point in the history
default sleep time
  • Loading branch information
jm96441n committed May 28, 2024
1 parent 679f448 commit d4c801b
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 85 deletions.
2 changes: 1 addition & 1 deletion dependency/catalog_datacenters.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (

// CatalogDatacentersQuerySleepTime is the amount of time to sleep between
// queries, since the endpoint does not support blocking queries.
CatalogDatacentersQuerySleepTime = 15 * time.Second
CatalogDatacentersQuerySleepTime = DefaultNonBlockingQuerySleepTime
)

// CatalogDatacentersQuery is the dependency to query all datacenters
Expand Down
43 changes: 11 additions & 32 deletions dependency/consul_exported_services.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
package dependency

import (
"fmt"
"log"
"net/url"
"slices"
"time"

capi "github.com/hashicorp/consul/api"
"github.com/pkg/errors"
)

const (
exportedServicesEndpointLabel = "list.exportedServices"

// ListExportedServicesQuerySleepTime is the amount of time to sleep between
// queries, since the endpoint does not support blocking queries.
ListExportedServicesQuerySleepTime = 15 * time.Second
)
const exportedServicesEndpointLabel = "list.exportedServices"

// Ensure implements
var _ Dependency = (*ListExportedServicesQuery)(nil)
Expand Down Expand Up @@ -77,12 +70,18 @@ func fromConsulExportedService(svc capi.ExportedService) ExportedService {
// NewListExportedServicesQuery parses a string of the format @dc.
func NewListExportedServicesQuery(s string) (*ListExportedServicesQuery, error) {
return &ListExportedServicesQuery{
stopCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
partition: s,
}, nil
}

func (c *ListExportedServicesQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
select {
case <-c.stopCh:
return nil, nil, ErrStopped
default:
}

opts = opts.Merge(&QueryOptions{
ConsulPartition: c.partition,
})
Expand All @@ -92,28 +91,9 @@ func (c *ListExportedServicesQuery) Fetch(clients *ClientSet, opts *QueryOptions
RawQuery: opts.String(),
})

// This is certainly not elegant, but the partitions endpoint does not support
// blocking queries, so we are going to "fake it until we make it". When we
// first query, the LastIndex will be "0", meaning we should immediately
// return data, but future calls will include a LastIndex. If we have a
// LastIndex in the query metadata, sleep for 15 seconds before asking Consul
// again.
//
// This is probably okay given the frequency in which partitions actually
// change, but is technically not edge-triggering.
if opts.WaitIndex != 0 {
log.Printf("[TRACE] %s: long polling for %s", c, ListExportedServicesQuerySleepTime)

select {
case <-c.stopCh:
return nil, nil, ErrStopped
case <-time.After(ListExportedServicesQuerySleepTime):
}
}

consulExportedServices, qm, err := clients.Consul().ConfigEntries().List(capi.ExportedServices, opts.ToConsulOpts())
if err != nil {
return nil, nil, errors.Wrapf(err, c.String())
return nil, nil, fmt.Errorf("%s: %w", c.String(), err)
}

exportedServices := make([]ExportedService, 0, len(consulExportedServices))
Expand Down Expand Up @@ -143,8 +123,7 @@ func (c *ListExportedServicesQuery) Fetch(clients *ClientSet, opts *QueryOptions
return exportedServices, rm, nil
}

// CanShare returns if this dependency is shareable.
// TODO What is this?
// CanShare returns if this dependency is shareable when consul-template is running in de-duplication mode.
func (c *ListExportedServicesQuery) CanShare() bool {
return true
}
Expand Down
124 changes: 111 additions & 13 deletions dependency/consul_exported_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,19 @@ import (

func TestListExportedServicesQuery_Fetch(t *testing.T) {
testCases := map[string]struct {
partition string
exportedServices *capi.ExportedServicesConfigEntry
expected []ExportedService
partition string
skipIfNonEnterprise bool
exportedServices *capi.ExportedServicesConfigEntry
expected []ExportedService
}{
//"no services": {},
"no services": {
partition: defaultOrEmtpyString(),
exportedServices: nil,
expected: []ExportedService{},
},
"default partition - one exported service - partitions set": {
partition: "default",
partition: "default",
skipIfNonEnterprise: !tenancyHelper.IsConsulEnterprise(),
exportedServices: &capi.ExportedServicesConfigEntry{
Name: "default",
Partition: "default",
Expand All @@ -43,7 +49,8 @@ func TestListExportedServicesQuery_Fetch(t *testing.T) {
},
},
"default partition - multiple exported services - partitions set": {
partition: "default",
partition: "default",
skipIfNonEnterprise: !tenancyHelper.IsConsulEnterprise(),
exportedServices: &capi.ExportedServicesConfigEntry{
Name: "default",
Partition: "default",
Expand Down Expand Up @@ -86,7 +93,8 @@ func TestListExportedServicesQuery_Fetch(t *testing.T) {
},
},
"non default partition - multiple exported services - partitions set": {
partition: "foo",
partition: "foo",
skipIfNonEnterprise: !tenancyHelper.IsConsulEnterprise(),
exportedServices: &capi.ExportedServicesConfigEntry{
Name: "foo",
Partition: "foo",
Expand Down Expand Up @@ -128,13 +136,93 @@ func TestListExportedServicesQuery_Fetch(t *testing.T) {
},
},
},
"default partition - one exported service - peers set": {
partition: defaultOrEmtpyString(),
skipIfNonEnterprise: false,
exportedServices: &capi.ExportedServicesConfigEntry{
Name: "default",
Partition: defaultOrEmtpyString(),
Services: []capi.ExportedService{
{
Name: "service1",
Consumers: []capi.ServiceConsumer{
{
Peer: "another",
},
},
},
},
},
expected: []ExportedService{
{
Service: "service1",
Consumers: ResolvedConsumers{
Peers: []string{"another"},
Partitions: []string{},
SamenessGroups: []string{},
},
},
},
},
"default partition - multiple exported services - peers set": {
partition: defaultOrEmtpyString(),
skipIfNonEnterprise: false,
exportedServices: &capi.ExportedServicesConfigEntry{
Name: "default",
Partition: defaultOrEmtpyString(),
Services: []capi.ExportedService{
{
Name: "service1",
Consumers: []capi.ServiceConsumer{
{
Peer: "another",
},
},
},
{
Name: "service2",
Consumers: []capi.ServiceConsumer{
{
Peer: "another",
},
},
},
},
},
expected: []ExportedService{
{
Service: "service1",
Consumers: ResolvedConsumers{
Peers: []string{"another"},
Partitions: []string{},
SamenessGroups: []string{},
},
},
{
Service: "service2",
Consumers: ResolvedConsumers{
Peers: []string{"another"},
Partitions: []string{},
SamenessGroups: []string{},
},
},
},
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
_, _, err := testClients.Consul().ConfigEntries().Set(tc.exportedServices, &capi.WriteOptions{Partition: tc.partition})
if err != nil {
t.Fatalf("unexpected error: %v", err)
if tc.skipIfNonEnterprise {
t.Skipf("skipping test %q as Consul is not enterprise", name)
}

opts := &capi.WriteOptions{Partition: tc.partition}

if tc.exportedServices != nil {
_, _, err := testClients.Consul().ConfigEntries().Set(tc.exportedServices, opts)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

q, err := NewListExportedServicesQuery(tc.partition)
Expand All @@ -145,9 +233,19 @@ func TestListExportedServicesQuery_Fetch(t *testing.T) {

require.ElementsMatch(t, tc.expected, actual)

// need to clean up because we use a single shared consul instance
_, err = testClients.Consul().ConfigEntries().Delete(capi.ExportedServices, tc.exportedServices.Name, &capi.WriteOptions{Partition: tc.partition})
require.NoError(t, err)
if tc.exportedServices != nil {
// need to clean up because we use a single shared consul instance
_, err = testClients.Consul().ConfigEntries().Delete(capi.ExportedServices, tc.exportedServices.Name, opts)
require.NoError(t, err)
}
})
}
}

func defaultOrEmtpyString() string {
if tenancyHelper.IsConsulEnterprise() {
return "default"
}

return ""
}
16 changes: 9 additions & 7 deletions dependency/consul_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@ package dependency

import (
"context"
"fmt"
"log"
"net/url"
"slices"
"strings"
"time"

"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
)

// Ensure implements
var (
// Ensure implements
_ Dependency = (*ListPartitionsQuery)(nil)

// ListPartitionsQuerySleepTime is the amount of time to sleep between
// queries, since the endpoint does not support blocking queries.
ListPartitionsQuerySleepTime = 15 * time.Second
ListPartitionsQuerySleepTime = DefaultNonBlockingQuerySleepTime
)

// Partition is a partition in Consul.
Expand Down Expand Up @@ -66,10 +66,13 @@ func (c *ListPartitionsQuery) Fetch(clients *ClientSet, opts *QueryOptions) (int
}
}

// TODO Consider using a proper context
partitions, _, err := clients.Consul().Partitions().List(context.Background(), opts.ToConsulOpts())
if err != nil {
return nil, nil, errors.Wrapf(err, c.String())
if strings.Contains(err.Error(), "Invalid URL path") {
return nil, nil, fmt.Errorf("%s: Partitions are an enterprise feature: %w", c.String(), err)
}

return nil, nil, fmt.Errorf("%s: %w", c.String(), err)
}

log.Printf("[TRACE] %s: returned %d results", c, len(partitions))
Expand All @@ -93,8 +96,7 @@ func (c *ListPartitionsQuery) Fetch(clients *ClientSet, opts *QueryOptions) (int
return respWithMetadata(resp)
}

// CanShare returns if this dependency is shareable.
// TODO What is this?
// CanShare returns if this dependency is shareable when consul-template is running in de-duplication mode.
func (c *ListPartitionsQuery) CanShare() bool {
return true
}
Expand Down
46 changes: 15 additions & 31 deletions dependency/consul_partitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,16 @@ func init() {
ListPartitionsQuerySleepTime = 50 * time.Millisecond
}

func TestNewListPartitionsQuery(t *testing.T) {
cases := []struct {
name string
exp *ListPartitionsQuery
err bool
}{
{
"empty",
&ListPartitionsQuery{},
false,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
act, err := NewListPartitionsQuery()
if !tc.err {
require.NoError(t, err)
} else {
assert.Error(t, err)
}

if act != nil {
act.stopCh = nil
}

assert.Equal(t, tc.exp, act)
})
func TestListPartitionsQuery_Fetch(t *testing.T) {
if !tenancyHelper.IsConsulEnterprise() {
t.Skip("Enterprise only test")
}
}

func TestListPartitionsQuery_Fetch(t *testing.T) {
expected := []*Partition{
{
Name: "default",
Description: "Builtin Default Partition",
},

{
Name: "foo",
Description: "",
Expand All @@ -63,3 +35,15 @@ func TestListPartitionsQuery_Fetch(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, expected, act)
}

func TestListPartitionsQuery_FetchError(t *testing.T) {
if tenancyHelper.IsConsulEnterprise() {
t.Skip("CE only test")
}

d, err := NewListPartitionsQuery()
require.NoError(t, err)

_, _, err = d.Fetch(testClients, nil)
require.Error(t, err)
}
2 changes: 2 additions & 0 deletions dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
nvListPrefixRe = `/?(?P<prefix>[^@]*)`
nvListNSRe = `(@(?P<namespace>([[:word:]\-\_]+|\*)))?`
nvRegionRe = `(\.(?P<region>[[:word:]\-\_]+))?`

DefaultNonBlockingQuerySleepTime = 15 * time.Second
)

type Type int
Expand Down
2 changes: 1 addition & 1 deletion dependency/vault_agent_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var _ Dependency = (*VaultAgentTokenQuery)(nil)
const (
// VaultAgentTokenSleepTime is the amount of time to sleep between queries, since
// the fsnotify library is not compatible with solaris and other OSes yet.
VaultAgentTokenSleepTime = 15 * time.Second
VaultAgentTokenSleepTime = DefaultNonBlockingQuerySleepTime
)

// VaultAgentTokenQuery is the dependency to Vault Agent token
Expand Down

0 comments on commit d4c801b

Please sign in to comment.