Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[confighttp] add max_concurrent_connections to HTTPServerSettings #8633

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/confighttp-max-connection.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add max_concurrent_connections param to confighttp HTTPServerSettings and metrics of the newly added param

# One or more tracking issues or pull requests related to the change
issues: [https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27066]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
3 changes: 3 additions & 0 deletions config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ will not be enabled.
- `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md)
- [`tls`](../configtls/README.md)
- [`auth`](../configauth/README.md)
- `max_concurrent_connections`: Sets value for maximum number of allowed http connections. defaults to 0.
- [`telemetry`]: enables server metrics. currently, only supports opencensus and have metrics for `max_concurrent_connections`

You can enable [`attribute processor`][attribute-processor] to append any http header to span's attribute using custom key. You also need to enable the "include_metadata"

Expand All @@ -92,6 +94,7 @@ receivers:
- Example-Header
max_age: 7200
endpoint: 0.0.0.0:55690
max_concurrent_connections: 10000
processors:
attributes:
actions:
Expand Down
38 changes: 34 additions & 4 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package confighttp // import "go.opentelemetry.io/collector/config/confighttp"

import (
"context"
"crypto/tls"
"errors"
"io"
Expand All @@ -12,17 +13,18 @@
"time"

"github.com/rs/cors"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"golang.org/x/net/http2"

"go.opencensus.io/stats"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/config/internal"
"go.opentelemetry.io/collector/extension/auth"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"golang.org/x/net/http2"
"golang.org/x/net/netutil"
)

const headerContentEncoding = "Content-Encoding"
Expand Down Expand Up @@ -226,6 +228,8 @@

// MaxRequestBodySize sets the maximum request body size in bytes
MaxRequestBodySize int64 `mapstructure:"max_request_body_size"`
// MaxConcurrentConnections is used to set a limit to the maximum HTTP connections the server can keep open.
MaxConcurrentConnections int `mapstructure:"max_concurrent_connections"`

// IncludeMetadata propagates the client metadata from the incoming requests to the downstream consumers
// Experimental: *NOTE* this option is subject to change or removal in the future.
Expand All @@ -234,6 +238,9 @@
// Additional headers attached to each HTTP response sent to the client.
// Header values are opaque since they may be sensitive.
ResponseHeaders map[string]configopaque.String `mapstructure:"response_headers"`

// Telemetry is to enable and configure observability.
Telemetry HttpServerTelemetryConfig `mapstructure:"telemetry"`
}

// ToListener creates a net.Listener.
Expand All @@ -252,6 +259,10 @@
tlsCfg.NextProtos = []string{http2.NextProtoTLS, "http/1.1"}
listener = tls.NewListener(listener, tlsCfg)
}

if hss.MaxConcurrentConnections > 0 {
listener = netutil.LimitListener(listener, hss.MaxConcurrentConnections)
}
return listener, nil
}

Expand Down Expand Up @@ -324,6 +335,10 @@
handler = responseHeadersHandler(handler, hss.ResponseHeaders)
}

if hss.Telemetry.Enabled {
handler = maxConnectionsObservabilityInterceptor(handler, hss.MaxConcurrentConnections, hss.Telemetry)
}

Check warning on line 340 in config/confighttp/confighttp.go

View check run for this annotation

Codecov / codecov/patch

config/confighttp/confighttp.go#L339-L340

Added lines #L339 - L340 were not covered by tests

// Enable OpenTelemetry observability plugin.
// TODO: Consider to use component ID string as prefix for all the operations.
handler = otelhttp.NewHandler(
Expand Down Expand Up @@ -400,3 +415,18 @@
next.ServeHTTP(w, r)
})
}

func maxConnectionsObservabilityInterceptor(next http.Handler, maxConn int, c HttpServerTelemetryConfig) http.Handler {
t := newHttpServerTelemetryInstruments(c)
if t == nil {
return next
}
_ = stats.RecordWithTags(context.Background(), t.tags, t.maxConn.M(int64(maxConn)))
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = stats.RecordWithTags(context.Background(), t.tags, t.conn.M(1))
defer func() {
_ = stats.RecordWithTags(context.Background(), t.tags, t.conn.M(-1))
}()
next.ServeHTTP(w, r)

Check warning on line 430 in config/confighttp/confighttp.go

View check run for this annotation

Codecov / codecov/patch

config/confighttp/confighttp.go#L419-L430

Added lines #L419 - L430 were not covered by tests
})
}
57 changes: 57 additions & 0 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,63 @@ func TestHttpServerHeaders(t *testing.T) {
}
}

func TestHttpServerMaxConnSettings(t *testing.T) {
t.Run("has max connection", func(t *testing.T) {
waitTime := 100 * time.Millisecond
hss := &HTTPServerSettings{
Endpoint: "localhost:0",
MaxConcurrentConnections: 1,
}

ln, err := hss.ToListener()
require.NoError(t, err)

s, err := hss.ToServer(
componenttest.NewNopHost(),
componenttest.NewNopTelemetrySettings(),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(waitTime)
w.WriteHeader(http.StatusOK)
}))
require.NoError(t, err)

go func() {
_ = s.Serve(ln)
}()

// TODO: make starting server deterministic
// Wait for the servers to start
<-time.After(10 * time.Millisecond)

url := fmt.Sprintf("http://%s", ln.Addr().String())

// Verify allowed domain gets responses that allow CORS.
req, err := http.NewRequest(http.MethodGet, url, nil)
require.NoError(t, err, "Error creating request: %v", err)
startTime := time.Now()
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err, "Error sending request to http server: %v", err)
resp2, err := http.DefaultClient.Do(req)
require.NoError(t, err, "Error sending request to http server: %v", err)

err = resp.Body.Close()
if err != nil {
t.Errorf("Error closing response body, %v", err)
}
err = resp2.Body.Close()
if err != nil {
t.Errorf("Error closing response body, %v", err)
}
endtime := time.Now()
timeElapsed := endtime.UnixMilli() - startTime.UnixMilli()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, http.StatusOK, resp2.StatusCode)
assert.True(t, timeElapsed > (2*waitTime.Milliseconds()))

require.NoError(t, s.Close())
})
}

func verifyCorsResp(t *testing.T, url string, origin string, set *CORSSettings, extraHeader bool, wantStatus int, wantAllowed bool) {
req, err := http.NewRequest(http.MethodOptions, url, nil)
require.NoError(t, err, "Error creating trace OPTIONS request: %v", err)
Expand Down
1 change: 1 addition & 0 deletions config/confighttp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/klauspost/compress v1.17.0
github.com/rs/cors v1.10.1
github.com/stretchr/testify v1.8.4
go.opencensus.io v0.24.0
go.opentelemetry.io/collector v0.87.0
go.opentelemetry.io/collector/component v0.87.0
go.opentelemetry.io/collector/config/configauth v0.87.0
Expand Down
Loading
Loading