Skip to content

Commit

Permalink
Fix data race in phpfpm initializing http client (influxdata#7738)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Jun 30, 2020
1 parent 8593428 commit 669456d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 31 deletions.
55 changes: 28 additions & 27 deletions plugins/inputs/phpfpm/phpfpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,39 @@ var sampleConfig = `
# insecure_skip_verify = false
`

func (r *phpfpm) SampleConfig() string {
func (p *phpfpm) SampleConfig() string {
return sampleConfig
}

func (r *phpfpm) Description() string {
func (p *phpfpm) Description() string {
return "Read metrics of phpfpm, via HTTP status page or socket"
}

func (p *phpfpm) Init() error {
tlsCfg, err := p.ClientConfig.TLSConfig()
if err != nil {
return err
}

p.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: p.Timeout.Duration,
}
return nil
}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (g *phpfpm) Gather(acc telegraf.Accumulator) error {
if len(g.Urls) == 0 {
return g.gatherServer("http://127.0.0.1/status", acc)
func (p *phpfpm) Gather(acc telegraf.Accumulator) error {
if len(p.Urls) == 0 {
return p.gatherServer("http://127.0.0.1/status", acc)
}

var wg sync.WaitGroup

urls, err := expandUrls(g.Urls)
urls, err := expandUrls(p.Urls)
if err != nil {
return err
}
Expand All @@ -105,7 +120,7 @@ func (g *phpfpm) Gather(acc telegraf.Accumulator) error {
wg.Add(1)
go func(serv string) {
defer wg.Done()
acc.AddError(g.gatherServer(serv, acc))
acc.AddError(p.gatherServer(serv, acc))
}(serv)
}

Expand All @@ -115,23 +130,9 @@ func (g *phpfpm) Gather(acc telegraf.Accumulator) error {
}

// Request status page to get stat raw data and import it
func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
if g.client == nil {
tlsCfg, err := g.ClientConfig.TLSConfig()
if err != nil {
return err
}
tr := &http.Transport{
TLSClientConfig: tlsCfg,
}
g.client = &http.Client{
Transport: tr,
Timeout: g.Timeout.Duration,
}
}

func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
return g.gatherHttp(addr, acc)
return p.gatherHttp(addr, acc)
}

var (
Expand Down Expand Up @@ -170,11 +171,11 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
return err
}

return g.gatherFcgi(fcgi, statusPath, acc, addr)
return p.gatherFcgi(fcgi, statusPath, acc, addr)
}

// Gather stat using fcgi protocol
func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumulator, addr string) error {
func (p *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumulator, addr string) error {
fpmOutput, fpmErr, err := fcgi.Request(map[string]string{
"SCRIPT_NAME": "/" + statusPath,
"SCRIPT_FILENAME": statusPath,
Expand All @@ -194,15 +195,15 @@ func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumula
}

// Gather stat using http protocol
func (g *phpfpm) gatherHttp(addr string, acc telegraf.Accumulator) error {
func (p *phpfpm) gatherHttp(addr string, acc telegraf.Accumulator) error {
u, err := url.Parse(addr)
if err != nil {
return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
}

req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s", u.Scheme,
u.Host, u.Path), nil)
res, err := g.client.Do(req)
res, err := p.client.Do(req)
if err != nil {
return fmt.Errorf("Unable to connect to phpfpm status page '%s': %v",
addr, err)
Expand Down
32 changes: 28 additions & 4 deletions plugins/inputs/phpfpm/phpfpm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) {
Urls: []string{ts.URL},
}

err := r.Init()
require.NoError(t, err)

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
err = acc.GatherError(r.Gather)
require.NoError(t, err)

tags := map[string]string{
Expand Down Expand Up @@ -76,6 +79,9 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) {
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
}

err = r.Init()
require.NoError(t, err)

var acc testutil.Accumulator
err = acc.GatherError(r.Gather)
require.NoError(t, err)
Expand Down Expand Up @@ -121,6 +127,9 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
Urls: []string{tcp.Addr().String()},
}

err = r.Init()
require.NoError(t, err)

var acc testutil.Accumulator

err = acc.GatherError(r.Gather)
Expand Down Expand Up @@ -177,6 +186,9 @@ func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) {
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
}

err = r.Init()
require.NoError(t, err)

var acc1, acc2 testutil.Accumulator

err = acc1.GatherError(r.Gather)
Expand Down Expand Up @@ -232,6 +244,9 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
Urls: []string{tcp.Addr().String() + ":custom-status-path"},
}

err = r.Init()
require.NoError(t, err)

var acc testutil.Accumulator

err = acc.GatherError(r.Gather)
Expand Down Expand Up @@ -264,9 +279,12 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) {
r := &phpfpm{}

err := r.Init()
require.NoError(t, err)

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
err = acc.GatherError(r.Gather)
require.Error(t, err)
assert.Contains(t, err.Error(), "127.0.0.1/status")
}
Expand All @@ -276,9 +294,12 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t
Urls: []string{"http://aninvalidone"},
}

err := r.Init()
require.NoError(t, err)

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
err = acc.GatherError(r.Gather)
require.Error(t, err)
assert.Contains(t, err.Error(), `Unable to connect to phpfpm status page 'http://aninvalidone'`)
assert.Contains(t, err.Error(), `lookup aninvalidone`)
Expand All @@ -289,9 +310,12 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testi
Urls: []string{"/tmp/invalid.sock"},
}

err := r.Init()
require.NoError(t, err)

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
err = acc.GatherError(r.Gather)
require.Error(t, err)
assert.Equal(t, `dial unix /tmp/invalid.sock: connect: no such file or directory`, err.Error())

Expand Down

0 comments on commit 669456d

Please sign in to comment.