Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[elastic-agent] proxy requests to subprocesses to their metrics endpoints #28165

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Remove the `--kibana-url` from `install` and `enroll` command. {pull}25529[25529]
- Default to port 80 and 443 for Kibana and Fleet Server connections. {pull}25723[25723]
- Remove deprecated/undocumented IncludeCreatorMetadata setting from kubernetes metadata config options {pull}28006[28006]
- The `/processes/<subprocess>` endpoint proxies to the subprocess's monitoring endpoint, instead of querying its `/stats` endpoint {pull}28165[28165]

==== Bugfixes
- Fix rename *ConfigChange to *PolicyChange to align on changes in the UI. {pull}20779[20779]
Expand Down
49 changes: 34 additions & 15 deletions x-pack/elastic-agent/pkg/core/monitoring/server/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
var (
// ErrProgramNotSupported returned when requesting metrics for not supported program.
ErrProgramNotSupported = errors.New("specified program is not supported")
errPathNotFound = errors.New("endpoint not found")
invalidChars = map[rune]struct{}{
'"': {},
'<': {},
Expand Down Expand Up @@ -67,8 +68,13 @@ func processHandler(statsHandler func(http.ResponseWriter, *http.Request) error)
// proxy stats for elastic agent process
return statsHandler(w, r)
}
beatsPath := vars["beatsPath"]
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved

metricsBytes, statusCode, metricsErr := processMetrics(r.Context(), id)
endpoint, err := generateEndpoint(id)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled this out so I could inject my own endpoint into processMetrics, to make testing easier.

if err != nil {
return err
}
metricsBytes, statusCode, metricsErr := processMetrics(r.Context(), endpoint, beatsPath)
if metricsErr != nil {
return metricsErr
}
Expand All @@ -82,23 +88,18 @@ func processHandler(statsHandler func(http.ResponseWriter, *http.Request) error)
}
}

func processMetrics(ctx context.Context, id string) ([]byte, int, error) {
detail, err := parseID(id)
if err != nil {
return nil, 0, err
}

endpoint := beats.MonitoringEndpoint(detail.spec, artifact.DefaultConfig().OS(), detail.output)
if !strings.HasPrefix(endpoint, httpPlusPrefix) && !strings.HasPrefix(endpoint, "http") {
// add prefix for npipe and unix
endpoint = httpPlusPrefix + endpoint
}
var beatsPathAllowlist = map[string]struct{}{
"": struct{}{},
"stats": struct{}{},
"state": struct{}{},
}

if detail.isMonitoring {
endpoint += "_monitor"
func processMetrics(ctx context.Context, endpoint, path string) ([]byte, int, error) {
if _, ok := beatsPathAllowlist[path]; !ok {
return nil, http.StatusNotFound, errPathNotFound
}

hostData, err := parse.ParseURL(endpoint, "http", "", "", "stats", "")
hostData, err := parse.ParseURL(endpoint, "http", "", "", path, "")
if err != nil {
return nil, 0, errorWithStatus(http.StatusInternalServerError, err)
}
Expand Down Expand Up @@ -145,6 +146,24 @@ func processMetrics(ctx context.Context, id string) ([]byte, int, error) {
return rb, resp.StatusCode, nil
}

func generateEndpoint(id string) (string, error) {
detail, err := parseID(id)
if err != nil {
return "", err
}

endpoint := beats.MonitoringEndpoint(detail.spec, artifact.DefaultConfig().OS(), detail.output)
if !strings.HasPrefix(endpoint, httpPlusPrefix) && !strings.HasPrefix(endpoint, "http") {
// add prefix for npipe and unix
endpoint = httpPlusPrefix + endpoint
}

if detail.isMonitoring {
endpoint += "_monitor"
}
return endpoint, nil
}

func writeResponse(w http.ResponseWriter, c interface{}) {
bytes, err := json.Marshal(c)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build linux
// +build linux

package server

import (
"context"
"net"
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestProcessProxyRequest(t *testing.T) {
sock := "/tmp/elastic-agent-test.sock"
defer os.Remove(sock)

endpoint := "http+unix://" + sock
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Write the path to the client so they can verify the request
// was correct
w.Write([]byte(r.URL.Path))
}))

// Mimic subprocesses and listen on a unix socket
l, err := net.Listen("unix", sock)
require.NoError(t, err)
server.Listener = l
server.Start()
defer server.Close()

for _, path := range []string{"stats", "", "state"} {
respBytes, _, err := processMetrics(context.Background(), endpoint, path)
require.NoError(t, err)
// Verify that the server saw the path we tried to request
assert.Equal(t, "/"+path, string(respBytes))
}
}
31 changes: 31 additions & 0 deletions x-pack/elastic-agent/pkg/core/monitoring/server/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
package server

import (
"context"
"errors"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
"gotest.tools/assert"
)

func TestParseID(t *testing.T) {
Expand Down Expand Up @@ -87,3 +90,31 @@ func TestStatusErr(t *testing.T) {
})
}
}

func TestBeatsPathAllowlist(t *testing.T) {
ctx := context.Background()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}))
defer server.Close()
endpoint := server.URL

tcs := []struct {
path string
statusCode int
}{
{"", http.StatusOK},
{"stats", http.StatusOK},
{"state", http.StatusOK},
{"not-allowed", http.StatusNotFound},
{"nested/path", http.StatusNotFound},
}

for _, tc := range tcs {
_, respCode, err := processMetrics(ctx, endpoint, tc.path)
assert.Equal(t, respCode, tc.statusCode)
if tc.statusCode > 299 {
assert.Error(t, err, errPathNotFound.Error())
}
}
}
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/core/monitoring/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func exposeMetricsEndpoint(log *logger.Logger, config *common.Config, ns func(st
if enableProcessStats {
r.HandleFunc("/processes", processesHandler(routesFetchFn))
r.Handle("/processes/{processID}", createHandler(processHandler(statsHandler)))
r.Handle("/processes/{processID}/", createHandler(processHandler(statsHandler)))
r.Handle("/processes/{processID}/{beatsPath}", createHandler(processHandler(statsHandler)))
}

mux := http.NewServeMux()
Expand Down