Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor service registration #8976

Merged
merged 6 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,9 +966,6 @@ func (c *ServerCommand) Run(args []string) int {
return 1
}

// Instantiate the wait group
c.WaitGroup = &sync.WaitGroup{}

// Initialize the Service Discovery, if there is one
var configSR sr.ServiceRegistration
if config.ServiceRegistration != nil {
Expand All @@ -990,15 +987,11 @@ func (c *ServerCommand) Run(args []string) int {
IsActive: false,
IsPerformanceStandby: false,
}
configSR, err = sdFactory(config.ServiceRegistration.Config, namedSDLogger, state, config.Storage.RedirectAddr)
configSR, err = sdFactory(config.ServiceRegistration.Config, namedSDLogger, state)
if err != nil {
c.UI.Error(fmt.Sprintf("Error initializing service_registration of type %s: %s", config.ServiceRegistration.Type, err))
return 1
}
if err := configSR.Run(c.ShutdownCh, c.WaitGroup); err != nil {
c.UI.Error(fmt.Sprintf("Error running service_registration of type %s: %s", config.ServiceRegistration.Type, err))
return 1
}
}

infoKeys := make([]string, 0, 10)
Expand Down Expand Up @@ -1311,7 +1304,7 @@ CLUSTER_SYNTHESIS_COMPLETE:

// If ServiceRegistration is configured, then the backend must support HA
isBackendHA := coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled()
if !c.flagDev && (coreConfig.ServiceRegistration != nil) && !isBackendHA {
if !c.flagDev && (coreConfig.GetServiceRegistration() != nil) && !isBackendHA {
c.UI.Output("service_registration is configured, but storage does not support HA")
return 1
}
Expand Down Expand Up @@ -1578,6 +1571,18 @@ CLUSTER_SYNTHESIS_COMPLETE:
}

// Perform initialization of HTTP server after the verifyOnly check.

// Instantiate the wait group
c.WaitGroup = &sync.WaitGroup{}

// If service discovery is available, run service discovery
if sd := coreConfig.GetServiceRegistration(); sd != nil {
if err := configSR.Run(c.ShutdownCh, c.WaitGroup, coreConfig.RedirectAddr); err != nil {
c.UI.Error(fmt.Sprintf("Error running service_registration of type %s: %s", config.ServiceRegistration.Type, err))
return 1
}
}

// If we're in Dev mode, then initialize the core
if c.flagDev && !c.flagDevSkipInit {
init, err := c.enableDev(core, coreConfig)
Expand Down
13 changes: 5 additions & 8 deletions serviceregistration/consul/consul_service_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type serviceRegistration struct {
serviceAddress *string
disableRegistration bool
checkTimeout time.Duration
redirectAddr string

notifyActiveCh chan struct{}
notifySealedCh chan struct{}
Expand All @@ -78,8 +77,7 @@ type serviceRegistration struct {
}

// NewServiceRegistration constructs a Consul-based ServiceRegistration.
func NewServiceRegistration(conf map[string]string, logger log.Logger, state sr.State, redirectAddr string) (sr.ServiceRegistration, error) {

func NewServiceRegistration(conf map[string]string, logger log.Logger, state sr.State) (sr.ServiceRegistration, error) {
// Allow admins to disable consul integration
disableReg, ok := conf["disable_registration"]
var disableRegistration bool
Expand Down Expand Up @@ -208,7 +206,6 @@ func NewServiceRegistration(conf map[string]string, logger log.Logger, state sr.
serviceAddress: serviceAddr,
checkTimeout: checkTimeout,
disableRegistration: disableRegistration,
redirectAddr: redirectAddr,

notifyActiveCh: make(chan struct{}),
notifySealedCh: make(chan struct{}),
Expand All @@ -221,9 +218,9 @@ func NewServiceRegistration(conf map[string]string, logger log.Logger, state sr.
return c, nil
}

func (c *serviceRegistration) Run(shutdownCh <-chan struct{}, wait *sync.WaitGroup) error {
func (c *serviceRegistration) Run(shutdownCh <-chan struct{}, wait *sync.WaitGroup, redirectAddr string) error {
go func() {
if err := c.runServiceRegistration(wait, shutdownCh, c.redirectAddr); err != nil {
if err := c.runServiceRegistration(wait, shutdownCh, redirectAddr); err != nil {
if c.logger.IsError() {
c.logger.Error(fmt.Sprintf("error running service registration: %s", err))
}
Expand Down Expand Up @@ -290,12 +287,12 @@ func (c *serviceRegistration) runServiceRegistration(waitGroup *sync.WaitGroup,
// 'server' command will wait for the below goroutine to complete
waitGroup.Add(1)

go c.runEventDemuxer(waitGroup, shutdownCh, redirectAddr)
go c.runEventDemuxer(waitGroup, shutdownCh)

return nil
}

func (c *serviceRegistration) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh <-chan struct{}, redirectAddr string) {
func (c *serviceRegistration) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh <-chan struct{}) {
// This defer statement should be executed last. So push it first.
defer waitGroup.Done()

Expand Down
30 changes: 16 additions & 14 deletions serviceregistration/consul/consul_service_registration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func testConsulServiceRegistrationConfig(t *testing.T, conf *consulConf) *servic
defer func() {
close(shutdownCh)
}()
be, err := NewServiceRegistration(*conf, logger, sr.State{}, "")
be, err := NewServiceRegistration(*conf, logger, sr.State{})
if err != nil {
t.Fatalf("Expected Consul to initialize: %v", err)
}
if err := be.Run(shutdownCh, &sync.WaitGroup{}); err != nil {
if err := be.Run(shutdownCh, &sync.WaitGroup{}, ""); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -69,8 +69,10 @@ func TestConsul_ServiceRegistration(t *testing.T) {
waitForServices := func(t *testing.T, expected map[string][]string) map[string][]string {
t.Helper()
// Wait for up to 10 seconds
var services map[string][]string
var err error
for i := 0; i < 10; i++ {
services, _, err := client.Catalog().Services(nil)
services, _, err = client.Catalog().Services(nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -79,7 +81,7 @@ func TestConsul_ServiceRegistration(t *testing.T) {
}
time.Sleep(time.Second)
}
t.Fatalf("Catalog Services never reached expected state %v", expected)
t.Fatalf("Catalog Services never reached: got: %v, expected state: %v", services, expected)
return nil
}

Expand All @@ -94,11 +96,11 @@ func TestConsul_ServiceRegistration(t *testing.T) {
sd, err := NewServiceRegistration(map[string]string{
"address": addr,
"token": token,
}, logger, sr.State{}, redirectAddr)
}, logger, sr.State{})
if err != nil {
t.Fatal(err)
}
if err := sd.Run(shutdownCh, &sync.WaitGroup{}); err != nil {
if err := sd.Run(shutdownCh, &sync.WaitGroup{}, redirectAddr); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -167,11 +169,11 @@ func TestConsul_ServiceTags(t *testing.T) {
close(shutdownCh)
}()

be, err := NewServiceRegistration(consulConfig, logger, sr.State{}, "")
be, err := NewServiceRegistration(consulConfig, logger, sr.State{})
if err != nil {
t.Fatal(err)
}
if err := be.Run(shutdownCh, &sync.WaitGroup{}); err != nil {
if err := be.Run(shutdownCh, &sync.WaitGroup{}, ""); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -226,11 +228,11 @@ func TestConsul_ServiceAddress(t *testing.T) {
shutdownCh := make(chan struct{})
logger := logging.NewVaultLogger(log.Debug)

be, err := NewServiceRegistration(test.consulConfig, logger, sr.State{}, "")
be, err := NewServiceRegistration(test.consulConfig, logger, sr.State{})
if err != nil {
t.Fatalf("expected Consul to initialize: %v", err)
}
if err := be.Run(shutdownCh, &sync.WaitGroup{}); err != nil {
if err := be.Run(shutdownCh, &sync.WaitGroup{}, ""); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -355,7 +357,7 @@ func TestConsul_newConsulServiceRegistration(t *testing.T) {
shutdownCh := make(chan struct{})
logger := logging.NewVaultLogger(log.Debug)

be, err := NewServiceRegistration(test.consulConfig, logger, sr.State{}, "")
be, err := NewServiceRegistration(test.consulConfig, logger, sr.State{})
if test.fail {
if err == nil {
t.Fatalf(`Expected config "%s" to fail`, test.name)
Expand All @@ -365,7 +367,7 @@ func TestConsul_newConsulServiceRegistration(t *testing.T) {
} else if !test.fail && err != nil {
t.Fatalf("Expected config %s to not fail: %v", test.name, err)
}
if err := be.Run(shutdownCh, &sync.WaitGroup{}); err != nil {
if err := be.Run(shutdownCh, &sync.WaitGroup{}, ""); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -559,7 +561,7 @@ func TestConsul_serviceID(t *testing.T) {
shutdownCh := make(chan struct{})
be, err := NewServiceRegistration(consulConf{
"service": test.serviceName,
}, logger, sr.State{}, "")
}, logger, sr.State{})
if !test.valid {
if err == nil {
t.Fatalf("expected an error initializing for name %q", test.serviceName)
Expand All @@ -569,7 +571,7 @@ func TestConsul_serviceID(t *testing.T) {
if test.valid && err != nil {
t.Fatalf("expected Consul to initialize: %v", err)
}
if err := be.Run(shutdownCh, &sync.WaitGroup{}); err != nil {
if err := be.Run(shutdownCh, &sync.WaitGroup{}, ""); err != nil {
t.Fatal(err)
}

Expand Down
37 changes: 22 additions & 15 deletions serviceregistration/kubernetes/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,33 @@ var (
ErrNotInCluster = errors.New("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined")
)

// Client is a minimal Kubernetes client. We rolled our own because the existing
// Kubernetes client-go library available externally has a high number of dependencies
// and we thought it wasn't worth it for only two API calls. If at some point they break
// the client into smaller modules, or if we add quite a few methods to this client, it may
// be worthwhile to revisit that decision.
type Client struct {
	logger hclog.Logger // destination for client logging (see error logging in retry paths)
	config *Config      // in-cluster connection settings; BearerToken is attached to each request
	stopCh chan struct{} // closed by Shutdown to cancel in-flight retries in do
}

// New instantiates a Client. The stopCh is used for exiting retry loops
// when closed.
func New(logger hclog.Logger, stopCh <-chan struct{}) (*Client, error) {
func New(logger hclog.Logger) (*Client, error) {
config, err := inClusterConfig()
if err != nil {
return nil, err
}
return &Client{
logger: logger,
config: config,
stopCh: stopCh,
stopCh: make(chan struct{}),
tyrannosaurus-becks marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}

// Client is a minimal Kubernetes client. We rolled our own because the existing
// Kubernetes client-go library available externally has a high number of dependencies
// and we thought it wasn't worth it for only two API calls. If at some point they break
// the client into smaller modules, or if we add quite a few methods to this client, it may
// be worthwhile to revisit that decision.
type Client struct {
logger hclog.Logger
config *Config
stopCh <-chan struct{}
// Shutdown stops any in-flight request retries by closing the client's
// internal stop channel (observed by the cancellation goroutine in do).
// It must be called at most once: a second call would close an
// already-closed channel and panic.
func (c *Client) Shutdown() {
	close(c.stopCh)
}

// GetPod gets a pod from the Kubernetes API.
Expand Down Expand Up @@ -132,10 +136,13 @@ func (c *Client) do(req *http.Request, ptrToReturnObj interface{}) error {
// a stop from our stopChan. This allows us to exit from our retry
// loop during a shutdown, rather than hanging.
ctx, cancelFunc := context.WithCancel(context.Background())
go func(stopCh <-chan struct{}) {
<-stopCh
cancelFunc()
}(c.stopCh)
go func() {
select {
case <-ctx.Done():
case <-c.stopCh:
cancelFunc()
}
}()
retryableReq.WithContext(ctx)

retryableReq.Header.Set("Authorization", "Bearer "+c.config.BearerToken)
Expand Down
2 changes: 1 addition & 1 deletion serviceregistration/kubernetes/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestClient(t *testing.T) {
t.Fatal(err)
}

client, err := New(hclog.Default(), make(chan struct{}))
client, err := New(hclog.Default())
if err != nil {
t.Fatal(err)
}
Expand Down
84 changes: 56 additions & 28 deletions serviceregistration/kubernetes/client/cmd/kubeclient/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (
"encoding/json"
"flag"
"fmt"
"os"
"os/signal"
"strings"
"syscall"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/serviceregistration/kubernetes/client"
Expand All @@ -42,39 +45,64 @@ func init() {
func main() {
flag.Parse()

c, err := client.New(hclog.Default(), make(chan struct{}))
c, err := client.New(hclog.Default())
if err != nil {
panic(err)
}

switch callToMake {
case "get-pod":
pod, err := c.GetPod(namespace, podName)
if err != nil {
panic(err)
}
b, _ := json.Marshal(pod)
fmt.Printf("pod: %s\n", b)
return
case "patch-pod":
patchPairs := strings.Split(patchesToAdd, ",")
var patches []*client.Patch
for _, patchPair := range patchPairs {
fields := strings.Split(patchPair, ":")
if len(fields) != 2 {
panic(fmt.Errorf("unable to split %s from selectors provided of %s", fields, patchesToAdd))
reqCh := make(chan struct{})
shutdownCh := makeShutdownCh()

go func() {
defer close(reqCh)

switch callToMake {
case "get-pod":
pod, err := c.GetPod(namespace, podName)
if err != nil {
panic(err)
}
patches = append(patches, &client.Patch{
Operation: client.Replace,
Path: fields[0],
Value: fields[1],
})
}
if err := c.PatchPod(namespace, podName, patches...); err != nil {
panic(err)
b, _ := json.Marshal(pod)
fmt.Printf("pod: %s\n", b)
return
case "patch-pod":
patchPairs := strings.Split(patchesToAdd, ",")
var patches []*client.Patch
for _, patchPair := range patchPairs {
fields := strings.Split(patchPair, ":")
if len(fields) != 2 {
panic(fmt.Errorf("unable to split %s from selectors provided of %s", fields, patchesToAdd))
}
patches = append(patches, &client.Patch{
Operation: client.Replace,
Path: fields[0],
Value: fields[1],
})
}
if err := c.PatchPod(namespace, podName, patches...); err != nil {
panic(err)
}
return
default:
panic(fmt.Errorf(`unsupported call provided: %q`, callToMake))
}
return
default:
panic(fmt.Errorf(`unsupported call provided: %q`, callToMake))
}()

select {
case <-shutdownCh:
fmt.Println("Interrupt received, exiting...")
case <-reqCh:
}
}

func makeShutdownCh() chan struct{} {
resultCh := make(chan struct{})

shutdownCh := make(chan os.Signal, 4)
signal.Notify(shutdownCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-shutdownCh
close(resultCh)
}()
return resultCh
}
Loading