From 2468935516f64aad1b5285550cc11edaa37b9394 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Thu, 5 Dec 2024 11:48:46 +0100 Subject: [PATCH 01/17] make progress notify interval configurable --- cmd/root.go | 7 +++++-- docs/configuration.md | 1 + pkg/kine/endpoint/endpoint.go | 6 ++++-- pkg/kine/server/limited.go | 4 +++- pkg/kine/server/server.go | 6 ++++-- pkg/server/server.go | 4 ++++ 6 files changed, 21 insertions(+), 7 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 0c220bef..107586e8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -39,8 +39,9 @@ var ( watchAvailableStorageMinBytes uint64 lowAvailableStorageAction string - etcdMode bool - watchQueryTimeout time.Duration + etcdMode bool + watchQueryTimeout time.Duration + watchProgressNotifyInterval time.Duration } rootCmd = &cobra.Command{ @@ -106,6 +107,7 @@ var ( rootCmdOpts.lowAvailableStorageAction, rootCmdOpts.connectionPoolConfig, rootCmdOpts.watchQueryTimeout, + rootCmdOpts.watchProgressNotifyInterval, ) if err != nil { logrus.WithError(err).Fatal("Failed to create server") @@ -181,6 +183,7 @@ func init() { rootCmd.Flags().Uint64Var(&rootCmdOpts.watchAvailableStorageMinBytes, "watch-storage-available-size-min-bytes", 10*1024*1024, "Minimum required available disk size (in bytes) to continue operation. If available disk space gets below this threshold, then the --low-available-storage-action is performed") rootCmd.Flags().StringVar(&rootCmdOpts.lowAvailableStorageAction, "low-available-storage-action", "none", "Action to perform in case the available storage is low. One of (none|handover|terminate). none means no action is performed. handover means the dqlite node will handover its leadership role, if any. terminate means this dqlite node will shutdown") rootCmd.Flags().DurationVar(&rootCmdOpts.watchQueryTimeout, "watch-query-timeout", 20*time.Second, "Timeout for querying events in the watch poll loop. If timeout is reached, the poll loop will be re-triggered. The minimum value is 5 seconds.") + rootCmd.Flags().DurationVar(&rootCmdOpts.watchProgressNotifyInterval, "watch-progress-notify-interval", 5*time.Second, "Interval between periodic watch progress notifications. Default is 5s to ensure support for watch progress notifications.") rootCmd.AddCommand(&cobra.Command{ Use: "version", diff --git a/docs/configuration.md b/docs/configuration.md index ee80413c..4b475415 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -32,6 +32,7 @@ The following configuration options are available listed in a table format: | ~~`--admission-control-policy-limit`~~ | `REMOVED` | - | | ~~`--admission-control-only-for-write-queries`~~ | `REMOVED` | - | | `--watch-query-timeout` | Timeout for querying events in the watch poll loop | `20s` | +| `--watch-progress-notify-interval` | Interval between periodic watch progress notifications. Default is 5s to ensure support for watch progress notifications. | `5s` | ## Observability diff --git a/pkg/kine/endpoint/endpoint.go b/pkg/kine/endpoint/endpoint.go index 46880ca9..b3256dd4 100644 --- a/pkg/kine/endpoint/endpoint.go +++ b/pkg/kine/endpoint/endpoint.go @@ -6,6 +6,7 @@ import ( "net" "os" "strings" + "time" "github.com/canonical/k8s-dqlite/pkg/kine/drivers/dqlite" "github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic" @@ -33,6 +34,7 @@ type Config struct { ConnectionPoolConfig generic.ConnectionPoolConfig tls.Config + NotifyInterval time.Duration } type ETCDConfig struct { @@ -65,7 +67,7 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { listen = KineSocket } - b := server.New(backend) + b := server.New(backend, config.NotifyInterval) grpcServer := grpcServer(config) b.Register(grpcServer) @@ -130,7 +132,7 @@ func ListenAndReturnBackend(ctx context.Context, config Config) (ETCDConfig, ser listen = KineSocket } - b := server.New(backend) + b := server.New(backend, config.NotifyInterval) grpcServer := grpcServer(config) b.Register(grpcServer) diff --git a/pkg/kine/server/limited.go b/pkg/kine/server/limited.go index 83ea068b..8f8a6b15 100644 --- a/pkg/kine/server/limited.go +++ b/pkg/kine/server/limited.go @@ -3,12 +3,14 @@ package server import ( "context" "fmt" + "time" "go.etcd.io/etcd/api/v3/etcdserverpb" ) type LimitedServer struct { - backend Backend + backend Backend + notifyInterval time.Duration } func (l *LimitedServer) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) { diff --git a/pkg/kine/server/server.go b/pkg/kine/server/server.go index e204e952..e2eed0a6 100644 --- a/pkg/kine/server/server.go +++ b/pkg/kine/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "time" "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -21,10 +22,11 @@ type KVServerBridge struct { limited *LimitedServer } -func New(backend Backend) *KVServerBridge { +func New(backend Backend, notifyInterval time.Duration) *KVServerBridge { return &KVServerBridge{ limited: &LimitedServer{ - backend: backend, + backend: backend, + notifyInterval: notifyInterval, }, } } diff --git a/pkg/server/server.go b/pkg/server/server.go index 21c2f89f..79c58d81 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -69,6 +69,8 @@ func New( lowAvailableStorageAction string, connectionPoolConfig generic.ConnectionPoolConfig, watchQueryTimeout time.Duration, + watchProgressNotifyInterval time.Duration, + ) (*Server, error) { var ( options []app.Option @@ -222,6 +224,8 @@ func New( } // set datastore connection pool options kineConfig.ConnectionPoolConfig = connectionPoolConfig + // set watch progress notify interval + kineConfig.NotifyInterval = watchProgressNotifyInterval // handle tuning parameters if exists, err := fileExists(dir, "tuning.yaml"); err != nil { return nil, fmt.Errorf("failed to check for tuning.yaml: %w", err) From e2949674a881ab4539e0ee92e9ffeb9a931bd6c0 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Wed, 4 Dec 2024 15:45:16 +0200 Subject: [PATCH 02/17] emulated etcd version --- cmd/root.go | 3 +++ pkg/kine/endpoint/endpoint.go | 2 +- pkg/kine/server/maintenance.go | 5 +++-- pkg/kine/server/server.go | 3 ++- pkg/server/server.go | 1 + 5 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 107586e8..0463bed4 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -32,6 +32,7 @@ var ( metricsAddress string otel bool otelAddress string + emulatedEtcdVersion string connectionPoolConfig generic.ConnectionPoolConfig @@ -102,6 +103,7 @@ var ( rootCmdOpts.diskMode, rootCmdOpts.clientSessionCacheSize, rootCmdOpts.minTLSVersion, + rootCmdOpts.emulatedEtcdVersion, rootCmdOpts.watchAvailableStorageInterval, rootCmdOpts.watchAvailableStorageMinBytes, rootCmdOpts.lowAvailableStorageAction, @@ -175,6 +177,7 @@ func init() { rootCmd.Flags().BoolVar(&rootCmdOpts.otel, "otel", false, "enable traces endpoint") rootCmd.Flags().StringVar(&rootCmdOpts.otelAddress, "otel-listen", "127.0.0.1:4317", "listen address for OpenTelemetry endpoint") rootCmd.Flags().StringVar(&rootCmdOpts.metricsAddress, "metrics-listen", "127.0.0.1:9042", "listen address for metrics endpoint") + rootCmd.Flags().StringVar(&rootCmdOpts.emulatedEtcdVersion, "emulated-etcd-version", "3.5.13", "The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate support for watch progress notifications.") rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxIdle, "datastore-max-idle-connections", 5, "Maximum number of idle connections retained by datastore. If value = 0, the system default will be used. If value < 0, idle connections will not be reused.") rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxOpen, "datastore-max-open-connections", 5, "Maximum number of open connections used by datastore. If value <= 0, then there is no limit") rootCmd.Flags().DurationVar(&rootCmdOpts.connectionPoolConfig.MaxLifetime, "datastore-connection-max-lifetime", 60*time.Second, "Maximum amount of time a connection may be reused. If value <= 0, then there is no limit.") diff --git a/pkg/kine/endpoint/endpoint.go b/pkg/kine/endpoint/endpoint.go index b3256dd4..6de8f05d 100644 --- a/pkg/kine/endpoint/endpoint.go +++ b/pkg/kine/endpoint/endpoint.go @@ -31,8 +31,8 @@ type Config struct { GRPCServer *grpc.Server Listener string Endpoint string + EmulatedEtcdVersion string ConnectionPoolConfig generic.ConnectionPoolConfig - tls.Config NotifyInterval time.Duration } diff --git a/pkg/kine/server/maintenance.go b/pkg/kine/server/maintenance.go index bc05aa83..932686ef 100644 --- a/pkg/kine/server/maintenance.go +++ b/pkg/kine/server/maintenance.go @@ -19,8 +19,9 @@ func (s *KVServerBridge) Status(ctx context.Context, r *etcdserverpb.StatusReque return nil, err } return &etcdserverpb.StatusResponse{ - Header: &etcdserverpb.ResponseHeader{}, - DbSize: size, + Header: &etcdserverpb.ResponseHeader{}, + DbSize: size, + Version: s.emulatedETCDVersion, }, nil } diff --git a/pkg/kine/server/server.go b/pkg/kine/server/server.go index e2eed0a6..ec9a9048 100644 --- a/pkg/kine/server/server.go +++ b/pkg/kine/server/server.go @@ -19,7 +19,8 @@ var ( ) type KVServerBridge struct { - limited *LimitedServer + limited *LimitedServer + emulatedETCDVersion string } func New(backend Backend, notifyInterval time.Duration) *KVServerBridge { diff --git a/pkg/server/server.go b/pkg/server/server.go index 79c58d81..5728943a 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -64,6 +64,7 @@ func New( diskMode bool, clientSessionCacheSize uint, minTLSVersion string, + emulatedEtcdVersion string, watchAvailableStorageInterval time.Duration, watchAvailableStorageMinBytes uint64, lowAvailableStorageAction string, From 5c0e78c27927ea54152a864f7df27da289776b7b Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Wed, 4 Dec 2024 15:26:37 +0100 Subject: [PATCH 03/17] nit --- docs/configuration.md | 1 + pkg/kine/server/maintenance.go | 2 +- pkg/kine/server/server.go | 2 +- pkg/server/server.go | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 4b475415..48232bab 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -21,6 +21,7 @@ The following configuration options are available listed in a table format: | `--otel` | Enable traces endpoint | `false` | | `--otel-listen` | The address to listen for OpenTelemetry endpoint | `127.0.0.1:4317` | | `--metrics-listen` | The address to listen for metrics endpoint | `127.0.0.1:9042` | +| `--emulated-etcd-version` | The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate support for watch progress notifications. | `3.5.13` | | `--datastore-max-idle-connections` | Maximum number of idle connections retained by datastore | `5` | | `--datastore-max-open-connections` | Maximum number of open connections used by datastore | `5` | | `--datastore-connection-max-lifetime` | Maximum amount of time a connection may be reused | `60s` | diff --git a/pkg/kine/server/maintenance.go b/pkg/kine/server/maintenance.go index 932686ef..5e79cd9e 100644 --- a/pkg/kine/server/maintenance.go +++ b/pkg/kine/server/maintenance.go @@ -21,7 +21,7 @@ func (s *KVServerBridge) Status(ctx context.Context, r *etcdserverpb.StatusReque return &etcdserverpb.StatusResponse{ Header: &etcdserverpb.ResponseHeader{}, DbSize: size, - Version: s.emulatedETCDVersion, + Version: s.emulatedEtcdVersion, }, nil } diff --git a/pkg/kine/server/server.go b/pkg/kine/server/server.go index ec9a9048..c9175bb9 100644 --- a/pkg/kine/server/server.go +++ b/pkg/kine/server/server.go @@ -20,7 +20,7 @@ var ( type KVServerBridge struct { limited *LimitedServer - emulatedETCDVersion string + emulatedEtcdVersion string } func New(backend Backend, notifyInterval time.Duration) *KVServerBridge { diff --git a/pkg/server/server.go b/pkg/server/server.go index 5728943a..c3a1053d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -345,7 +345,7 @@ func (s *Server) Start(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to start kine: %w", err) } - logrus.WithFields(logrus.Fields{"address": s.kineConfig.Listener, "database": s.kineConfig.Endpoint}).Print("Started kine") + logrus.WithFields(logrus.Fields{"address": s.kineConfig.Listener, "database": s.kineConfig.Endpoint, "emulatedEtcdVersion": s.kineConfig.EmulatedEtcdVersion}).Print("Started kine") s.backend = backend From 03afca5fcc6d18156f012ea760df086db30bda07 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Thu, 5 Dec 2024 09:25:49 +0100 Subject: [PATCH 04/17] test no WatchList supported etcd version with 1.32rc --- .github/workflows/k8s-snap-integration.yaml | 8 ++++---- cmd/root.go | 2 +- docs/configuration.md | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/k8s-snap-integration.yaml b/.github/workflows/k8s-snap-integration.yaml index 02a9f764..2c0fa2f5 100644 --- a/.github/workflows/k8s-snap-integration.yaml +++ b/.github/workflows/k8s-snap-integration.yaml @@ -2,7 +2,7 @@ name: Integration Test K8s-snap on: push: - branches: ["master"] + branches: ["master", "KU-2276/emulated-etcd-version"] #TODO remove KU-2276/emulated-etcd-version pull_request: permissions: @@ -30,9 +30,9 @@ jobs: uses: actions/setup-go@v5 with: go-version: "1.21" - - name: Download latest k8s-snap + - name: Download latest k8s-snap #TODO remove v1.32.0-rc0 run: | - sudo snap download k8s --channel=latest/edge --basename k8s + sudo snap download k8s --channel=latest/edge/v1.32.0-rc0 --basename k8s - name: Install lxd uses: canonical/k8s-snap/.github/actions/install-lxd@main - name: Build k8s-dqlite @@ -52,7 +52,7 @@ jobs: env: TEST_SNAP: ${{ github.workspace }}/k8s-updated.snap TEST_SUBSTRATE: lxd - TEST_LXD_IMAGE: ubuntu:22.04 + TEST_LXD_IMAGE: ${{ matrix.os }} TEST_INSPECTION_REPORTS_DIR: ${{ github.workspace }}/inspection-reports run: | git clone https://github.com/canonical/k8s-snap.git diff --git a/cmd/root.go b/cmd/root.go index 0463bed4..c8065492 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -177,7 +177,7 @@ func init() { rootCmd.Flags().BoolVar(&rootCmdOpts.otel, "otel", false, "enable traces endpoint") rootCmd.Flags().StringVar(&rootCmdOpts.otelAddress, "otel-listen", "127.0.0.1:4317", "listen address for OpenTelemetry endpoint") rootCmd.Flags().StringVar(&rootCmdOpts.metricsAddress, "metrics-listen", "127.0.0.1:9042", "listen address for metrics endpoint") - rootCmd.Flags().StringVar(&rootCmdOpts.emulatedEtcdVersion, "emulated-etcd-version", "3.5.13", "The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate support for watch progress notifications.") + rootCmd.Flags().StringVar(&rootCmdOpts.emulatedEtcdVersion, "emulated-etcd-version", "3.5.12", "The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.12, in order to indicate no support for watch progress notifications yet.") rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxIdle, "datastore-max-idle-connections", 5, "Maximum number of idle connections retained by datastore. If value = 0, the system default will be used. If value < 0, idle connections will not be reused.") rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxOpen, "datastore-max-open-connections", 5, "Maximum number of open connections used by datastore. If value <= 0, then there is no limit") rootCmd.Flags().DurationVar(&rootCmdOpts.connectionPoolConfig.MaxLifetime, "datastore-connection-max-lifetime", 60*time.Second, "Maximum amount of time a connection may be reused. If value <= 0, then there is no limit.") diff --git a/docs/configuration.md b/docs/configuration.md index 48232bab..44363466 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -21,7 +21,7 @@ The following configuration options are available listed in a table format: | `--otel` | Enable traces endpoint | `false` | | `--otel-listen` | The address to listen for OpenTelemetry endpoint | `127.0.0.1:4317` | | `--metrics-listen` | The address to listen for metrics endpoint | `127.0.0.1:9042` | -| `--emulated-etcd-version` | The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate support for watch progress notifications. | `3.5.13` | +| `--emulated-etcd-version` | The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.12, in order to indicate no support for watch progress notifications yet. | `3.5.12` | | `--datastore-max-idle-connections` | Maximum number of idle connections retained by datastore | `5` | | `--datastore-max-open-connections` | Maximum number of open connections used by datastore | `5` | | `--datastore-connection-max-lifetime` | Maximum amount of time a connection may be reused | `60s` | From d34dfebb4aaa40e42addb966c674dd62f1d4b32f Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Thu, 5 Dec 2024 10:10:57 +0100 Subject: [PATCH 05/17] clean up --- .github/workflows/k8s-snap-integration.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/k8s-snap-integration.yaml b/.github/workflows/k8s-snap-integration.yaml index 2c0fa2f5..3ffff646 100644 --- a/.github/workflows/k8s-snap-integration.yaml +++ b/.github/workflows/k8s-snap-integration.yaml @@ -2,7 +2,7 @@ name: Integration Test K8s-snap on: push: - branches: ["master", "KU-2276/emulated-etcd-version"] #TODO remove KU-2276/emulated-etcd-version + branches: ["master"] pull_request: permissions: @@ -30,9 +30,9 @@ jobs: uses: actions/setup-go@v5 with: go-version: "1.21" - - name: Download latest k8s-snap #TODO remove v1.32.0-rc0 + - name: Download latest k8s-snap run: | - sudo snap download k8s --channel=latest/edge/v1.32.0-rc0 --basename k8s + sudo snap download k8s --channel=latest/edge --basename k8s - name: Install lxd uses: canonical/k8s-snap/.github/actions/install-lxd@main - name: Build k8s-dqlite From ad404e12624a5e27a64f29021a0392e188fead16 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Thu, 5 Dec 2024 14:49:56 +0100 Subject: [PATCH 06/17] clean-up --- .github/workflows/k8s-snap-integration.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/k8s-snap-integration.yaml b/.github/workflows/k8s-snap-integration.yaml index 3ffff646..02a9f764 100644 --- a/.github/workflows/k8s-snap-integration.yaml +++ b/.github/workflows/k8s-snap-integration.yaml @@ -52,7 +52,7 @@ jobs: env: TEST_SNAP: ${{ github.workspace }}/k8s-updated.snap TEST_SUBSTRATE: lxd - TEST_LXD_IMAGE: ${{ matrix.os }} + TEST_LXD_IMAGE: ubuntu:22.04 TEST_INSPECTION_REPORTS_DIR: ${{ github.workspace }}/inspection-reports run: | git clone https://github.com/canonical/k8s-snap.git From c7a05d36db6f89e2a439c407d9b0478ed29d24c1 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Fri, 6 Dec 2024 14:18:01 +0100 Subject: [PATCH 07/17] switch to etcd 3.5.7 --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 44363466..a512a405 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -21,7 +21,7 @@ The following configuration options are available listed in a table format: | `--otel` | Enable traces endpoint | `false` | | `--otel-listen` | The address to listen for OpenTelemetry endpoint | `127.0.0.1:4317` | | `--metrics-listen` | The address to listen for metrics endpoint | `127.0.0.1:9042` | -| `--emulated-etcd-version` | The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.12, in order to indicate no support for watch progress notifications yet. | `3.5.12` | +| `--emulated-etcd-version` | The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.7, in order to indicate no support for watch progress notifications yet. | `3.5.7` | | `--datastore-max-idle-connections` | Maximum number of idle connections retained by datastore | `5` | | `--datastore-max-open-connections` | Maximum number of open connections used by datastore | `5` | | `--datastore-connection-max-lifetime` | Maximum amount of time a connection may be reused | `60s` | From 749e5ad9f1c25adb0d6a44fc5e5e0ad93380d4ce Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Tue, 10 Dec 2024 09:58:13 +0100 Subject: [PATCH 08/17] signal etcd version supports watch progress notify --- cmd/root.go | 2 +- docs/configuration.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index c8065492..d27c8ebf 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -177,7 +177,7 @@ func init() { rootCmd.Flags().BoolVar(&rootCmdOpts.otel, "otel", false, "enable traces endpoint") rootCmd.Flags().StringVar(&rootCmdOpts.otelAddress, "otel-listen", "127.0.0.1:4317", "listen address for OpenTelemetry endpoint") rootCmd.Flags().StringVar(&rootCmdOpts.metricsAddress, "metrics-listen", "127.0.0.1:9042", "listen address for metrics endpoint") - rootCmd.Flags().StringVar(&rootCmdOpts.emulatedEtcdVersion, "emulated-etcd-version", "3.5.12", "The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.12, in order to indicate no support for watch progress notifications yet.") + rootCmd.Flags().StringVar(&rootCmdOpts.emulatedEtcdVersion, "emulated-etcd-version", "3.5.13", "The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate no support for watch progress notifications yet.") rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxIdle, "datastore-max-idle-connections", 5, "Maximum number of idle connections retained by datastore. If value = 0, the system default will be used. If value < 0, idle connections will not be reused.") rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxOpen, "datastore-max-open-connections", 5, "Maximum number of open connections used by datastore. If value <= 0, then there is no limit") rootCmd.Flags().DurationVar(&rootCmdOpts.connectionPoolConfig.MaxLifetime, "datastore-connection-max-lifetime", 60*time.Second, "Maximum amount of time a connection may be reused. If value <= 0, then there is no limit.") diff --git a/docs/configuration.md b/docs/configuration.md index a512a405..46d441e0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -21,7 +21,7 @@ The following configuration options are available listed in a table format: | `--otel` | Enable traces endpoint | `false` | | `--otel-listen` | The address to listen for OpenTelemetry endpoint | `127.0.0.1:4317` | | `--metrics-listen` | The address to listen for metrics endpoint | `127.0.0.1:9042` | -| `--emulated-etcd-version` | The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.7, in order to indicate no support for watch progress notifications yet. | `3.5.7` | +| `--emulated-etcd-version` | The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate no support for watch progress notifications yet. | `3.5.13` | | `--datastore-max-idle-connections` | Maximum number of idle connections retained by datastore | `5` | | `--datastore-max-open-connections` | Maximum number of open connections used by datastore | `5` | | `--datastore-connection-max-lifetime` | Maximum amount of time a connection may be reused | `60s` | From 186cd8fecfbd532ecb4ccd7efa1b0fabc273a109 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Tue, 10 Dec 2024 10:15:44 +0100 Subject: [PATCH 09/17] add progress notify --- go.mod | 10 +- go.sum | 32 ++++-- pkg/kine/server/types.go | 9 +- pkg/kine/server/watch.go | 220 +++++++++++++++++++++++++++++++------- pkg/kine/sqllog/sqllog.go | 41 ++++--- test/util_test.go | 10 +- 6 files changed, 253 insertions(+), 69 deletions(-) diff --git a/go.mod b/go.mod index 315f5853..67faf97f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/canonical/k8s-dqlite -go 1.22 +go 1.22.0 + +toolchain go1.22.10 require ( github.com/canonical/go-dqlite/v2 v2.0.0 @@ -21,9 +23,11 @@ require ( go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 + golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d golang.org/x/sys v0.26.0 google.golang.org/grpc v1.67.1 gopkg.in/yaml.v2 v2.4.0 + k8s.io/apimachinery v0.31.3 ) require ( @@ -71,7 +75,7 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/net v0.30.0 // indirect - golang.org/x/sync v0.8.0 // indirect + golang.org/x/sync v0.10.0 // indirect golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 // indirect @@ -80,5 +84,7 @@ require ( google.golang.org/protobuf v1.35.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index a2f10948..00424172 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,9 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8 github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -42,7 +43,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -63,8 +65,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/renameio v1.0.1 h1:Lh/jXZmvZxb0BBeSY5VKEfidcbcbenKjZFzM/q0fSeU= github.com/google/renameio v1.0.1/go.mod h1:t/HQoYBZSsWSNK35C6CO/TpPLDVWvxOHboWUAweKUpk= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -107,8 +109,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= -github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -116,8 +118,9 @@ github.com/peterh/liner v1.2.2/go.mod h1:xFwJyiKIXJZUKItq5dGHZSTBRAuG/CpeNpWLyiN github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -208,6 +211,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d h1:0olWaB5pg3+oychR51GUVCEsGkeCU/2JxjBgIo4f3M0= +golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -236,8 +241,9 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -270,8 +276,8 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -315,5 +321,11 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/apimachinery v0.31.3 h1:6l0WhcYgasZ/wk9ktLq5vLaoXJJr5ts6lkaQzgeYPq4= +k8s.io/apimachinery v0.31.3/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index a437f0e0..23e7d26e 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -19,8 +19,9 @@ type Backend interface { List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error) - Watch(ctx context.Context, key string, revision int64) (<-chan []*Event, error) + Watch(ctx context.Context, key string, revision int64) (WatchResult, error) DbSize(ctx context.Context) (int64, error) + CurrentRevision(ctx context.Context) (int64, error) DoCompact(ctx context.Context) error Close() error } @@ -39,3 +40,9 @@ type Event struct { KV *KeyValue PrevKV *KeyValue } + +type WatchResult struct { + CurrentRevision int64 + CompactRevision int64 + Events <-chan []*Event +} diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index b78f9e8d..0e8189e5 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -4,49 +4,78 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" + "golang.org/x/exp/rand" + "k8s.io/apimachinery/pkg/util/wait" ) var ( watchID int64 ) +// explicit interface check +var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) + +// getProgressReportInterval returns the configured progress report interval, with some jitter +func (s *KVServerBridge) getProgressReportInterval() time.Duration { + // add rand(1/10*notifyInterval) as jitter so that kine will not + // send progress notifications to watchers at the same time even when watchers + // are created at the same time. + jitter := time.Duration(rand.Int63n(int64(s.limited.notifyInterval) / 10)) + return s.limited.notifyInterval + jitter +} + func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { w := watcher{ - server: ws, - backend: s.limited.backend, - watches: map[int64]func(){}, + server: ws, + backend: s.limited.backend, + watches: map[int64]func(){}, + progress: map[int64]chan<- int64{}, } defer w.Close() + go wait.PollInfiniteWithContext(ws.Context(), s.getProgressReportInterval(), w.ProgressIfSynced) + for { msg, err := ws.Recv() if err != nil { return err } - if msg.GetCreateRequest() != nil { - w.Start(ws.Context(), msg.GetCreateRequest()) - } else if msg.GetCancelRequest() != nil { - logrus.Debugf("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId()) - w.Cancel(msg.GetCancelRequest().WatchId, nil) + if cr := msg.GetCreateRequest(); cr != nil { + w.Start(ws.Context(), cr) + } + if cr := msg.GetCancelRequest(); cr != nil { + logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) + w.Cancel(cr.WatchId, 0, 0, nil) + } + if pr := msg.GetProgressRequest(); pr != nil { + w.Progress(ws.Context()) } } } type watcher struct { - sync.Mutex + sync.RWMutex - wg sync.WaitGroup - backend Backend - server etcdserverpb.Watch_WatchServer - watches map[int64]func() + wg sync.WaitGroup + backend Backend + server etcdserverpb.Watch_WatchServer + watches map[int64]func() + progress map[int64]chan<- int64 } func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) { + if r.WatchId != clientv3.AutoWatchID { + logrus.Warnf("WATCH START id=%d ignoring request with client-provided id", r.WatchId) + return + } + w.Lock() defer w.Unlock() @@ -57,8 +86,15 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) w.wg.Add(1) key := string(r.Key) + startRevision := r.StartRevision + + var progressCh chan int64 + if r.ProgressNotify { + progressCh = make(chan int64) + w.progress[id] = progressCh + } - logrus.Debugf("WATCH START id=%d, count=%d, key=%s, revision=%d", id, len(w.watches), key, r.StartRevision) + logrus.Tracef("WATCH START id=%d, key=%s, revision=%d, progressNotify=%v, watchCount=%d", id, key, startRevision, r.ProgressNotify, len(w.watches)) go func() { defer w.wg.Done() @@ -67,36 +103,74 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) Created: true, WatchId: id, }); err != nil { - w.Cancel(id, err) + w.Cancel(id, 0, 0, err) return } - watchCh, err := w.backend.Watch(ctx, key, r.StartRevision) + wr, err := w.backend.Watch(ctx, key, startRevision) + // If the watch result has a non-zero CompactRevision, then the watch request failed due to + // the requested start revision having been compacted. Pass the current and and compact + // revision to the client via the cancel response, along with the correct error message. if err != nil { - w.Cancel(id, err) + w.Cancel(id, 0, 0, err) //TODO: need to return currrev and compact rev return } - for events := range watchCh { - if len(events) == 0 { - continue + + trace := logrus.IsLevelEnabled(logrus.TraceLevel) + outer := true + for outer { + var reads int + var events []*Event + var revision int64 + + // Block on initial read from events or progress channel + select { + case events = <-wr.Events: + // got events; read additional queued events from the channel and add to batch + reads++ + inner := true + for inner { + select { + case e, ok := <-wr.Events: + reads++ + events = append(events, e...) + if !ok { + // channel was closed, break out of both loops + inner = false + outer = false + } + default: + inner = false + } + } + case revision = <-progressCh: + // have been requested to send progress with no events } - if logrus.IsLevelEnabled(logrus.DebugLevel) { - for _, event := range events { - logrus.Debugf("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + // get max revision from collected events + if len(events) > 0 { + revision = events[len(events)-1].KV.ModRevision + if trace { + for _, event := range events { + logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + } } } - if err := w.server.Send(&etcdserverpb.WatchResponse{ - Header: txnHeader(events[len(events)-1].KV.ModRevision), - WatchId: id, - Events: toEvents(events...), - }); err != nil { - w.Cancel(id, err) - continue + // send response. note that there are no events if this is a progress response. + if revision >= startRevision { + wr := &etcdserverpb.WatchResponse{ + Header: txnHeader(revision), + WatchId: id, + Events: toEvents(events...), + } + logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads) + if err := w.server.Send(wr); err != nil { + w.Cancel(id, 0, 0, err) + } } } - w.Cancel(id, nil) + w.Cancel(id, 0, 0, nil) logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) }() } @@ -124,8 +198,12 @@ func toEvent(event *Event) *mvccpb.Event { return e } -func (w *watcher) Cancel(watchID int64, err error) { +func (w *watcher) Cancel(watchID int64, revision, compactRev int64, err error) { w.Lock() + if progressCh, ok := w.progress[watchID]; ok { + close(progressCh) + delete(w.progress, watchID) + } if cancel, ok := w.watches[watchID]; ok { cancel() delete(w.watches, watchID) @@ -136,23 +214,85 @@ func (w *watcher) Cancel(watchID int64, err error) { if err != nil { reason = err.Error() } - logrus.Debugf("WATCH CANCEL id=%d reason=%s", watchID, reason) + logrus.Tracef("WATCH CANCEL id=%d, reason=%s, compactRev=%d", watchID, reason, compactRev) + serr := w.server.Send(&etcdserverpb.WatchResponse{ - Header: &etcdserverpb.ResponseHeader{}, - Canceled: true, - CancelReason: "watch closed", - WatchId: watchID, + Header: txnHeader(revision), + Canceled: err != nil, + CancelReason: reason, + WatchId: watchID, + CompactRevision: compactRev, }) - if serr != nil && err != nil { + if serr != nil && err != nil && !clientv3.IsConnCanceled(serr) { logrus.Errorf("WATCH Failed to send cancel response for watchID %d: %v", watchID, serr) } } func (w *watcher) Close() { + logrus.Tracef("WATCH SERVER CLOSE") w.Lock() - for _, v := range w.watches { - v() + for id, progressCh := range w.progress { + close(progressCh) + delete(w.progress, id) + } + for id, cancel := range w.watches { + cancel() + delete(w.watches, id) } w.Unlock() w.wg.Wait() } + +// Progress sends a progress report if all watchers are synced. +// Ref: https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/watchable_store.go#L500-L504 +func (w *watcher) Progress(ctx context.Context) { + w.RLock() + defer w.RUnlock() + + logrus.Tracef("WATCH REQUEST PROGRESS") + + // All synced watchers will be blocked in the outer loop and able to receive on the progress channel. + // If any cannot be sent to, then it is not synced and has pending events to be sent. + // Send revision 0, as we don't actually want the watchers to send a progress response if they do receive. + for id, progressCh := range w.progress { + select { + case progressCh <- 0: + default: + logrus.Tracef("WATCH SEND PROGRESS FAILED NOT SYNCED id=%d ", id) + return + } + } + + // If all watchers are synced, send a broadcast progress notification with the latest revision. + id := int64(clientv3.InvalidWatchID) + rev, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return + } + + logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev) + go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id}) +} + +// ProgressIfSynced sends a progress report on any channels that are synced and blocked on the outer loop +func (w *watcher) ProgressIfSynced(ctx context.Context) (bool, error) { + logrus.Tracef("WATCH PROGRESS TICK") + revision, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return false, nil + } + + w.RLock() + defer w.RUnlock() + + // Send revision to all synced channels + for _, progressCh := range w.progress { + select { + case progressCh <- revision: + default: + } + } + return false, nil +} diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 9cb9b8b7..7d2aa04a 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -22,6 +22,7 @@ const ( otelName = "sqllog" SupersededCount = 100 compactBatchSize = 1000 + pollBatchSize = 500 ) var ( @@ -71,6 +72,7 @@ type SQLLog struct { d Dialect broadcaster broadcaster.Broadcaster[[]*server.Event] notify chan int64 + currentRev int64 wg sync.WaitGroup } @@ -216,6 +218,13 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) { return nil } +func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { + if s.currentRev != 0 { + return s.currentRev, nil + } + return s.d.CurrentRevision(ctx) +} + func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) { var err error ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.After", otelName)) @@ -338,13 +347,13 @@ func (s *SQLLog) ttl(ctx context.Context) { go run(ctx, key, revision, time.Duration(lease)*time.Second) } - watchCh, err := s.Watch(ctx, "/", startRevision) + wr, err := s.Watch(ctx, "/", startRevision) if err != nil { logrus.Errorf("failed to watch events for ttl: %v", err) return } - for events := range watchCh { + for events := range wr.Events { for _, event := range events { if event.KV.Lease > 0 { go run(ctx, event.KV.Key, event.KV.ModRevision, time.Duration(event.KV.Lease)*time.Second) @@ -354,7 +363,7 @@ func (s *SQLLog) ttl(ctx context.Context) { }() } -func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<-chan []*server.Event, error) { +func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (server.WatchResult, error) { ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Watch", otelName)) defer span.End() span.SetAttributes( @@ -362,9 +371,12 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<- attribute.Int64("startRevision", startRevision), ) + res := make(chan []*server.Event, 100) + wr := server.WatchResult{Events: res} + values, err := s.broadcaster.Subscribe(ctx) if err != nil { - return nil, err + return wr, err } if startRevision > 0 { @@ -374,10 +386,13 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<- initialRevision, initialEvents, err := s.After(ctx, key, startRevision, 0) if err != nil { span.RecordError(err) - return nil, err + if err == server.ErrCompacted { + compact, _, _ := s.d.GetCompactRevision(ctx) + wr.CompactRevision = compact + wr.CurrentRevision = initialRevision + } } - res := make(chan []*server.Event, 100) if len(initialEvents) > 0 { res <- initialEvents } @@ -396,7 +411,8 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<- } } }() - return res, nil + + return wr, nil } func filterEvents(events []*server.Event, key string, startRevision int64) []*server.Event { @@ -457,8 +473,8 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) { } func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStart int64) { + s.currentRev = pollStart var ( - last = pollStart skip int64 skipTime time.Time waitForMore = true @@ -474,7 +490,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar case <-ctx.Done(): return case check := <-s.notify: - if check <= last { + if check <= s.currentRev { continue } case <-wait.C: @@ -484,7 +500,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar watchCtx, cancel := context.WithTimeout(ctx, s.d.GetWatchQueryTimeout()) defer cancel() - rows, err := s.d.After(watchCtx, last, 500) + rows, err := s.d.After(watchCtx, s.currentRev, pollBatchSize) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { logrus.Errorf("fail to list latest changes: %v", err) @@ -504,7 +520,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar waitForMore = len(events) < 100 - rev := last + rev := s.currentRev var ( sequential []*server.Event saveLast bool @@ -515,6 +531,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar // Ensure that we are notifying events in a sequential fashion. For example if we find row 4 before 3 // we don't want to notify row 4 because 3 is essentially dropped forever. if event.KV.ModRevision != next { + logrus.Tracef("MODREVISION GAP: expected %v, got %v", next, event.KV.ModRevision) if canSkipRevision(next, skip, skipTime) { // This situation should never happen, but we have it here as a fallback just for unknown reasons // we don't want to pause all watches forever @@ -553,7 +570,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar } if saveLast { - last = rev + s.currentRev = rev if len(sequential) > 0 { result <- sequential } diff --git a/test/util_test.go b/test/util_test.go index 6d0d7c1c..985a4815 100644 --- a/test/util_test.go +++ b/test/util_test.go @@ -138,8 +138,9 @@ func startSqlite(_ context.Context, tb testing.TB, dir string) (*endpoint.Config } return &endpoint.Config{ - Listener: fmt.Sprintf("unix://%s/kine.sock", dir), - Endpoint: fmt.Sprintf("sqlite://%s", dbPath), + Listener: fmt.Sprintf("unix://%s/kine.sock", dir), + Endpoint: fmt.Sprintf("sqlite://%s", dbPath), + NotifyInterval: 5 * time.Second, }, db } @@ -164,8 +165,9 @@ func startDqlite(ctx context.Context, tb testing.TB, dir string, listener *instr } return &endpoint.Config{ - Listener: fmt.Sprintf("unix://%s/kine.sock", dir), - Endpoint: fmt.Sprintf("dqlite://k8s?driver-name=%s", app.Driver()), + Listener: fmt.Sprintf("unix://%s/kine.sock", dir), + Endpoint: fmt.Sprintf("dqlite://k8s?driver-name=%s", app.Driver()), + NotifyInterval: 5 * time.Second, }, db } From fb7b5397da7e4b54cad1a934e6b9ee4c1633a0c6 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Tue, 10 Dec 2024 13:47:10 +0100 Subject: [PATCH 10/17] handle errors better --- pkg/kine/server/types.go | 6 ++++-- pkg/kine/server/watch.go | 13 +++++++++---- pkg/kine/sqllog/sqllog.go | 21 +++++++++++++++------ 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index 23e7d26e..82ee4aeb 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -7,8 +7,9 @@ import ( ) var ( - ErrKeyExists = rpctypes.ErrGRPCDuplicateKey - ErrCompacted = rpctypes.ErrGRPCCompacted + ErrKeyExists = rpctypes.ErrGRPCDuplicateKey + ErrCompacted = rpctypes.ErrGRPCCompacted + ErrGRPCUnhealthy = rpctypes.ErrGRPCUnhealthy ) type Backend interface { @@ -45,4 +46,5 @@ type WatchResult struct { CurrentRevision int64 CompactRevision int64 Events <-chan []*Event + Errorc <-chan error } diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index 0e8189e5..af3cdc0d 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -111,8 +111,8 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) // If the watch result has a non-zero CompactRevision, then the watch request failed due to // the requested start revision having been compacted. Pass the current and and compact // revision to the client via the cancel response, along with the correct error message. - if err != nil { - w.Cancel(id, 0, 0, err) //TODO: need to return currrev and compact rev + if err == ErrCompacted { + w.Cancel(id, wr.CurrentRevision, wr.CompactRevision, ErrCompacted) return } @@ -170,7 +170,12 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) } } } - w.Cancel(id, 0, 0, nil) + select { + case err := <-wr.Errorc: + w.Cancel(id, 0, 0, err) + default: + w.Cancel(id, 0, 0, nil) + } logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) }() } @@ -218,7 +223,7 @@ func (w *watcher) Cancel(watchID int64, revision, compactRev int64, err error) { serr := w.server.Send(&etcdserverpb.WatchResponse{ Header: txnHeader(revision), - Canceled: err != nil, + Canceled: true, CancelReason: reason, WatchId: watchID, CompactRevision: compactRev, diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 7d2aa04a..0d506d87 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -372,7 +372,8 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se ) res := make(chan []*server.Event, 100) - wr := server.WatchResult{Events: res} + errc := make(chan error, 1) + wr := server.WatchResult{Events: res, Errorc: errc} values, err := s.broadcaster.Subscribe(ctx) if err != nil { @@ -385,12 +386,20 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se initialRevision, initialEvents, err := s.After(ctx, key, startRevision, 0) if err != nil { - span.RecordError(err) - if err == server.ErrCompacted { - compact, _, _ := s.d.GetCompactRevision(ctx) - wr.CompactRevision = compact - wr.CurrentRevision = initialRevision + if !errors.Is(err, context.Canceled) { + if err == server.ErrCompacted { + logrus.Errorf("Failed to list %s for revision %d: %v", key, startRevision, err) + span.RecordError(err) + compact, _, _ := s.d.GetCompactRevision(ctx) + wr.CompactRevision = compact + wr.CurrentRevision = initialRevision + } else { + errc <- server.ErrGRPCUnhealthy + } } + // TODO: confirm that we want to cancel watch here + close(res) + s.wg.Done() } if len(initialEvents) > 0 { From 7eed67b6ff18a187863132174080286cad96aa4a Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Wed, 11 Dec 2024 09:19:18 +0100 Subject: [PATCH 11/17] test progress notify --- test/watch_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/test/watch_test.go b/test/watch_test.go index 860ce37b..5fa810c5 100644 --- a/test/watch_test.go +++ b/test/watch_test.go @@ -17,6 +17,9 @@ func TestWatch(t *testing.T) { // pollTimeout is the timeout for waiting to receive an event. pollTimeout = 50 * time.Millisecond + // progressNotifyTimeout is the timeout for waiting to receive a progress notify. Adjusts for jitter in the progress notification. + progressNotifyTimeout = 1 * time.Second + // idleTimeout is the amount of time to wait to ensure that no events // are received when they should not. idleTimeout = 100 * time.Millisecond @@ -113,6 +116,16 @@ func TestWatch(t *testing.T) { g.Consistently(watchCh, idleTimeout).ShouldNot(Receive()) }) + t.Run("ProgressNotify", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + g := NewWithT(t) + err := kine.client.RequestProgress(ctx) + g.Expect(err).NotTo(HaveOccurred()) + + g.Eventually(watchCh, progressNotifyTimeout).Should(ReceiveProgressNotify(g)) + }) + }) } } @@ -131,6 +144,12 @@ func ReceiveEvents(g Gomega, checks ...EventMatcher) types.GomegaMatcher { })) } +func ReceiveProgressNotify(g Gomega) types.GomegaMatcher { + return Receive(Satisfy(func(watch clientv3.WatchResponse) bool { + return watch.IsProgressNotify() + })) +} + func CreateEvent(g Gomega, key, value string, revision int64) EventMatcher { return func(event *clientv3.Event) bool { ok := g.Expect(event.Type).To(Equal(clientv3.EventTypePut)) From 521ca86fcf8c2c0e8912e8d1b819f9ff33f21250 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Wed, 11 Dec 2024 11:25:44 +0100 Subject: [PATCH 12/17] remove configuration option for emulatedEtcdVersion --- cmd/root.go | 3 --- docs/configuration.md | 1 - pkg/kine/endpoint/endpoint.go | 1 - pkg/kine/server/server.go | 4 ++++ pkg/server/server.go | 3 +-- 5 files changed, 5 insertions(+), 7 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index d27c8ebf..107586e8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -32,7 +32,6 @@ var ( metricsAddress string otel bool otelAddress string - emulatedEtcdVersion string connectionPoolConfig generic.ConnectionPoolConfig @@ -103,7 +102,6 @@ var ( rootCmdOpts.diskMode, rootCmdOpts.clientSessionCacheSize, rootCmdOpts.minTLSVersion, - rootCmdOpts.emulatedEtcdVersion, rootCmdOpts.watchAvailableStorageInterval, rootCmdOpts.watchAvailableStorageMinBytes, rootCmdOpts.lowAvailableStorageAction, @@ -177,7 +175,6 @@ func init() { rootCmd.Flags().BoolVar(&rootCmdOpts.otel, "otel", false, "enable traces endpoint") rootCmd.Flags().StringVar(&rootCmdOpts.otelAddress, "otel-listen", "127.0.0.1:4317", "listen address for OpenTelemetry endpoint") rootCmd.Flags().StringVar(&rootCmdOpts.metricsAddress, "metrics-listen", "127.0.0.1:9042", "listen address for metrics endpoint") - rootCmd.Flags().StringVar(&rootCmdOpts.emulatedEtcdVersion, "emulated-etcd-version", "3.5.13", "The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate no support for watch progress notifications yet.") rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxIdle, "datastore-max-idle-connections", 5, "Maximum number of idle connections retained by datastore. If value = 0, the system default will be used. If value < 0, idle connections will not be reused.") rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxOpen, "datastore-max-open-connections", 5, "Maximum number of open connections used by datastore. If value <= 0, then there is no limit") rootCmd.Flags().DurationVar(&rootCmdOpts.connectionPoolConfig.MaxLifetime, "datastore-connection-max-lifetime", 60*time.Second, "Maximum amount of time a connection may be reused. If value <= 0, then there is no limit.") diff --git a/docs/configuration.md b/docs/configuration.md index 46d441e0..4b475415 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -21,7 +21,6 @@ The following configuration options are available listed in a table format: | `--otel` | Enable traces endpoint | `false` | | `--otel-listen` | The address to listen for OpenTelemetry endpoint | `127.0.0.1:4317` | | `--metrics-listen` | The address to listen for metrics endpoint | `127.0.0.1:9042` | -| `--emulated-etcd-version` | The emulated etcd version to return on a call to the status endpoint. Defaults to 3.5.13, in order to indicate no support for watch progress notifications yet. | `3.5.13` | | `--datastore-max-idle-connections` | Maximum number of idle connections retained by datastore | `5` | | `--datastore-max-open-connections` | Maximum number of open connections used by datastore | `5` | | `--datastore-connection-max-lifetime` | Maximum amount of time a connection may be reused | `60s` | diff --git a/pkg/kine/endpoint/endpoint.go b/pkg/kine/endpoint/endpoint.go index 6de8f05d..1048638a 100644 --- a/pkg/kine/endpoint/endpoint.go +++ b/pkg/kine/endpoint/endpoint.go @@ -31,7 +31,6 @@ type Config struct { GRPCServer *grpc.Server Listener string Endpoint string - EmulatedEtcdVersion string ConnectionPoolConfig generic.ConnectionPoolConfig tls.Config NotifyInterval time.Duration diff --git a/pkg/kine/server/server.go b/pkg/kine/server/server.go index c9175bb9..85d18eec 100644 --- a/pkg/kine/server/server.go +++ b/pkg/kine/server/server.go @@ -13,6 +13,10 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" ) +// The emulated etcd version is returned on a call to the status endpoint. The version 3.5.13, indicates support for the watch progress notifications. +// See: https://github.com/kubernetes/kubernetes/blob/beb696c2c9467dbc44cbaf35c5a4a3daf0321db3/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go#L157 +const emulatedEtcdVersion = "3.5.13" + var ( _ etcdserverpb.KVServer = (*KVServerBridge)(nil) _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) diff --git a/pkg/server/server.go b/pkg/server/server.go index c3a1053d..79c58d81 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -64,7 +64,6 @@ func New( diskMode bool, clientSessionCacheSize uint, minTLSVersion string, - emulatedEtcdVersion string, watchAvailableStorageInterval time.Duration, watchAvailableStorageMinBytes uint64, lowAvailableStorageAction string, @@ -345,7 +344,7 @@ func (s *Server) Start(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to start kine: %w", err) } - logrus.WithFields(logrus.Fields{"address": s.kineConfig.Listener, "database": s.kineConfig.Endpoint, "emulatedEtcdVersion": s.kineConfig.EmulatedEtcdVersion}).Print("Started kine") + logrus.WithFields(logrus.Fields{"address": s.kineConfig.Listener, "database": s.kineConfig.Endpoint}).Print("Started kine") s.backend = backend From b0b95c3a4f2735534c7b0650e03a84acba2cf051 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Wed, 11 Dec 2024 14:58:05 +0100 Subject: [PATCH 13/17] add clarification, nits --- pkg/kine/server/watch.go | 24 +++++++++++++++--------- pkg/kine/sqllog/sqllog.go | 13 ++++++++++--- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index af3cdc0d..fbb2f40b 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -108,8 +108,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) } wr, err := w.backend.Watch(ctx, key, startRevision) - // If the watch result has a non-zero CompactRevision, then the watch request failed due to - // the requested start revision having been compacted. Pass the current and and compact + // the requested start revision is compacted. Pass the current and and compact // revision to the client via the cancel response, along with the correct error message. if err == ErrCompacted { w.Cancel(id, wr.CurrentRevision, wr.CompactRevision, ErrCompacted) @@ -123,10 +122,15 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) var events []*Event var revision int64 - // Block on initial read from events or progress channel + // Wait for events or progress notifications select { - case events = <-wr.Events: - // got events; read additional queued events from the channel and add to batch + case events, ok := <-wr.Events: + if !ok { + // Channel was closed, break out of the loop + outer = false + break + } + // We received events; batch any additional queued events reads++ inner := true for inner { @@ -135,19 +139,20 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) reads++ events = append(events, e...) if !ok { - // channel was closed, break out of both loops + // channel is closed, break out of both loops inner = false outer = false } default: + // No more events in the queue, we can exit the inner loop inner = false } } case revision = <-progressCh: - // have been requested to send progress with no events + // Received progress update without events } - // get max revision from collected events + // Determine the highest revision among the collected events if len(events) > 0 { revision = events[len(events)-1].KV.ModRevision if trace { @@ -157,7 +162,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) } } - // send response. note that there are no events if this is a progress response. + // Send response, even if this is a progress-only response and no events occured if revision >= startRevision { wr := &etcdserverpb.WatchResponse{ Header: txnHeader(revision), @@ -170,6 +175,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) } } } + // handle errors from the channel or gracefully cancel the watch select { case err := <-wr.Errorc: w.Cancel(id, 0, 0, err) diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 0d506d87..fa020aa7 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -375,8 +375,11 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se errc := make(chan error, 1) wr := server.WatchResult{Events: res, Errorc: errc} + // starting watching right away so we don't miss anything + ctx, cancel := context.WithCancel(ctx) values, err := s.broadcaster.Subscribe(ctx) if err != nil { + cancel() return wr, err } @@ -387,6 +390,7 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se initialRevision, initialEvents, err := s.After(ctx, key, startRevision, 0) if err != nil { if !errors.Is(err, context.Canceled) { + // In case the key has been compacted we need to inform the api-server about the current-revision and the compact revision in the cancel watch response to the api-server. if err == server.ErrCompacted { logrus.Errorf("Failed to list %s for revision %d: %v", key, startRevision, err) span.RecordError(err) @@ -394,12 +398,13 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se wr.CompactRevision = compact wr.CurrentRevision = initialRevision } else { + // If the After query fails because k8s-dqlite restarts we cancel the watch and return an error message that the api-server understands: server.ErrGRPCUnhealthy + // See fix: https://github.com/k3s-io/kine/pull/373 errc <- server.ErrGRPCUnhealthy } } - // TODO: confirm that we want to cancel watch here - close(res) - s.wg.Done() + // Cancel the watcher by cancelling the context of its subscription to the broadcaster + cancel() } if len(initialEvents) > 0 { @@ -411,8 +416,10 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se defer func() { close(res) s.wg.Done() + cancel() }() + // Filter for events that update/create/delete the given key for events := range values { filtered := filterEvents(events, key, initialRevision) if len(filtered) > 0 { From 7edce5ce55b7ff3c42591212fd26f9f4c6de512f Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Wed, 11 Dec 2024 17:11:57 +0100 Subject: [PATCH 14/17] fix --- pkg/kine/server/watch.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index fbb2f40b..559dfe26 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -124,12 +124,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) // Wait for events or progress notifications select { - case events, ok := <-wr.Events: - if !ok { - // Channel was closed, break out of the loop - outer = false - break - } + case events = <-wr.Events: // We received events; batch any additional queued events reads++ inner := true From c700d463d22836534147c5ec45e02bfc0e816ab2 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Wed, 18 Dec 2024 15:47:40 +0100 Subject: [PATCH 15/17] Marco's suggestions --- go.mod | 19 ++++--- go.sum | 29 ++++------ pkg/kine/server/maintenance.go | 6 +- pkg/kine/server/server.go | 7 +-- pkg/kine/server/types.go | 10 +--- pkg/kine/server/watch.go | 101 ++++++++++++++++++++------------- pkg/kine/sqllog/sqllog.go | 60 +++++++++----------- 7 files changed, 116 insertions(+), 116 deletions(-) diff --git a/go.mod b/go.mod index 67faf97f..52ca0cc0 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.10 require ( github.com/canonical/go-dqlite/v2 v2.0.0 github.com/mattn/go-sqlite3 v1.14.22 - github.com/onsi/gomega v1.27.10 + github.com/onsi/gomega v1.33.1 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 github.com/sirupsen/logrus v1.9.3 @@ -23,11 +23,9 @@ require ( go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 - golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d - golang.org/x/sys v0.26.0 + golang.org/x/sys v0.28.0 google.golang.org/grpc v1.67.1 gopkg.in/yaml.v2 v2.4.0 - k8s.io/apimachinery v0.31.3 ) require ( @@ -37,6 +35,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -45,6 +44,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/go-cmp v0.6.0 // indirect + github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af // indirect github.com/google/renameio v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.1 // indirect @@ -57,6 +57,8 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/onsi/ginkgo/v2 v2.19.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.50.0 // indirect github.com/prometheus/procfs v0.13.0 // indirect @@ -73,18 +75,17 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.28.0 // indirect - golang.org/x/net v0.30.0 // indirect + golang.org/x/crypto v0.30.0 // indirect + golang.org/x/net v0.32.0 // indirect golang.org/x/sync v0.10.0 // indirect - golang.org/x/text v0.19.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.5.0 // indirect + golang.org/x/tools v0.28.0 // indirect google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect google.golang.org/protobuf v1.35.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/klog/v2 v2.130.1 // indirect - k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index 00424172..5a52c69f 100644 --- a/go.sum +++ b/go.sum @@ -42,7 +42,6 @@ github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -111,8 +110,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= -github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= -github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/peterh/liner v1.2.2/go.mod h1:xFwJyiKIXJZUKItq5dGHZSTBRAuG/CpeNpWLyiNRNwI= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -208,11 +207,9 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= +golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d h1:0olWaB5pg3+oychR51GUVCEsGkeCU/2JxjBgIo4f3M0= -golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -231,8 +228,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -257,14 +254,14 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -321,11 +318,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -k8s.io/apimachinery v0.31.3 h1:6l0WhcYgasZ/wk9ktLq5vLaoXJJr5ts6lkaQzgeYPq4= -k8s.io/apimachinery v0.31.3/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= -k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= -k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= -k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/pkg/kine/server/maintenance.go b/pkg/kine/server/maintenance.go index 5e79cd9e..cc6a26b5 100644 --- a/pkg/kine/server/maintenance.go +++ b/pkg/kine/server/maintenance.go @@ -9,6 +9,10 @@ import ( var _ etcdserverpb.MaintenanceServer = (*KVServerBridge)(nil) +// The emulated etcd version is returned on a call to the status endpoint. The version 3.5.13, indicates support for the watch progress notifications. +// See: https://github.com/kubernetes/kubernetes/blob/beb696c2c9467dbc44cbaf35c5a4a3daf0321db3/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go#L157 +const emulatedEtcdVersion = "3.5.13" + func (s *KVServerBridge) Alarm(context.Context, *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) { return nil, fmt.Errorf("alarm is not supported") } @@ -21,7 +25,7 @@ func (s *KVServerBridge) Status(ctx context.Context, r *etcdserverpb.StatusReque return &etcdserverpb.StatusResponse{ Header: &etcdserverpb.ResponseHeader{}, DbSize: size, - Version: s.emulatedEtcdVersion, + Version: emulatedEtcdVersion, }, nil } diff --git a/pkg/kine/server/server.go b/pkg/kine/server/server.go index 85d18eec..e2eed0a6 100644 --- a/pkg/kine/server/server.go +++ b/pkg/kine/server/server.go @@ -13,18 +13,13 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" ) -// The emulated etcd version is returned on a call to the status endpoint. The version 3.5.13, indicates support for the watch progress notifications. -// See: https://github.com/kubernetes/kubernetes/blob/beb696c2c9467dbc44cbaf35c5a4a3daf0321db3/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go#L157 -const emulatedEtcdVersion = "3.5.13" - var ( _ etcdserverpb.KVServer = (*KVServerBridge)(nil) _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) ) type KVServerBridge struct { - limited *LimitedServer - emulatedEtcdVersion string + limited *LimitedServer } func New(backend Backend, notifyInterval time.Duration) *KVServerBridge { diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index 82ee4aeb..dd9fc883 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -20,9 +20,10 @@ type Backend interface { List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error) - Watch(ctx context.Context, key string, revision int64) (WatchResult, error) + Watch(ctx context.Context, key string, revision int64) (<-chan []*Event, error) DbSize(ctx context.Context) (int64, error) CurrentRevision(ctx context.Context) (int64, error) + GetCompactRevision(ctx context.Context) (int64, int64, error) DoCompact(ctx context.Context) error Close() error } @@ -41,10 +42,3 @@ type Event struct { KV *KeyValue PrevKV *KeyValue } - -type WatchResult struct { - CurrentRevision int64 - CompactRevision int64 - Events <-chan []*Event - Errorc <-chan error -} diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index 559dfe26..761fe653 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -10,8 +10,6 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" - "golang.org/x/exp/rand" - "k8s.io/apimachinery/pkg/util/wait" ) var ( @@ -21,15 +19,6 @@ var ( // explicit interface check var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) -// getProgressReportInterval returns the configured progress report interval, with some jitter -func (s *KVServerBridge) getProgressReportInterval() time.Duration { - // add rand(1/10*notifyInterval) as jitter so that kine will not - // send progress notifications to watchers at the same time even when watchers - // are created at the same time. - jitter := time.Duration(rand.Int63n(int64(s.limited.notifyInterval) / 10)) - return s.limited.notifyInterval + jitter -} - func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { w := watcher{ server: ws, @@ -39,7 +28,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } defer w.Close() - go wait.PollInfiniteWithContext(ws.Context(), s.getProgressReportInterval(), w.ProgressIfSynced) + w.pollProgressNotify(ws.Context(), s.limited.notifyInterval) for { msg, err := ws.Recv() @@ -52,7 +41,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } if cr := msg.GetCancelRequest(); cr != nil { logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) - w.Cancel(cr.WatchId, 0, 0, nil) + w.Cancel(cr.WatchId, nil, ws.Context()) } if pr := msg.GetProgressRequest(); pr != nil { w.Progress(ws.Context()) @@ -60,8 +49,38 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } } +// pollProgressNotify periodically sends progress notifications to all watchers. +func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration) { + ch := make(chan struct{}, 1) + + go func() { + defer close(ch) + + tick := time.NewTicker(interval) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + // Skip this tick if ProgressIfSynced is still running. + select { + case ch <- struct{}{}: + if err := w.ProgressIfSynced(ctx); err != nil { + logrus.Errorf("Failed to send progress notification: %v", err) + } + <-ch + default: + logrus.Warn("Skipping progress notification: still busy.") + } + } + } + }() +} + type watcher struct { - sync.RWMutex + sync.Mutex wg sync.WaitGroup backend Backend @@ -72,7 +91,7 @@ type watcher struct { func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) { if r.WatchId != clientv3.AutoWatchID { - logrus.Warnf("WATCH START id=%d ignoring request with client-provided id", r.WatchId) + logrus.Errorf("WATCH START id=%d ignoring request with client-provided id", r.WatchId) return } @@ -103,15 +122,14 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) Created: true, WatchId: id, }); err != nil { - w.Cancel(id, 0, 0, err) + w.Cancel(id, err, ctx) return } - wr, err := w.backend.Watch(ctx, key, startRevision) - // the requested start revision is compacted. Pass the current and and compact - // revision to the client via the cancel response, along with the correct error message. - if err == ErrCompacted { - w.Cancel(id, wr.CurrentRevision, wr.CompactRevision, ErrCompacted) + watchCh, err := w.backend.Watch(ctx, key, startRevision) + if err != nil { + logrus.Errorf("Failed to start watch: %v", err) + w.Cancel(id, err, ctx) return } @@ -124,13 +142,13 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) // Wait for events or progress notifications select { - case events = <-wr.Events: + case events = <-watchCh: // We received events; batch any additional queued events reads++ inner := true for inner { select { - case e, ok := <-wr.Events: + case e, ok := <-watchCh: reads++ events = append(events, e...) if !ok { @@ -166,17 +184,11 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) } logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads) if err := w.server.Send(wr); err != nil { - w.Cancel(id, 0, 0, err) + w.Cancel(id, err, ctx) } } } - // handle errors from the channel or gracefully cancel the watch - select { - case err := <-wr.Errorc: - w.Cancel(id, 0, 0, err) - default: - w.Cancel(id, 0, 0, nil) - } + logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) }() } @@ -204,7 +216,7 @@ func toEvent(event *Event) *mvccpb.Event { return e } -func (w *watcher) Cancel(watchID int64, revision, compactRev int64, err error) { +func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) { w.Lock() if progressCh, ok := w.progress[watchID]; ok { close(progressCh) @@ -216,9 +228,20 @@ func (w *watcher) Cancel(watchID int64, revision, compactRev int64, err error) { } w.Unlock() + revision := int64(0) + compactRev := int64(0) reason := "" if err != nil { reason = err.Error() + if err == ErrCompacted { + // the requested start revision is compacted. Pass the current and and compact + // revision to the client via the cancel response, along with the correct error message. + compactRev, revision, err = w.backend.GetCompactRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get compact and current revision for cancel response: %v", err) + compactRev = 0 + } + } } logrus.Tracef("WATCH CANCEL id=%d, reason=%s, compactRev=%d", watchID, reason, compactRev) @@ -252,8 +275,8 @@ func (w *watcher) Close() { // Progress sends a progress report if all watchers are synced. // Ref: https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/watchable_store.go#L500-L504 func (w *watcher) Progress(ctx context.Context) { - w.RLock() - defer w.RUnlock() + w.Lock() + defer w.Unlock() logrus.Tracef("WATCH REQUEST PROGRESS") @@ -282,16 +305,16 @@ func (w *watcher) Progress(ctx context.Context) { } // ProgressIfSynced sends a progress report on any channels that are synced and blocked on the outer loop -func (w *watcher) ProgressIfSynced(ctx context.Context) (bool, error) { +func (w *watcher) ProgressIfSynced(ctx context.Context) error { logrus.Tracef("WATCH PROGRESS TICK") revision, err := w.backend.CurrentRevision(ctx) if err != nil { logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) - return false, nil + return err } - w.RLock() - defer w.RUnlock() + w.Lock() + defer w.Unlock() // Send revision to all synced channels for _, progressCh := range w.progress { @@ -300,5 +323,5 @@ func (w *watcher) ProgressIfSynced(ctx context.Context) (bool, error) { default: } } - return false, nil + return nil } diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index fa020aa7..6535a1b3 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -72,7 +72,6 @@ type SQLLog struct { d Dialect broadcaster broadcaster.Broadcaster[[]*server.Event] notify chan int64 - currentRev int64 wg sync.WaitGroup } @@ -194,7 +193,7 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) { // small batches. Given that this logic runs every second, // on regime it should take usually just a couple batches // to keep the pace. - start, target, err := s.d.GetCompactRevision(ctx) + start, target, err := s.GetCompactRevision(ctx) if err != nil { return err } @@ -219,12 +218,13 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) { } func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { - if s.currentRev != 0 { - return s.currentRev, nil - } return s.d.CurrentRevision(ctx) } +func (s *SQLLog) GetCompactRevision(ctx context.Context) (int64, int64, error) { + return s.d.GetCompactRevision(ctx) +} + func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) { var err error ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.After", otelName)) @@ -238,7 +238,7 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64 attribute.Int64("limit", limit), ) - compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.GetCompactRevision(ctx) if err != nil { return 0, nil, err } @@ -275,7 +275,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis attribute.Int64("revision", revision), ) - compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.GetCompactRevision(ctx) if err != nil { return 0, nil, err } @@ -347,13 +347,13 @@ func (s *SQLLog) ttl(ctx context.Context) { go run(ctx, key, revision, time.Duration(lease)*time.Second) } - wr, err := s.Watch(ctx, "/", startRevision) + watchCh, err := s.Watch(ctx, "/", startRevision) if err != nil { logrus.Errorf("failed to watch events for ttl: %v", err) return } - for events := range wr.Events { + for events := range watchCh { for _, event := range events { if event.KV.Lease > 0 { go run(ctx, event.KV.Key, event.KV.ModRevision, time.Duration(event.KV.Lease)*time.Second) @@ -363,7 +363,7 @@ func (s *SQLLog) ttl(ctx context.Context) { }() } -func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (server.WatchResult, error) { +func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<-chan []*server.Event, error) { ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Watch", otelName)) defer span.End() span.SetAttributes( @@ -371,16 +371,12 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se attribute.Int64("startRevision", startRevision), ) - res := make(chan []*server.Event, 100) - errc := make(chan error, 1) - wr := server.WatchResult{Events: res, Errorc: errc} - // starting watching right away so we don't miss anything ctx, cancel := context.WithCancel(ctx) values, err := s.broadcaster.Subscribe(ctx) if err != nil { cancel() - return wr, err + return nil, err } if startRevision > 0 { @@ -390,23 +386,19 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se initialRevision, initialEvents, err := s.After(ctx, key, startRevision, 0) if err != nil { if !errors.Is(err, context.Canceled) { - // In case the key has been compacted we need to inform the api-server about the current-revision and the compact revision in the cancel watch response to the api-server. - if err == server.ErrCompacted { - logrus.Errorf("Failed to list %s for revision %d: %v", key, startRevision, err) - span.RecordError(err) - compact, _, _ := s.d.GetCompactRevision(ctx) - wr.CompactRevision = compact - wr.CurrentRevision = initialRevision - } else { - // If the After query fails because k8s-dqlite restarts we cancel the watch and return an error message that the api-server understands: server.ErrGRPCUnhealthy - // See fix: https://github.com/k3s-io/kine/pull/373 - errc <- server.ErrGRPCUnhealthy + span.RecordError(err) + logrus.Errorf("Failed to list %s for revision %d: %v", key, startRevision, err) + // We return an error message that the api-server understands: server.ErrGRPCUnhealthy + if err != server.ErrCompacted { + err = server.ErrGRPCUnhealthy } } // Cancel the watcher by cancelling the context of its subscription to the broadcaster cancel() + return nil, err } + res := make(chan []*server.Event, 100) if len(initialEvents) > 0 { res <- initialEvents } @@ -428,7 +420,7 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (se } }() - return wr, nil + return res, nil } func filterEvents(events []*server.Event, key string, startRevision int64) []*server.Event { @@ -453,7 +445,7 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) { return nil, err } - pollStart, _, err := s.d.GetCompactRevision(ctx) + pollStart, _, err := s.GetCompactRevision(ctx) if err != nil { return nil, err } @@ -489,8 +481,8 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) { } func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStart int64) { - s.currentRev = pollStart var ( + last = pollStart skip int64 skipTime time.Time waitForMore = true @@ -506,7 +498,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar case <-ctx.Done(): return case check := <-s.notify: - if check <= s.currentRev { + if check <= last { continue } case <-wait.C: @@ -516,7 +508,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar watchCtx, cancel := context.WithTimeout(ctx, s.d.GetWatchQueryTimeout()) defer cancel() - rows, err := s.d.After(watchCtx, s.currentRev, pollBatchSize) + rows, err := s.d.After(watchCtx, last, pollBatchSize) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { logrus.Errorf("fail to list latest changes: %v", err) @@ -536,7 +528,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar waitForMore = len(events) < 100 - rev := s.currentRev + rev := last var ( sequential []*server.Event saveLast bool @@ -586,7 +578,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar } if saveLast { - s.currentRev = rev + last = rev if len(sequential) > 0 { result <- sequential } @@ -611,7 +603,7 @@ func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision in attribute.Int64("revision", revision), ) - compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.GetCompactRevision(ctx) if err != nil { return 0, 0, err } From 83cd6b490ff26eb8ebe56326b4daf5d047f69174 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Fri, 20 Dec 2024 11:20:25 +0100 Subject: [PATCH 16/17] little nits --- pkg/kine/server/watch.go | 28 ++++++++-------------------- pkg/kine/sqllog/sqllog.go | 10 +++++----- 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index 761fe653..e7074b70 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -41,7 +41,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } if cr := msg.GetCancelRequest(); cr != nil { logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) - w.Cancel(cr.WatchId, nil, ws.Context()) + w.Cancel(cr.WatchId, nil) } if pr := msg.GetProgressRequest(); pr != nil { w.Progress(ws.Context()) @@ -51,11 +51,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { // pollProgressNotify periodically sends progress notifications to all watchers. func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration) { - ch := make(chan struct{}, 1) - go func() { - defer close(ch) - tick := time.NewTicker(interval) defer tick.Stop() @@ -64,15 +60,8 @@ func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration case <-ctx.Done(): return case <-tick.C: - // Skip this tick if ProgressIfSynced is still running. - select { - case ch <- struct{}{}: - if err := w.ProgressIfSynced(ctx); err != nil { - logrus.Errorf("Failed to send progress notification: %v", err) - } - <-ch - default: - logrus.Warn("Skipping progress notification: still busy.") + if err := w.ProgressIfSynced(ctx); err != nil { + logrus.Errorf("Failed to send progress notification: %v", err) } } } @@ -122,14 +111,14 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) Created: true, WatchId: id, }); err != nil { - w.Cancel(id, err, ctx) + w.Cancel(id, err) return } watchCh, err := w.backend.Watch(ctx, key, startRevision) if err != nil { logrus.Errorf("Failed to start watch: %v", err) - w.Cancel(id, err, ctx) + w.Cancel(id, err) return } @@ -184,7 +173,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) } logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads) if err := w.server.Send(wr); err != nil { - w.Cancel(id, err, ctx) + w.Cancel(id, err) } } } @@ -216,7 +205,7 @@ func toEvent(event *Event) *mvccpb.Event { return e } -func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) { +func (w *watcher) Cancel(watchID int64, err error) { w.Lock() if progressCh, ok := w.progress[watchID]; ok { close(progressCh) @@ -236,10 +225,9 @@ func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) { if err == ErrCompacted { // the requested start revision is compacted. Pass the current and and compact // revision to the client via the cancel response, along with the correct error message. - compactRev, revision, err = w.backend.GetCompactRevision(ctx) + compactRev, revision, err = w.backend.GetCompactRevision(w.server.Context()) if err != nil { logrus.Errorf("Failed to get compact and current revision for cancel response: %v", err) - compactRev = 0 } } } diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 6535a1b3..b4efa9ce 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -193,7 +193,7 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) { // small batches. Given that this logic runs every second, // on regime it should take usually just a couple batches // to keep the pace. - start, target, err := s.GetCompactRevision(ctx) + start, target, err := s.d.GetCompactRevision(ctx) if err != nil { return err } @@ -238,7 +238,7 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64 attribute.Int64("limit", limit), ) - compactRevision, currentRevision, err := s.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) if err != nil { return 0, nil, err } @@ -275,7 +275,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis attribute.Int64("revision", revision), ) - compactRevision, currentRevision, err := s.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) if err != nil { return 0, nil, err } @@ -445,7 +445,7 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) { return nil, err } - pollStart, _, err := s.GetCompactRevision(ctx) + pollStart, _, err := s.d.GetCompactRevision(ctx) if err != nil { return nil, err } @@ -603,7 +603,7 @@ func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision in attribute.Int64("revision", revision), ) - compactRevision, currentRevision, err := s.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) if err != nil { return 0, 0, err } From 8b0d967ac05076b8840719f8c692e8c1e1f47e05 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Fri, 20 Dec 2024 11:43:08 +0100 Subject: [PATCH 17/17] mini nit --- test/watch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/watch_test.go b/test/watch_test.go index 5fa810c5..09981573 100644 --- a/test/watch_test.go +++ b/test/watch_test.go @@ -17,7 +17,7 @@ func TestWatch(t *testing.T) { // pollTimeout is the timeout for waiting to receive an event. pollTimeout = 50 * time.Millisecond - // progressNotifyTimeout is the timeout for waiting to receive a progress notify. Adjusts for jitter in the progress notification. + // progressNotifyTimeout is the timeout for waiting to receive a progress notify. progressNotifyTimeout = 1 * time.Second // idleTimeout is the amount of time to wait to ensure that no events