Skip to content

Commit

Permalink
feat(storagenode): add configurations for initial window size
Browse files Browse the repository at this point in the history
This PR adds new gRPC initial flow control window size options to the storage node.

- `--server-initial-conn-window-size`
- `--server-initial-stream-window-size`

See:
- https://httpwg.org/specs/rfc7540.html#InitialWindowSize
- https://pkg.go.dev/google.golang.org/grpc#InitialConnWindowSize
  • Loading branch information
ijsong committed May 19, 2023
1 parent b93c628 commit 82c1e97
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 5 deletions.
8 changes: 8 additions & 0 deletions bin/start_varlogsn.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ def start(args: argparse.Namespace) -> None:
if args.server_write_buffer_size:
cmd.append(
f"--server-write-buffer-size={args.server_write_buffer_size}")
if args.server_initial_conn_window_size:
cmd.append(
f"--server-initial-conn-window-size={args.server_initial_conn_window_size}")
if args.server_initial_stream_window_size:
cmd.append(
f"--server-initial-stream-window-size={args.server_initial_stream_window_size}")
if args.replication_client_read_buffer_size:
cmd.append(
f"--replication-client-read-buffer-size={args.replication_client_read_buffer_size}")
Expand Down Expand Up @@ -266,6 +272,8 @@ def main() -> None:
# grpc options
parser.add_argument("--server-read-buffer-size", type=str)
parser.add_argument("--server-write-buffer-size", type=str)
parser.add_argument("--server-initial-conn-window-size", type=str)
parser.add_argument("--server-initial-stream-window-size", type=str)
parser.add_argument("--replication-client-read-buffer-size", type=str)
parser.add_argument("--replication-client-write-buffer-size", type=str)

Expand Down
2 changes: 2 additions & 0 deletions cmd/varlogsn/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func newStartCommand() *cli.Command {
flagServerMaxRecvMsgSize.StringFlag(false, units.ToByteSizeString(storagenode.DefaultServerMaxRecvSize)),
flagReplicationClientReadBufferSize.StringFlag(false, units.ToByteSizeString(storagenode.DefaultReplicateClientReadBufferSize)),
flagReplicationClientWriteBufferSize.StringFlag(false, units.ToByteSizeString(storagenode.DefaultReplicateClientWriteBufferSize)),
flagServerInitialConnWindowSize,
flagServerInitialStreamWindowSize,

// lse options
flagLogStreamExecutorSequenceQueueCapacity.IntFlag(false, logstream.DefaultSequenceQueueCapacity),
Expand Down
10 changes: 10 additions & 0 deletions cmd/varlogsn/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ var (
Envs: []string{"SERVER_MAX_MSG_SIZE", "SERVER_MAX_MESSAGE_SIZE"},
Usage: "B, KiB, MiB, GiB",
}
flagServerInitialConnWindowSize = &cli.StringFlag{
Name: "server-initial-conn-window-size",
EnvVars: []string{"SERVER_INITIAL_CONN_WINDOW_SIZE"},
Usage: "Window size for a connection.",
}
flagServerInitialStreamWindowSize = &cli.StringFlag{
Name: "server-initial-stream-window-size",
EnvVars: []string{"SERVER_INITIAL_STREAM_WINDOW_SIZE"},
Usage: "Window size for stream.",
}
flagReplicationClientReadBufferSize = flags.FlagDesc{
Name: "replication-client-read-buffer-size",
Envs: []string{"REPLICATION_CLIENT_READ_BUFFER_SIZE"},
Expand Down
21 changes: 19 additions & 2 deletions cmd/varlogsn/varlogsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"os"
"os/signal"
"strings"
Expand Down Expand Up @@ -117,7 +118,7 @@ func start(c *cli.Context) error {
return err
}

sn, err := storagenode.NewStorageNode(
snOpts := []storagenode.Option{
storagenode.WithClusterID(clusterID),
storagenode.WithStorageNodeID(storageNodeID),
storagenode.WithListenAddress(c.String(flagListen.Name)),
Expand All @@ -138,7 +139,23 @@ func start(c *cli.Context) error {
storagenode.WithMaxLogStreamReplicasCount(int32(c.Int(flagMaxLogStreamReplicasCount.Name))),
storagenode.WithDefaultStorageOptions(storageOpts...),
storagenode.WithLogger(logger),
)
}
if initialConnWindowSize := c.String(flagServerInitialConnWindowSize.Name); initialConnWindowSize != "" {
size, err := units.FromByteSizeString(initialConnWindowSize, 0, math.MaxInt32)
if err != nil {
return err
}
snOpts = append(snOpts, storagenode.WithGRPCServerInitialConnWindowSize(int32(size)))
}
if initialStreamWindowSize := c.String(flagServerInitialStreamWindowSize.Name); initialStreamWindowSize != "" {
size, err := units.FromByteSizeString(initialStreamWindowSize, 0, math.MaxInt32)
if err != nil {
return err
}
snOpts = append(snOpts, storagenode.WithGRPCServerInitialWindowSize(int32(size)))
}

sn, err := storagenode.NewStorageNode(snOpts...)
if err != nil {
return err
}
Expand Down
22 changes: 22 additions & 0 deletions internal/storagenode/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ type config struct {
grpcServerReadBufferSize int64
grpcServerWriteBufferSize int64
grpcServerMaxRecvMsgSize int64
grpcServerInitialConnWindowSize struct {
value int32
set bool
}
grpcServerInitialWindowSize struct {
value int32
set bool
}
replicateClientReadBufferSize int64
replicateClientWriteBufferSize int64
maxLogStreamReplicasCount int32
Expand Down Expand Up @@ -168,6 +176,20 @@ func WithGRPCServerMaxRecvMsgSize(grpcServerMaxRecvMsgSize int64) Option {
})
}

func WithGRPCServerInitialConnWindowSize(grpcServerInitialConnWindowSize int32) Option {
return newFuncOption(func(cfg *config) {
cfg.grpcServerInitialConnWindowSize.value = grpcServerInitialConnWindowSize
cfg.grpcServerInitialConnWindowSize.set = true
})
}

func WithGRPCServerInitialWindowSize(grpcServerInitialWindowSize int32) Option {
return newFuncOption(func(cfg *config) {
cfg.grpcServerInitialWindowSize.value = grpcServerInitialWindowSize
cfg.grpcServerInitialWindowSize.set = true
})
}

func WithReplicateClientReadBufferSize(replicateClientReadBufferSize int64) Option {
return newFuncOption(func(cfg *config) {
cfg.replicateClientReadBufferSize = replicateClientReadBufferSize
Expand Down
14 changes: 11 additions & 3 deletions internal/storagenode/storagenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewStorageNode(opts ...Option) (*StorageNode, error) {
}
dataDirs = filterValidDataDirectories(dataDirs, cfg.cid, cfg.snid, cfg.logger)

grpcServer := grpc.NewServer(
grpcServerOpts := []grpc.ServerOption{
grpc.ReadBufferSize(int(cfg.grpcServerReadBufferSize)),
grpc.WriteBufferSize(int(cfg.grpcServerWriteBufferSize)),
grpc.MaxRecvMsgSize(int(cfg.grpcServerMaxRecvMsgSize)),
Expand All @@ -118,12 +118,18 @@ func NewStorageNode(opts ...Option) (*StorageNode, error) {
return resp, err
},
),
)
}
if opt := cfg.grpcServerInitialConnWindowSize; opt.set {
grpcServerOpts = append(grpcServerOpts, grpc.InitialConnWindowSize(opt.value))
}
if opt := cfg.grpcServerInitialWindowSize; opt.set {
grpcServerOpts = append(grpcServerOpts, grpc.InitialWindowSize(opt.value))
}

sn := &StorageNode{
config: cfg,
executors: executorsmap.New(hintNumExecutors),
server: grpcServer,
server: grpc.NewServer(grpcServerOpts...),
healthServer: health.NewServer(),
closedC: make(chan struct{}),
snPaths: snPaths,
Expand Down Expand Up @@ -198,6 +204,8 @@ func (sn *StorageNode) Serve() error {
zap.Int64("grpcServerReadBufferSize", sn.grpcServerReadBufferSize),
zap.Int64("grpcServerWriteBufferSize", sn.grpcServerWriteBufferSize),
zap.Int64("grpcServerMaxRecvMsgSize", sn.grpcServerMaxRecvMsgSize),
zap.Int32("grpcServerInitialConnWindowSize", sn.grpcServerInitialConnWindowSize.value),
zap.Int32("grpcServerInitialStreamWindowSize", sn.grpcServerInitialWindowSize.value),
zap.Int64("grpcReplicateClientReadBufferSize", sn.replicateClientReadBufferSize),
zap.Int64("grpcReplicateClientWriteBufferSize", sn.replicateClientWriteBufferSize),
)
Expand Down

0 comments on commit 82c1e97

Please sign in to comment.