Skip to content

Commit

Permalink
confighttp: add max_concurrent_connections to HTTPServerSettings
Browse files Browse the repository at this point in the history
  • Loading branch information
timannguyen committed Oct 12, 2023
1 parent d7b49df commit 6a87a54
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 4 deletions.
25 changes: 25 additions & 0 deletions .chloggen/confighttp-max-connection.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add max_concurrent_connections param to confighttp HTTPServerSettings and metrics of the newly added param

# One or more tracking issues or pull requests related to the change
issues: [https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27066]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
3 changes: 3 additions & 0 deletions config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ will not be enabled.
- `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md)
- [`tls`](../configtls/README.md)
- [`auth`](../configauth/README.md)
- `max_concurrent_connections`: Sets value for maximum number of allowed http connections. defaults to 0.
- [`telemetry`]: enables server metrics. currently, only supports opencensus and have metrics for `max_concurrent_connections`

You can enable [`attribute processor`][attribute-processor] to append any http header to span's attribute using custom key. You also need to enable the "include_metadata"

Expand All @@ -92,6 +94,7 @@ receivers:
- Example-Header
max_age: 7200
endpoint: 0.0.0.0:55690
max_concurrent_connections: 10000
processors:
attributes:
actions:
Expand Down
38 changes: 34 additions & 4 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package confighttp // import "go.opentelemetry.io/collector/config/confighttp"

import (
"context"
"crypto/tls"
"errors"
"io"
Expand All @@ -12,17 +13,18 @@ import (
"time"

"github.com/rs/cors"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"golang.org/x/net/http2"

"go.opencensus.io/stats"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/config/internal"
"go.opentelemetry.io/collector/extension/auth"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"golang.org/x/net/http2"
"golang.org/x/net/netutil"
)

const headerContentEncoding = "Content-Encoding"
Expand Down Expand Up @@ -226,6 +228,8 @@ type HTTPServerSettings struct {

// MaxRequestBodySize sets the maximum request body size in bytes
MaxRequestBodySize int64 `mapstructure:"max_request_body_size"`
// MaxConcurrentConnections is used to set a limit to the maximum HTTP connections the server can keep open.
MaxConcurrentConnections int `mapstructure:"max_concurrent_connections"`

// IncludeMetadata propagates the client metadata from the incoming requests to the downstream consumers
// Experimental: *NOTE* this option is subject to change or removal in the future.
Expand All @@ -234,6 +238,9 @@ type HTTPServerSettings struct {
// Additional headers attached to each HTTP response sent to the client.
// Header values are opaque since they may be sensitive.
ResponseHeaders map[string]configopaque.String `mapstructure:"response_headers"`

// Telemetry is to enable and configure observability.
Telemetry HttpServerTelemetryConfig `mapstructure:"telemetry"`
}

// ToListener creates a net.Listener.
Expand All @@ -252,6 +259,10 @@ func (hss *HTTPServerSettings) ToListener() (net.Listener, error) {
tlsCfg.NextProtos = []string{http2.NextProtoTLS, "http/1.1"}
listener = tls.NewListener(listener, tlsCfg)
}

if hss.MaxConcurrentConnections > 0 {
listener = netutil.LimitListener(listener, hss.MaxConcurrentConnections)
}
return listener, nil
}

Expand Down Expand Up @@ -324,6 +335,10 @@ func (hss *HTTPServerSettings) ToServer(host component.Host, settings component.
handler = responseHeadersHandler(handler, hss.ResponseHeaders)
}

if hss.Telemetry.Enabled {
handler = maxConnectionsObservabilityInterceptor(handler, hss.MaxConcurrentConnections, hss.Telemetry)
}

Check warning on line 340 in config/confighttp/confighttp.go

View check run for this annotation

Codecov / codecov/patch

config/confighttp/confighttp.go#L339-L340

Added lines #L339 - L340 were not covered by tests

// Enable OpenTelemetry observability plugin.
// TODO: Consider to use component ID string as prefix for all the operations.
handler = otelhttp.NewHandler(
Expand Down Expand Up @@ -400,3 +415,18 @@ func maxRequestBodySizeInterceptor(next http.Handler, maxRecvSize int64) http.Ha
next.ServeHTTP(w, r)
})
}

func maxConnectionsObservabilityInterceptor(next http.Handler, maxConn int, c HttpServerTelemetryConfig) http.Handler {
t := newHttpServerTelemetryInstruments(c)
if t == nil {
return next
}
_ = stats.RecordWithTags(context.Background(), t.tags, t.maxConn.M(int64(maxConn)))
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = stats.RecordWithTags(context.Background(), t.tags, t.conn.M(1))
defer func() {
_ = stats.RecordWithTags(context.Background(), t.tags, t.conn.M(-1))
}()
next.ServeHTTP(w, r)

Check warning on line 430 in config/confighttp/confighttp.go

View check run for this annotation

Codecov / codecov/patch

config/confighttp/confighttp.go#L419-L430

Added lines #L419 - L430 were not covered by tests
})
}
57 changes: 57 additions & 0 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,63 @@ func TestHttpServerHeaders(t *testing.T) {
}
}

func TestHttpServerMaxConnSettings(t *testing.T) {
t.Run("has max connection", func(t *testing.T) {
waitTime := 100 * time.Millisecond
hss := &HTTPServerSettings{
Endpoint: "localhost:0",
MaxConcurrentConnections: 1,
}

ln, err := hss.ToListener()
require.NoError(t, err)

s, err := hss.ToServer(
componenttest.NewNopHost(),
componenttest.NewNopTelemetrySettings(),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(waitTime)
w.WriteHeader(http.StatusOK)
}))
require.NoError(t, err)

go func() {
_ = s.Serve(ln)
}()

// TODO: make starting server deterministic
// Wait for the servers to start
<-time.After(10 * time.Millisecond)

url := fmt.Sprintf("http://%s", ln.Addr().String())

// Verify allowed domain gets responses that allow CORS.
req, err := http.NewRequest(http.MethodGet, url, nil)
require.NoError(t, err, "Error creating request: %v", err)
startTime := time.Now()
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err, "Error sending request to http server: %v", err)
resp2, err := http.DefaultClient.Do(req)
require.NoError(t, err, "Error sending request to http server: %v", err)

err = resp.Body.Close()
if err != nil {
t.Errorf("Error closing response body, %v", err)
}
err = resp2.Body.Close()
if err != nil {
t.Errorf("Error closing response body, %v", err)
}
endtime := time.Now()
timeElapsed := endtime.UnixMilli() - startTime.UnixMilli()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, http.StatusOK, resp2.StatusCode)
assert.True(t, timeElapsed > (2*waitTime.Milliseconds()))

require.NoError(t, s.Close())
})
}

func verifyCorsResp(t *testing.T, url string, origin string, set *CORSSettings, extraHeader bool, wantStatus int, wantAllowed bool) {
req, err := http.NewRequest(http.MethodOptions, url, nil)
require.NoError(t, err, "Error creating trace OPTIONS request: %v", err)
Expand Down
1 change: 1 addition & 0 deletions config/confighttp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/klauspost/compress v1.17.0
github.com/rs/cors v1.10.1
github.com/stretchr/testify v1.8.4
go.opencensus.io v0.24.0
go.opentelemetry.io/collector v0.87.0
go.opentelemetry.io/collector/component v0.87.0
go.opentelemetry.io/collector/config/configauth v0.87.0
Expand Down
Loading

0 comments on commit 6a87a54

Please sign in to comment.