Skip to content

Commit

Permalink
feat: use same transport for heartbeat (#8)
Browse files Browse the repository at this point in the history
* If insecure_skip_verify: use the same transport for Heartbeat as for a regular queries.
  • Loading branch information
orian authored Jan 17, 2025
1 parent 2133208 commit 72f1b29
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
21 changes: 19 additions & 2 deletions internal/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type HeartBeat interface {
type heartBeatOpts struct {
defaultUser string
defaultPassword string
client *http.Client
}

type Option interface {
Expand All @@ -43,20 +44,35 @@ func WithDefaultUser(user, password string) Option {
}
}

type customHttpClient struct {
client *http.Client
}

func (c customHttpClient) apply(opts *heartBeatOpts) {
opts.client = c.client
}

func WithHttpClient(client *http.Client) Option {
return customHttpClient{client}
}

type heartBeat struct {
interval time.Duration
timeout time.Duration
request string
response string
user string
password string
client *http.Client
}

// User credentials are not needed
const defaultEndpoint string = "/ping"

func NewHeartbeat(c config.HeartBeat, options ...Option) HeartBeat {
opts := &heartBeatOpts{}
opts := &heartBeatOpts{
client: http.DefaultClient,
}
for _, o := range options {
o.apply(opts)
}
Expand All @@ -66,6 +82,7 @@ func NewHeartbeat(c config.HeartBeat, options ...Option) HeartBeat {
timeout: time.Duration(c.Timeout),
request: c.Request,
response: c.Response,
client: opts.client,
}

if c.Request != defaultEndpoint {
Expand Down Expand Up @@ -98,7 +115,7 @@ func (hb *heartBeat) IsHealthy(ctx context.Context, addr string) error {
req = req.WithContext(ctx)

startTime := time.Now()
resp, err := http.DefaultClient.Do(req)
resp, err := hb.client.Do(req)
if err != nil {
return fmt.Errorf("cannot send request in %s: %w", time.Since(startTime), err)
}
Expand Down
15 changes: 13 additions & 2 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type reverseProxy struct {
maxIdleConns int
maxIdleConnsPerHost int
maxErrorReasonSize int64
transport *http.Transport
}

func newReverseProxy(cfgCp *config.HTTPClientConfig) *reverseProxy {
Expand Down Expand Up @@ -87,6 +88,7 @@ func newReverseProxy(cfgCp *config.HTTPClientConfig) *reverseProxy {
reloadWG: sync.WaitGroup{},
maxIdleConns: cfgCp.ConnectionPool.MaxIdleConnsPerHost,
maxIdleConnsPerHost: cfgCp.ConnectionPool.MaxIdleConnsPerHost,
transport: transport,
}
}

Expand Down Expand Up @@ -627,15 +629,24 @@ func calcQueryParamsHash(origParams url.Values) uint32 {
// applyConfig applies the given cfg to reverseProxy.
//
// New config is applied only if non-nil error returned.
// Otherwise old config version is kept.
// Otherwise, old config version is kept.
//
//nolint:cyclop // TODO consider complexity
func (rp *reverseProxy) applyConfig(cfg *config.Config) error {
// configLock protects from concurrent calls to applyConfig
// by serializing such calls.
// configLock shouldn't be used in other places.
rp.configLock.Lock()
defer rp.configLock.Unlock()

clusters, err := newClusters(cfg.Clusters)
client := http.DefaultClient
if cfg.HTTPClient.InsecureSkipVerify {
client = &http.Client{}
*client = *http.DefaultClient
client.Transport = rp.transport
}

clusters, err := newClusters(cfg.Clusters, client)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ type cluster struct {
retryNumber int
}

func newCluster(c config.Cluster) (*cluster, error) {
func newCluster(c config.Cluster, client *http.Client) (*cluster, error) {
clusterUsers := make(map[string]*clusterUser, len(c.ClusterUsers))
for _, cu := range c.ClusterUsers {
if _, ok := clusterUsers[cu.Name]; ok {
Expand All @@ -791,7 +791,8 @@ func newCluster(c config.Cluster) (*cluster, error) {
clusterUsers[cu.Name] = newClusterUser(cu)
}

heartBeat := heartbeat.NewHeartbeat(c.HeartBeat, heartbeat.WithDefaultUser(c.ClusterUsers[0].Name, c.ClusterUsers[0].Password))
heartBeat := heartbeat.NewHeartbeat(c.HeartBeat, heartbeat.WithDefaultUser(c.ClusterUsers[0].Name,
c.ClusterUsers[0].Password), heartbeat.WithHttpClient(client))

newC := &cluster{
name: c.Name,
Expand All @@ -811,13 +812,13 @@ func newCluster(c config.Cluster) (*cluster, error) {
return newC, nil
}

func newClusters(cfg []config.Cluster) (map[string]*cluster, error) {
func newClusters(cfg []config.Cluster, client *http.Client) (map[string]*cluster, error) {
clusters := make(map[string]*cluster, len(cfg))
for _, c := range cfg {
if _, ok := clusters[c.Name]; ok {
return nil, fmt.Errorf("duplicate config for cluster %q", c.Name)
}
tmpC, err := newCluster(c)
tmpC, err := newCluster(c, client)
if err != nil {
return nil, fmt.Errorf("cannot initialize cluster %q: %w", c.Name, err)
}
Expand Down

0 comments on commit 72f1b29

Please sign in to comment.