Skip to content

Commit

Permalink
Make connection pool configurable (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
louiseschmidtgen committed Sep 4, 2024
1 parent 132be15 commit 358d1f0
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 23 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Steps:
make static
sudo ./bin/static/k8s-dqlite \
--storage-dir /var/snap/microk8s/current/var/kubernetes/backend \
--listen unix:///var/snap/microk8s/current/var/kubernetes/backend/kine.sock
--listen unix:///var/snap/microk8s/current/var/kubernetes/backend/kine.sock:12379
```

7. While developing and making changes to `k8s-dqlite`, just restart k8s-dqlite
Expand Down
8 changes: 8 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/signal"
"time"

"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/server"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
Expand All @@ -32,6 +33,8 @@ var (
otel bool
otelAddress string

connectionPoolConfig generic.ConnectionPoolConfig

watchAvailableStorageInterval time.Duration
watchAvailableStorageMinBytes uint64
lowAvailableStorageAction string
Expand Down Expand Up @@ -108,6 +111,7 @@ var (
rootCmdOpts.admissionControlPolicy,
rootCmdOpts.acpLimitMaxConcurrentTxn,
rootCmdOpts.acpOnlyWriteQueries,
rootCmdOpts.connectionPoolConfig,
rootCmdOpts.watchQueryTimeout,
)
if err != nil {
Expand Down Expand Up @@ -176,6 +180,10 @@ func init() {
rootCmd.Flags().BoolVar(&rootCmdOpts.otel, "otel", false, "enable traces endpoint")
rootCmd.Flags().StringVar(&rootCmdOpts.otelAddress, "otel-listen", "127.0.0.1:4317", "listen address for OpenTelemetry endpoint")
rootCmd.Flags().StringVar(&rootCmdOpts.metricsAddress, "metrics-listen", "127.0.0.1:9042", "listen address for metrics endpoint")
rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxIdle, "datastore-max-idle-connections", 5, "Maximum number of idle connections retained by datastore. If value = 0, the system default will be used. If value < 0, idle connections will not be reused.")
rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxOpen, "datastore-max-open-connections", 5, "Maximum number of open connections used by datastore. If value <= 0, then there is no limit")
rootCmd.Flags().DurationVar(&rootCmdOpts.connectionPoolConfig.MaxLifetime, "datastore-connection-max-lifetime", 60*time.Second, "Maximum amount of time a connection may be reused. If value <= 0, then there is no limit.")
rootCmd.Flags().DurationVar(&rootCmdOpts.connectionPoolConfig.MaxIdleTime, "datastore-connection-max-idle-time", 0*time.Second, "Maximum amount of time a connection may be idle before being closed. If value <= 0, then there is no limit.")
rootCmd.Flags().DurationVar(&rootCmdOpts.watchAvailableStorageInterval, "watch-storage-available-size-interval", 5*time.Second, "Interval to check if the disk is running low on space. Set to 0 to disable the periodic disk size check")
rootCmd.Flags().Uint64Var(&rootCmdOpts.watchAvailableStorageMinBytes, "watch-storage-available-size-min-bytes", 10*1024*1024, "Minimum required available disk size (in bytes) to continue operation. If available disk space gets below this threshold, then the --low-available-storage-action is performed")
rootCmd.Flags().StringVar(&rootCmdOpts.lowAvailableStorageAction, "low-available-storage-action", "none", "Action to perform in case the available storage is low. One of (none|handover|terminate). none means no action is performed. handover means the dqlite node will handover its leadership role, if any. terminate means this dqlite node will shutdown")
Expand Down
8 changes: 4 additions & 4 deletions pkg/kine/drivers/dqlite/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ func init() {
}
}

func New(ctx context.Context, datasourceName string, tlsInfo tls.Config) (server.Backend, error) {
backend, _, err := NewVariant(ctx, datasourceName)
func New(ctx context.Context, datasourceName string, tlsInfo tls.Config, connectionPoolConfig *generic.ConnectionPoolConfig) (server.Backend, error) {
backend, _, err := NewVariant(ctx, datasourceName, connectionPoolConfig)
return backend, err
}

func NewVariant(ctx context.Context, datasourceName string) (server.Backend, *generic.Generic, error) {
func NewVariant(ctx context.Context, datasourceName string, connectionPoolConfig *generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
logrus.Printf("New kine for dqlite")

// Driver name will be extracted from query parameters
backend, generic, err := sqlite.NewVariant(ctx, "", datasourceName)
backend, generic, err := sqlite.NewVariant(ctx, "", datasourceName, connectionPoolConfig)
if err != nil {
return nil, nil, errors.Wrap(err, "sqlite client")
}
Expand Down
39 changes: 32 additions & 7 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"go.opentelemetry.io/otel/trace"
)

const otelName = "generic"
const (
otelName = "generic"
defaultMaxIdleConns = 2 // default from database/sql
)

var (
otelTracer trace.Tracer
Expand Down Expand Up @@ -161,10 +164,32 @@ type Generic struct {
WatchQueryTimeout time.Duration
}

func configureConnectionPooling(db *sql.DB) {
db.SetMaxIdleConns(5)
db.SetMaxOpenConns(5)
db.SetConnMaxLifetime(60 * time.Second)
type ConnectionPoolConfig struct {
MaxIdle int
MaxOpen int
MaxLifetime time.Duration
MaxIdleTime time.Duration
}

func configureConnectionPooling(connPoolConfig *ConnectionPoolConfig, db *sql.DB) {
// behavior of database/sql - zero means defaultMaxIdleConns; negative means 0
if connPoolConfig.MaxIdle < 0 {
connPoolConfig.MaxIdle = 0
} else if connPoolConfig.MaxIdle == 0 {
connPoolConfig.MaxIdle = defaultMaxIdleConns
}

logrus.Infof(
"Configuring database connection pooling: maxIdleConns=%d, maxOpenConns=%d, connMaxLifetime=%v, connMaxIdleTime=%v ",
connPoolConfig.MaxIdle,
connPoolConfig.MaxOpen,
connPoolConfig.MaxLifetime,
connPoolConfig.MaxIdleTime,
)
db.SetMaxIdleConns(connPoolConfig.MaxIdle)
db.SetMaxOpenConns(connPoolConfig.MaxOpen)
db.SetConnMaxLifetime(connPoolConfig.MaxLifetime)
db.SetConnMaxIdleTime(connPoolConfig.MaxIdleTime)
}

func q(sql, param string, numbered bool) string {
Expand Down Expand Up @@ -199,7 +224,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
return db, nil
}

func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter string, numbered bool) (*Generic, error) {
func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig *ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) {
var (
db *sql.DB
err error
Expand All @@ -218,7 +243,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter
}
}

configureConnectionPooling(db)
configureConnectionPooling(connPoolConfig, db)

return &Generic{
DB: prepared.New(db),
Expand Down
8 changes: 4 additions & 4 deletions pkg/kine/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ type opts struct {
admissionControlOnlyWriteQueries bool
}

func New(ctx context.Context, dataSourceName string) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName)
func New(ctx context.Context, dataSourceName string, connectionPoolConfig *generic.ConnectionPoolConfig) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connectionPoolConfig)
if err != nil {
return nil, err
}

return backend, err
}

func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.Backend, *generic.Generic, error) {
func NewVariant(ctx context.Context, driverName, dataSourceName string, connectionPoolConfig *generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
const retryAttempts = 300

opts, err := parseOpts(dataSourceName)
Expand All @@ -65,7 +65,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.
dataSourceName = "./db/state.db?_journal=WAL&cache=shared"
}

dialect, err := generic.Open(ctx, driverName, opts.dsn, "?", false)
dialect, err := generic.Open(ctx, driverName, opts.dsn, connectionPoolConfig, "?", false)
if err != nil {
return nil, nil, err
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/kine/drivers/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"database/sql"
"path"
"testing"
"time"

"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/sqlite"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -51,7 +53,13 @@ func TestMigration(t *testing.T) {
}

ctx := context.Background()
if _, err := sqlite.New(ctx, dbPath); err != nil {
connPoolConfig := generic.ConnectionPoolConfig{
MaxIdle: 5,
MaxOpen: 5,
MaxLifetime: 60 * time.Second,
MaxIdleTime: 0 * time.Second,
}
if _, err := sqlite.New(ctx, dbPath, &connPoolConfig); err != nil {
t.Fatal(err)
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/kine/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/canonical/k8s-dqlite/pkg/kine/drivers/dqlite"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/sqlite"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
"github.com/canonical/k8s-dqlite/pkg/kine/tls"
Expand All @@ -26,9 +27,10 @@ const (
)

type Config struct {
GRPCServer *grpc.Server
Listener string
Endpoint string
GRPCServer *grpc.Server
Listener string
Endpoint string
ConnectionPoolConfig generic.ConnectionPoolConfig

tls.Config
}
Expand Down Expand Up @@ -180,9 +182,9 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config)
switch driver {
case SQLiteBackend:
leaderElect = false
backend, err = sqlite.New(ctx, dsn)
backend, err = sqlite.New(ctx, dsn, &cfg.ConnectionPoolConfig)
case DQLiteBackend:
backend, err = dqlite.New(ctx, dsn, cfg.Config)
backend, err = dqlite.New(ctx, dsn, cfg.Config, &cfg.ConnectionPoolConfig)
default:
return false, nil, fmt.Errorf("storage backend is not defined")
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/app"
"github.com/canonical/go-dqlite/client"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/endpoint"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
kine_tls "github.com/canonical/k8s-dqlite/pkg/kine/tls"
Expand Down Expand Up @@ -69,6 +70,7 @@ func New(
admissionControlPolicy string,
admissionControlPolicyLimitMaxConcurrentTxn int64,
admissionControlOnlyWriteQueries bool,
connectionPoolConfig generic.ConnectionPoolConfig,
watchQueryTimeout time.Duration,
) (*Server, error) {
var (
Expand Down Expand Up @@ -221,7 +223,8 @@ func New(
}
options = append(options, app.WithTLS(listen, dial))
}

// set datastore connection pool options
kineConfig.ConnectionPoolConfig = connectionPoolConfig
// handle tuning parameters
if exists, err := fileExists(dir, "tuning.yaml"); err != nil {
return nil, fmt.Errorf("failed to check for tuning.yaml: %w", err)
Expand Down

0 comments on commit 358d1f0

Please sign in to comment.