Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.5] Support configuring MaxConcurrentStreams for http2 #14219

Merged
merged 3 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions pkg/flags/uint32.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flags

import (
"flag"
"strconv"
)

type uint32Value uint32

// NewUint32Value creates an uint32 instance with the provided value.
func NewUint32Value(v uint32) *uint32Value {
val := new(uint32Value)
*val = uint32Value(v)
return val
}

// Set parses a command line uint32 value.
// Implements "flag.Value" interface.
func (i *uint32Value) Set(s string) error {
v, err := strconv.ParseUint(s, 0, 32)
*i = uint32Value(v)
return err
}

func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) }

// Uint32FromFlag return the uint32 value of a flag with the given name
func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 {
val := *fs.Lookup(name).Value.(*uint32Value)
return uint32(val)
}
111 changes: 111 additions & 0 deletions pkg/flags/uint32_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flags

import (
"flag"
"testing"

"github.com/stretchr/testify/assert"
)

func TestUint32Value(t *testing.T) {
cases := []struct {
name string
s string
expectedVal uint32
expectError bool
}{
{
name: "normal uint32 value",
s: "200",
expectedVal: 200,
},
{
name: "zero value",
s: "0",
expectedVal: 0,
},
{
name: "negative int value",
s: "-200",
expectError: true,
},
{
name: "invalid integer value",
s: "invalid",
expectError: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var val uint32Value
err := val.Set(tc.s)

if tc.expectError {
if err == nil {
t.Errorf("Expected failure on parsing uint32 value from %s", tc.s)
}
} else {
if err != nil {
t.Errorf("Unexpected error when parsing %s: %v", tc.s, err)
}
assert.Equal(t, uint32(val), tc.expectedVal)
}
})
}
}

func TestUint32FromFlag(t *testing.T) {
const flagName = "max-concurrent-streams"

cases := []struct {
name string
defaultVal uint32
arguments []string
expectedVal uint32
}{
{
name: "only default value",
defaultVal: 15,
arguments: []string{},
expectedVal: 15,
},
{
name: "argument has different value from the default one",
defaultVal: 16,
arguments: []string{"--max-concurrent-streams", "200"},
expectedVal: 200,
},
{
name: "argument has the same value from the default one",
defaultVal: 105,
arguments: []string{"--max-concurrent-streams", "105"},
expectedVal: 105,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
fs := flag.NewFlagSet("etcd", flag.ContinueOnError)
fs.Var(NewUint32Value(tc.defaultVal), flagName, "Maximum concurrent streams that each client can open at a time.")
if err := fs.Parse(tc.arguments); err != nil {
t.Fatalf("Unexpected error: %v\n", err)
}
actualMaxStream := Uint32FromFlag(fs, flagName)
assert.Equal(t, actualMaxStream, tc.expectedVal)
})
}
}
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

// MaxConcurrentStreams specifies the maximum number of concurrent
// streams that each client can open at a time.
MaxConcurrentStreams uint32

WarningApplyDuration time.Duration

StrictReconfigCheck bool
Expand Down
9 changes: 8 additions & 1 deletion server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package embed
import (
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -55,6 +56,7 @@ const (
DefaultMaxTxnOps = uint(128)
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultMaxConcurrentStreams = math.MaxUint32
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
Expand Down Expand Up @@ -198,6 +200,10 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`

// MaxConcurrentStreams specifies the maximum number of concurrent
// streams that each client can open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`

LPUrls, LCUrls []url.URL
APUrls, ACUrls []url.URL
ClientTLSInfo transport.TLSInfo
Expand Down Expand Up @@ -305,7 +311,7 @@ type Config struct {
AuthToken string `json:"auth-token"`
BcryptCost uint `json:"bcrypt-cost"`

//The AuthTokenTTL in seconds of the simple token
// AuthTokenTTL specifies the TTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
Expand Down Expand Up @@ -448,6 +454,7 @@ func NewConfig() *Config {

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxConcurrentStreams: DefaultMaxConcurrentStreams,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,

GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
Expand Down
6 changes: 5 additions & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
Expand Down Expand Up @@ -336,7 +337,10 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
zap.String("initial-cluster-state", ec.ClusterState),
zap.String("initial-cluster-token", sc.InitialClusterToken),
zap.Int64("quota-size-bytes", quota),
zap.Int64("quota-backend-bytes", quota),
zap.Uint("max-request-bytes", sc.MaxRequestBytes),
zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams),

zap.Bool("pre-vote", sc.PreVote),
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
Expand Down
17 changes: 17 additions & 0 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/client/v3/credentials"
"go.etcd.io/etcd/pkg/v3/debugutil"
"go.etcd.io/etcd/pkg/v3/httputil"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3election"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/soheilhy/cmux"
"github.com/tmc/grpc-websocket-proxy/wsproxy"
"go.uber.org/zap"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -133,6 +135,10 @@ func (sctx *serveCtx) serve(
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if err := configureHttpServer(srvhttp, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
}
httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }()

Expand Down Expand Up @@ -182,6 +188,10 @@ func (sctx *serveCtx) serve(
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
go func() { errHandler(srv.Serve(tlsl)) }()

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
Expand All @@ -195,6 +205,13 @@ func (sctx *serveCtx) serve(
return m.Serve()
}

func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error {
// todo (ahrtr): should we support configuring other parameters in the future as well?
return http2.ConfigureServer(srv, &http2.Server{
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
})
}

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
// connections or otherHandler otherwise. Given in gRPC docs.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
Expand Down
4 changes: 4 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.")
fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.")

fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.")

// raft connection timeouts
fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection")
fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection")
Expand Down Expand Up @@ -396,6 +398,8 @@ func (cfg *config) configFromCmdLine() error {

cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites")

cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams")

cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs")

cfg.ec.ClusterState = cfg.cf.clusterState.String()
Expand Down
12 changes: 12 additions & 0 deletions server/etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -95,6 +96,8 @@ var (
grpcKeepAliveMinTime time.Duration
grpcKeepAliveTimeout time.Duration
grpcKeepAliveInterval time.Duration

maxConcurrentStreams uint32
)

const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
Expand Down Expand Up @@ -159,6 +162,8 @@ func newGRPCProxyStartCommand() *cobra.Command {

cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")

cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.")

return &cmd
}

Expand Down Expand Up @@ -212,6 +217,13 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
httpClient := mustNewHTTPClient(lg)

srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)

if err := http2.ConfigureServer(srvhttp, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
}); err != nil {
lg.Fatal("Failed to configure the http server", zap.Error(err))
}

errc := make(chan error, 3)
go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
go func() { errc <- srvhttp.Serve(httpl) }()
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ Member:
Maximum number of operations permitted in a transaction.
--max-request-bytes '1572864'
Maximum client request size in bytes the server will accept.
--max-concurrent-streams 'math.MaxUint32'
Maximum concurrent streams that each client can open at a time.
--grpc-keepalive-min-time '5s'
Minimum duration interval that a client should wait before pinging server.
--grpc-keepalive-interval '2h'
Expand Down
3 changes: 1 addition & 2 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

const (
grpcOverheadBytes = 512 * 1024
maxStreams = math.MaxUint32
maxSendBytes = math.MaxInt32
)

Expand Down Expand Up @@ -68,7 +67,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer

opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))

grpcServer := grpc.NewServer(append(opts, gopts...)...)

Expand Down
6 changes: 6 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ type etcdProcessClusterConfig struct {

rollingStart bool
logLevel string

MaxConcurrentStreams uint32 // default is math.MaxUint32
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -320,6 +322,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
args = append(args, "--log-level", cfg.logLevel)
}

if cfg.MaxConcurrentStreams != 0 {
args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams))
}

etcdCfgs[i] = &etcdServerProcessConfig{
lg: lg,
execPath: cfg.execPath,
Expand Down
Loading