Skip to content

Commit

Permalink
purge services on disable of sync catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
jm96441n committed Oct 1, 2024
1 parent e959d33 commit 2cc3fe3
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 63 deletions.
4 changes: 3 additions & 1 deletion charts/consul/templates/sync-catalog-deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{{- if (or (and (ne (.Values.syncCatalog.enabled | toString) "-") .Values.syncCatalog.enabled) (and (eq (.Values.syncCatalog.enabled | toString) "-") .Values.global.enabled)) }}
{{ - $syncCatalogEnabled := (or (and (ne (.Values.syncCatalog.enabled | toString) "-") .Values.syncCatalog.enabled) (and (eq (.Values.syncCatalog.enabled | toString) "-") .Values.global.enabled)) }}
{{- if (or $syncCatalogEnabled .Values.syncCatalog.purgeServicesOnDisable) }}
{{- template "consul.reservedNamesFailer" (list .Values.syncCatalog.consulNamespaces.consulDestinationNamespace "syncCatalog.consulNamespaces.consulDestinationNamespace") }}
{{ template "consul.validateRequiredCloudSecretsExist" . }}
{{ template "consul.validateCloudSecretKeys" . }}
Expand Down Expand Up @@ -213,6 +214,7 @@ spec:
-metrics-port={{ .Values.syncCatalog.metrics.port }} \
{{- end }}
-prometheus-retention-time={{ .Values.global.metrics.agentMetricsRetentionTime }} \
-purgeServicesOnDisable={{ and (not $syncCatalogEnabled) .Values.syncCatalog.purgeServicesOnDisable }} \
livenessProbe:
httpGet:
path: /health/ready
Expand Down
4 changes: 4 additions & 0 deletions charts/consul/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,10 @@ syncCatalog:
# global.enabled.
enabled: false

# True if you want to deregister all services in this cluster from consul that have been registered by the catalog sync when the `enabled` flag is set to false.
# @type: boolean
purgeServicesOnDisable: false

# The name of the Docker image (including any tag) for consul-k8s-control-plane
# to run the sync program.
# @type: string
Expand Down
71 changes: 9 additions & 62 deletions control-plane/subcommand/sync-catalog/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,17 @@ import (
"os"
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
"time"

"github.com/armon/go-metrics/prometheus"
"github.com/cenkalti/backoff"
mapset "github.com/deckarep/golang-set"
"github.com/hashicorp/consul-server-connection-manager/discovery"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/cli"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
Expand All @@ -37,7 +34,6 @@ import (
"github.com/hashicorp/consul-k8s/control-plane/helper/controller"
"github.com/hashicorp/consul-k8s/control-plane/subcommand"
"github.com/hashicorp/consul-k8s/control-plane/subcommand/common"
metricsutil "github.com/hashicorp/consul-k8s/control-plane/subcommand/common"
"github.com/hashicorp/consul-k8s/control-plane/subcommand/flags"
)

Expand Down Expand Up @@ -67,7 +63,7 @@ type Command struct {
flagAddK8SNamespaceSuffix bool
flagLogLevel string
flagLogJSON bool
flagPurgeK8SServicesFromNode string
flagPurgeK8SServicesFromNode bool
flagFilter string

// Flags to support namespaces
Expand Down Expand Up @@ -156,8 +152,8 @@ func (c *Command) init() {
"\"debug\", \"info\", \"warn\", and \"error\".")
c.flags.BoolVar(&c.flagLogJSON, "log-json", false,
"Enable or disable JSON output format for logging.")
c.flags.StringVar(&c.flagPurgeK8SServicesFromNode, "purge-k8s-services-from-node", "",
"Specifies the name of the Consul node for which to deregister synced Kubernetes services.")
c.flags.BoolVar(&c.flagPurgeK8SServicesFromNode, "purge-k8s-services-from-node", false,
"Specifies if services should be purged from the Consul node. If set, all K8S services will be removed from the Consul node.")
c.flags.StringVar(&c.flagFilter, "filter", "",
"Specifies the expression used to filter the services on the Consul node that will be deregistered. "+
"The syntax for this filter is the same as the syntax used in the List Services for Node API in the Consul catalog.")
Expand Down Expand Up @@ -462,64 +458,15 @@ func (c *Command) Run(args []string) int {

// remove all k8s services from Consul.
func (c *Command) removeAllK8SServicesFromConsulNode(consulClient *api.Client) error {
node, _, err := consulClient.Catalog().NodeServiceList(c.flagPurgeK8SServicesFromNode, &api.QueryOptions{Filter: c.flagFilter})
_, err := consulClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: c.flagConsulNodeName,
Partition: c.consul.Partition,
}, nil)
if err != nil {
c.UI.Error(fmt.Sprintf("unable to deregister all K8S services from Consul: %s", err))
return err
}

var firstErr error
services := node.Services
batchSize := 300
maxRetries := 2
retryDelay := 200 * time.Millisecond

// Ask for user confirmation before purging services
for {
c.UI.Info(fmt.Sprintf("Are you sure you want to delete %v K8S services from %v? (y/n): ", len(services), c.flagPurgeK8SServicesFromNode))
var input string
fmt.Scanln(&input)
if input = strings.ToLower(input); input == "y" {
break
} else if input == "n" {
return nil
} else {
c.UI.Info("Invalid input. Please enter 'y' or 'n'.")
}
}

for i := 0; i < len(services); i += batchSize {
end := i + batchSize
if end > len(services) {
end = len(services)
}

var eg errgroup.Group
for _, service := range services[i:end] {
s := service
eg.Go(func() error {
var b backoff.BackOff = backoff.NewConstantBackOff(retryDelay)
b = backoff.WithMaxRetries(b, uint64(maxRetries))
return backoff.Retry(func() error {
_, err := consulClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: c.flagPurgeK8SServicesFromNode,
ServiceID: s.ID,
}, nil)
return err
}, b)
})
}
if err := eg.Wait(); err != nil {
if firstErr == nil {
c.UI.Info("Some K8S services were not deregistered from Consul")
firstErr = err
}
}
c.UI.Info(fmt.Sprintf("Processed %v K8S services from %v", end-i, c.flagPurgeK8SServicesFromNode))
}

if firstErr != nil {
return firstErr
}
c.UI.Info("All K8S services were deregistered from Consul")
return nil
}
Expand Down Expand Up @@ -570,7 +517,7 @@ func (c *Command) validateFlags() error {
}

if c.flagMetricsPort != "" {
if _, valid := metricsutil.ParseScrapePort(c.flagMetricsPort); !valid {
if _, valid := common.ParseScrapePort(c.flagMetricsPort); !valid {
return errors.New("-metrics-port must be a valid unprivileged port number")
}
}
Expand Down

0 comments on commit 2cc3fe3

Please sign in to comment.