Skip to content

Commit

Permalink
Add /api/sampling endpoint in collector
Browse files Browse the repository at this point in the history
Return metricsFactory from test server init

fix import structure

fix unwanted change in agent server

Add license on new file

Signed-off-by: RickyRajinder <singh.sangh@gmail.com>

Add tests for errors

Refactor collector API handler to extend agent handler

Remove unused fields

Unexport fields in HttpHandler

Update callers of NewAPIHandler

Run go fmt

Fix lint issues
  • Loading branch information
RickyRajinder committed Dec 21, 2019
1 parent 3fa8a08 commit 12bf753
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 113 deletions.
35 changes: 19 additions & 16 deletions cmd/agent/app/httpserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,32 @@ var (

// NewHTTPServer creates a new server that hosts an HTTP/JSON endpoint for clients
// to query for sampling strategies and baggage restrictions.
func NewHTTPServer(hostPort string, manager configmanager.ClientConfigManager, mFactory metrics.Factory) *http.Server {
handler := newHTTPHandler(manager, mFactory)
func NewHTTPServer(hostPort string, configManager configmanager.ClientConfigManager, mFactory metrics.Factory) *http.Server {
handler := NewHTTPHandler(configManager, mFactory)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
handler.serveSamplingHTTP(w, r, true /* thriftEnums092 */)
handler.ServeSamplingHTTP(w, r, true /* thriftEnums092 */)
})
mux.HandleFunc("/sampling", func(w http.ResponseWriter, r *http.Request) {
handler.serveSamplingHTTP(w, r, false /* thriftEnums092 */)
handler.ServeSamplingHTTP(w, r, false /* thriftEnums092 */)
})
mux.HandleFunc("/baggageRestrictions", func(w http.ResponseWriter, r *http.Request) {
handler.serveBaggageHTTP(w, r)
})
return &http.Server{Addr: hostPort, Handler: mux}
}

func newHTTPHandler(manager configmanager.ClientConfigManager, mFactory metrics.Factory) *httpHandler {
handler := &httpHandler{manager: manager}
//NewHTTPHandler creates a new HTTP endpoint handler
func NewHTTPHandler(configManager configmanager.ClientConfigManager, mFactory metrics.Factory) *HTTPHandler {
handler := &HTTPHandler{configManager: configManager}
metrics.Init(&handler.metrics, mFactory, nil)
return handler
}

type httpHandler struct {
manager configmanager.ClientConfigManager
metrics struct {
//HTTPHandler is composed of a ClientConfigManager and metrics counters
type HTTPHandler struct {
configManager configmanager.ClientConfigManager
metrics struct {
// Number of good sampling requests
SamplingRequestSuccess metrics.Counter `metric:"http-server.requests" tags:"type=sampling"`

Expand All @@ -83,7 +85,7 @@ type httpHandler struct {
}
}

func (h *httpHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request) (string, error) {
func (h *HTTPHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request) (string, error) {
services := r.URL.Query()["service"]
if len(services) != 1 {
h.metrics.BadRequest.Inc(1)
Expand All @@ -93,7 +95,7 @@ func (h *httpHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request)
return services[0], nil
}

func (h *httpHandler) writeJSON(w http.ResponseWriter, json []byte) error {
func (h *HTTPHandler) writeJSON(w http.ResponseWriter, json []byte) error {
w.Header().Add("Content-Type", mimeTypeApplicationJSON)
if _, err := w.Write(json); err != nil {
h.metrics.WriteFailures.Inc(1)
Expand All @@ -102,12 +104,13 @@ func (h *httpHandler) writeJSON(w http.ResponseWriter, json []byte) error {
return nil
}

func (h *httpHandler) serveSamplingHTTP(w http.ResponseWriter, r *http.Request, thriftEnums092 bool) {
//ServeSamplingHTTP is used to handle the "/" and "/sampling" endpoints
func (h *HTTPHandler) ServeSamplingHTTP(w http.ResponseWriter, r *http.Request, thriftEnums092 bool) {
service, err := h.serviceFromRequest(w, r)
if err != nil {
return
}
resp, err := h.manager.GetSamplingStrategy(service)
resp, err := h.configManager.GetSamplingStrategy(service)
if err != nil {
h.metrics.CollectorProxyFailures.Inc(1)
http.Error(w, fmt.Sprintf("collector error: %+v", err), http.StatusInternalServerError)
Expand All @@ -132,12 +135,12 @@ func (h *httpHandler) serveSamplingHTTP(w http.ResponseWriter, r *http.Request,
}
}

func (h *httpHandler) serveBaggageHTTP(w http.ResponseWriter, r *http.Request) {
func (h *HTTPHandler) serveBaggageHTTP(w http.ResponseWriter, r *http.Request) {
service, err := h.serviceFromRequest(w, r)
if err != nil {
return
}
resp, err := h.manager.GetBaggageRestrictions(service)
resp, err := h.configManager.GetBaggageRestrictions(service)
if err != nil {
h.metrics.CollectorProxyFailures.Inc(1)
http.Error(w, fmt.Sprintf("collector error: %+v", err), http.StatusInternalServerError)
Expand Down Expand Up @@ -166,7 +169,7 @@ var samplingStrategyTypes = []tSampling.SamplingStrategyType{
//
// Thrift 0.9.3 classes generate this JSON:
// {"strategyType":"PROBABILISTIC","probabilisticSampling":{"samplingRate":0.5}}
func (h *httpHandler) encodeThriftEnums092(json []byte) []byte {
func (h *HTTPHandler) encodeThriftEnums092(json []byte) []byte {
str := string(json)
for _, strategyType := range samplingStrategyTypes {
str = strings.Replace(
Expand Down
94 changes: 17 additions & 77 deletions cmd/agent/app/httpserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,11 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

type testServer struct {
metricsFactory *metricstest.Factory
mgr *mockManager
server *httptest.Server
}

func withServer(
mockSamplingResponse *sampling.SamplingStrategyResponse,
mockBaggageResponse []*baggage.BaggageRestriction,
runTest func(server *testServer),
) {
metricsFactory := metricstest.NewFactory(0)
mgr := &mockManager{
samplingResponse: mockSamplingResponse,
baggageResponse: mockBaggageResponse,
}
realServer := NewHTTPServer(":1", mgr, metricsFactory)
server := httptest.NewServer(realServer.Handler)
defer server.Close()
runTest(&testServer{
metricsFactory: metricsFactory,
mgr: mgr,
server: server,
})
}

func TestHTTPHandler(t *testing.T) {
withServer(probabilistic(0.001), restrictions("luggage", 10), func(ts *testServer) {
WithServer(Probabilistic(0.001), Restrictions("luggage", 10), func(ts *TestServer) {
for _, endpoint := range []string{"/", "/sampling"} {
t.Run("request against endpoint "+endpoint, func(t *testing.T) {
resp, err := http.Get(ts.server.URL + endpoint + "?service=Y")
resp, err := http.Get(ts.Server.URL + endpoint + "?service=Y")
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
Expand All @@ -73,33 +47,33 @@ func TestHTTPHandler(t *testing.T) {
objResp := &tSampling092.SamplingStrategyResponse{}
require.NoError(t, json.Unmarshal(body, objResp))
assert.EqualValues(t,
ts.mgr.samplingResponse.GetStrategyType(),
ts.mgr.SamplingResponse.GetStrategyType(),
objResp.GetStrategyType())
assert.Equal(t,
ts.mgr.samplingResponse.GetProbabilisticSampling().GetSamplingRate(),
ts.mgr.SamplingResponse.GetProbabilisticSampling().GetSamplingRate(),
objResp.GetProbabilisticSampling().GetSamplingRate())
} else {
objResp := &sampling.SamplingStrategyResponse{}
require.NoError(t, json.Unmarshal(body, objResp))
assert.EqualValues(t, ts.mgr.samplingResponse, objResp)
assert.EqualValues(t, ts.mgr.SamplingResponse, objResp)
}
})
}

t.Run("request against endpoint /baggage", func(t *testing.T) {
resp, err := http.Get(ts.server.URL + "/baggageRestrictions?service=Y")
resp, err := http.Get(ts.Server.URL + "/baggageRestrictions?service=Y")
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
require.NoError(t, err)
var objResp []*baggage.BaggageRestriction
require.NoError(t, json.Unmarshal(body, &objResp))
assert.EqualValues(t, ts.mgr.baggageResponse, objResp)
assert.EqualValues(t, ts.mgr.BaggageResponse, objResp)
})

// handler must emit metrics
ts.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{
ts.MetricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{
{Name: "http-server.requests", Tags: map[string]string{"type": "sampling"}, Value: 1},
{Name: "http-server.requests", Tags: map[string]string{"type": "sampling-legacy"}, Value: 1},
{Name: "http-server.requests", Tags: map[string]string{"type": "baggage"}, Value: 1},
Expand Down Expand Up @@ -164,7 +138,7 @@ func TestHTTPHandlerErrors(t *testing.T) {
},
{
description: "sampler marshalling error",
mockSamplingResponse: probabilistic(math.NaN()),
mockSamplingResponse: Probabilistic(math.NaN()),
url: "?service=Y",
statusCode: http.StatusInternalServerError,
body: "Cannot marshall Thrift to JSON\n",
Expand All @@ -176,8 +150,8 @@ func TestHTTPHandlerErrors(t *testing.T) {
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run(testCase.description, func(t *testing.T) {
withServer(testCase.mockSamplingResponse, testCase.mockBaggageResponse, func(ts *testServer) {
resp, err := http.Get(ts.server.URL + testCase.url)
WithServer(testCase.mockSamplingResponse, testCase.mockBaggageResponse, func(ts *TestServer) {
resp, err := http.Get(ts.Server.URL + testCase.url)
require.NoError(t, err)
assert.Equal(t, testCase.statusCode, resp.StatusCode)
if testCase.body != "" {
Expand All @@ -187,47 +161,32 @@ func TestHTTPHandlerErrors(t *testing.T) {
}

if len(testCase.metrics) > 0 {
ts.metricsFactory.AssertCounterMetrics(t, testCase.metrics...)
ts.MetricsFactory.AssertCounterMetrics(t, testCase.metrics...)
}
})
})
}

t.Run("failure to write a response", func(t *testing.T) {
withServer(probabilistic(0.001), restrictions("luggage", 10), func(ts *testServer) {
handler := newHTTPHandler(ts.mgr, ts.metricsFactory)
WithServer(Probabilistic(0.001), Restrictions("luggage", 10), func(ts *TestServer) {
handler := NewHTTPHandler(ts.mgr, ts.MetricsFactory)

req := httptest.NewRequest("GET", "http://localhost:80/?service=X", nil)
w := &mockWriter{header: make(http.Header)}
handler.serveSamplingHTTP(w, req, false)
handler.ServeSamplingHTTP(w, req, false)

ts.metricsFactory.AssertCounterMetrics(t,
ts.MetricsFactory.AssertCounterMetrics(t,
metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 1})

req = httptest.NewRequest("GET", "http://localhost:80/baggageRestrictions?service=X", nil)
handler.serveBaggageHTTP(w, req)

ts.metricsFactory.AssertCounterMetrics(t,
ts.MetricsFactory.AssertCounterMetrics(t,
metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 2})
})
})
}

func probabilistic(probability float64) *sampling.SamplingStrategyResponse {
return &sampling.SamplingStrategyResponse{
StrategyType: sampling.SamplingStrategyType_PROBABILISTIC,
ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{
SamplingRate: probability,
},
}
}

func restrictions(key string, size int32) []*baggage.BaggageRestriction {
return []*baggage.BaggageRestriction{
{BaggageKey: key, MaxValueLength: size},
}
}

type mockWriter struct {
header http.Header
}
Expand All @@ -241,22 +200,3 @@ func (w *mockWriter) Write([]byte) (int, error) {
}

func (w *mockWriter) WriteHeader(int) {}

type mockManager struct {
samplingResponse *sampling.SamplingStrategyResponse
baggageResponse []*baggage.BaggageRestriction
}

func (m *mockManager) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
if m.samplingResponse == nil {
return nil, errors.New("no mock response provided")
}
return m.samplingResponse, nil
}

func (m *mockManager) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
if m.baggageResponse == nil {
return nil, errors.New("no mock response provided")
}
return m.baggageResponse, nil
}
92 changes: 92 additions & 0 deletions cmd/agent/app/httpserver/testserver_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver

import (
"errors"
"net/http/httptest"

"github.com/uber/jaeger-lib/metrics/metricstest"

"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

//TestServer sets up a test HTTP server, with a mock config manager and exports a metrics factory
type TestServer struct {
MetricsFactory *metricstest.Factory
mgr *MockManager
Server *httptest.Server
}

//WithServer initializes the test server
func WithServer(
mockSamplingResponse *sampling.SamplingStrategyResponse,
mockBaggageResponse []*baggage.BaggageRestriction,
runTest func(server *TestServer),
) {
metricsFactory := metricstest.NewFactory(0)
mgr := &MockManager{
SamplingResponse: mockSamplingResponse,
BaggageResponse: mockBaggageResponse,
}
realServer := NewHTTPServer(":1", mgr, metricsFactory)
server := httptest.NewServer(realServer.Handler)
defer server.Close()
runTest(&TestServer{
MetricsFactory: metricsFactory,
mgr: mgr,
Server: server,
})
}

//Probabilistic returns a SamplingStrategyResponse with the given probability
func Probabilistic(probability float64) *sampling.SamplingStrategyResponse {
return &sampling.SamplingStrategyResponse{
StrategyType: sampling.SamplingStrategyType_PROBABILISTIC,
ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{
SamplingRate: probability,
},
}
}

//Restrictions returns BaggageRestrictions
func Restrictions(key string, size int32) []*baggage.BaggageRestriction {
return []*baggage.BaggageRestriction{
{BaggageKey: key, MaxValueLength: size},
}
}

//MockManager is a mock ClientConfigManager
type MockManager struct {
SamplingResponse *sampling.SamplingStrategyResponse
BaggageResponse []*baggage.BaggageRestriction
}

//GetSamplingStrategy returns the sampling strategy pf the mock
func (m *MockManager) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
if m.SamplingResponse == nil {
return nil, errors.New("no mock response provided")
}
return m.SamplingResponse, nil
}

//GetBaggageRestrictions returns the baggage restrictions of the mock
func (m *MockManager) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
if m.BaggageResponse == nil {
return nil, errors.New("no mock response provided")
}
return m.BaggageResponse, nil
}
Loading

0 comments on commit 12bf753

Please sign in to comment.