-
Notifications
You must be signed in to change notification settings - Fork 16
Add fallback to no gRPC #454
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -172,11 +172,15 @@ func (c *Command) Run(args []string) int { | |
MirrorKubernetesNamespacePrefix: c.flagMirrorK8SNamespacePrefix, | ||
} | ||
|
||
consulHTTPAddressOrCommand, port, err := parseConsulHTTPAddress() | ||
consulScheme, consulHTTPAddressOrCommand, port, err := parseConsulHTTPAddress() | ||
if err != nil { | ||
logger.Error("error reading "+consulHTTPAddressEnvName, "error", err) | ||
return 1 | ||
} | ||
if consulCfg.Scheme != "https" { | ||
// override only if it needs to be explicitly marked | ||
consulCfg.Scheme = consulScheme | ||
} | ||
Comment on lines
+180
to
+183
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I understand this comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, this is to say, if we already have the scheme set to |
||
|
||
tlsCfg, err := api.SetupTLSConfig(&consulCfg.TLSConfig) | ||
if err != nil { | ||
|
@@ -201,8 +205,10 @@ func (c *Command) Run(args []string) int { | |
} | ||
|
||
consulClientConfig := consul.ClientConfig{ | ||
Name: "api-gateway-controller", | ||
ApiClientConfig: consulCfg, | ||
Addresses: consulHTTPAddressOrCommand, | ||
UseDynamic: os.Getenv("CONSUL_DYNAMIC_SERVER_DISCOVERY") == "true", | ||
HTTPPort: port, | ||
GRPCPort: grpcPort(), | ||
PlainText: consulCfg.Scheme == "http", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,10 @@ type Client interface { | |
} | ||
|
||
type ClientConfig struct { | ||
Name string | ||
Namespace string | ||
ApiClientConfig *api.Config | ||
UseDynamic bool | ||
PlainText bool | ||
Addresses string | ||
HTTPPort int | ||
|
@@ -81,6 +84,53 @@ func (c *client) Wait(until time.Duration) error { | |
} | ||
|
||
func (c *client) WatchServers(ctx context.Context) error { | ||
if !c.config.UseDynamic { | ||
cfg := c.config.ApiClientConfig | ||
cfg.Address = fmt.Sprintf("%s:%d", c.config.Addresses, c.config.HTTPPort) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not entirely specific to this line, but do we have unit test coverage for checking that both |
||
|
||
var err error | ||
var client *api.Client | ||
var token string | ||
if c.config.Credentials.Type == discovery.CredentialsTypeLogin { | ||
baseClient, err := api.NewClient(cfg) | ||
if err != nil { | ||
c.initialized <- err | ||
return err | ||
} | ||
if c.config.Namespace != "" { | ||
cfg.Namespace = c.config.Namespace | ||
} | ||
client, token, err = login(ctx, baseClient, cfg, c.config) | ||
if err != nil { | ||
c.initialized <- err | ||
return err | ||
} | ||
defer logout(baseClient, token, c.config) | ||
|
||
} else { | ||
// this might be empty | ||
cfg.Token = c.config.Credentials.Static.Token | ||
if c.config.Namespace != "" { | ||
cfg.Namespace = c.config.Namespace | ||
} | ||
client, err = api.NewClient(cfg) | ||
if err != nil { | ||
c.initialized <- err | ||
return err | ||
} | ||
} | ||
|
||
c.mutex.Lock() | ||
c.client = client | ||
c.token = cfg.Token | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Edit: reading the |
||
c.mutex.Unlock() | ||
|
||
close(c.initialized) | ||
|
||
<-ctx.Done() | ||
return nil | ||
} | ||
|
||
ctx, cancel := context.WithCancel(ctx) | ||
c.stop = cancel | ||
|
||
|
@@ -124,6 +174,9 @@ func (c *client) WatchServers(ctx context.Context) error { | |
} | ||
updateClient := func(s discovery.State) error { | ||
cfg := c.config.ApiClientConfig | ||
if c.config.Namespace != "" { | ||
cfg.Namespace = c.config.Namespace | ||
} | ||
cfg.Address = fmt.Sprintf("%s:%d", s.Address.IP.String(), c.config.HTTPPort) | ||
if static { | ||
// This is to fix the fact that s.Address always resolves to an IP, if | ||
|
@@ -220,3 +273,34 @@ func (c *client) Internal() *api.Client { | |
|
||
return c.client | ||
} | ||
|
||
func login(ctx context.Context, client *api.Client, cfg *api.Config, config ClientConfig) (*api.Client, string, error) { | ||
authenticator := NewAuthenticator( | ||
config.Logger.Named("authenticator"), | ||
client, | ||
config.Credentials.Login.AuthMethod, | ||
config.Credentials.Login.Namespace, | ||
) | ||
|
||
token, err := authenticator.Authenticate(ctx, config.Name, config.Credentials.Login.BearerToken) | ||
if err != nil { | ||
return nil, "", fmt.Errorf("error logging in to consul: %w", err) | ||
} | ||
|
||
// Now update the client so that it will read the ACL token we just fetched. | ||
cfg.Token = token | ||
newClient, err := api.NewClient(cfg) | ||
if err != nil { | ||
return nil, "", fmt.Errorf("error updating client connection with token: %w", err) | ||
} | ||
return newClient, token, nil | ||
} | ||
|
||
func logout(client *api.Client, token string, config ClientConfig) error { | ||
config.Logger.Info("deleting acl token") | ||
_, err := client.ACL().Logout(&api.WriteOptions{Token: token}) | ||
if err != nil { | ||
return fmt.Errorf("error deleting acl token: %w", err) | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to add another test where this is
false
?