From 321aad0238d332fb94352dd439be1e8c599216a3 Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Mon, 13 Sep 2021 17:53:14 +0800 Subject: [PATCH 1/7] remove watcherEnabled Signed-off-by: HunDunDM --- server/encryptionkm/key_manager.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/server/encryptionkm/key_manager.go b/server/encryptionkm/key_manager.go index cf4a866e5ba..8c3167fd998 100644 --- a/server/encryptionkm/key_manager.go +++ b/server/encryptionkm/key_manager.go @@ -207,7 +207,6 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { // Setup key dictionary watcher watcher := clientv3.NewWatcher(m.etcdClient) watchChan := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision())) - watcherEnabled := true defer watcher.Close() // Check data key rotation every min(dataKeyRotationPeriod, keyRotationCheckPeriod). checkPeriod := m.dataKeyRotationPeriod @@ -224,7 +223,6 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { if resp.Canceled { // If the watcher failed, we fallback to reload every 10 minutes. log.Warn("encryption key watcher canceled") - watcherEnabled = false continue } for _, event := range resp.Events { @@ -239,7 +237,7 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { } m.helper.eventAfterReloadByWatcher() case <-m.helper.tick(ticker): - m.checkOnTick(watcherEnabled) + m.checkOnTick() m.helper.eventAfterTicker() case <-ctx.Done(): // Server shutdown. @@ -249,7 +247,7 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { } // checkOnTick perform key rotation and key reload on timer tick, if necessary. -func (m *KeyManager) checkOnTick(watcherEnabled bool) { +func (m *KeyManager) checkOnTick() { m.mu.Lock() defer m.mu.Unlock() // Check data key rotation in case we are the PD leader. @@ -257,13 +255,6 @@ func (m *KeyManager) checkOnTick(watcherEnabled bool) { if err != nil { log.Warn("fail to rotate data encryption key", errs.ZapError(err)) } - // Fallback mechanism to reload keys if watcher failed. - if !watcherEnabled { - _, err = m.loadKeysImpl() - if err != nil { - log.Warn("fail to reload keys after watcher failed", errs.ZapError(err)) - } - } } // loadKeysFromKVImpl reload keys from etcd result. From a63259b308e2f1a33eba3ce8c4c3d56de31592d2 Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Mon, 13 Sep 2021 19:00:01 +0800 Subject: [PATCH 2/7] encryption: refine key manager watcher loop Signed-off-by: HunDunDM --- server/encryptionkm/key_manager.go | 66 ++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 21 deletions(-) diff --git a/server/encryptionkm/key_manager.go b/server/encryptionkm/key_manager.go index 8c3167fd998..b39fa0e43e8 100644 --- a/server/encryptionkm/key_manager.go +++ b/server/encryptionkm/key_manager.go @@ -206,7 +206,6 @@ func (m *KeyManager) keysRevision() int64 { func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { // Setup key dictionary watcher watcher := clientv3.NewWatcher(m.etcdClient) - watchChan := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision())) defer watcher.Close() // Check data key rotation every min(dataKeyRotationPeriod, keyRotationCheckPeriod). checkPeriod := m.dataKeyRotationPeriod @@ -215,33 +214,58 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { } ticker := time.NewTicker(checkPeriod) defer ticker.Stop() - // Loop + for { - select { - // Reload encryption keys updated by PD leader (could be ourselves). - case resp := <-watchChan: - if resp.Canceled { - // If the watcher failed, we fallback to reload every 10 minutes. - log.Warn("encryption key watcher canceled") - continue - } - for _, event := range resp.Events { - if event.Type != mvccpb.PUT { - log.Warn("encryption keys is deleted unexpectedly") - continue + var ( + resp clientv3.WatchResponse + ok bool + ) + rch := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision())) + + keyWatchLoop: + for { + select { + case resp, ok = <-rch: + if !ok || resp.CompactRevision != 0 || resp.Canceled { + break keyWatchLoop } - _, err := m.loadKeysFromKV(event.Kv) - if err != nil { - log.Warn("fail to get encryption keys from watcher result", errs.ZapError(err)) + for _, event := range resp.Events { + if event.Type != mvccpb.PUT { + log.Warn("encryption keys is deleted unexpectedly") + continue + } + _, err := m.loadKeysFromKV(event.Kv) + if err != nil { + log.Warn("fail to get encryption keys from watcher result", errs.ZapError(err)) + } } + m.helper.eventAfterReloadByWatcher() + case <-m.helper.tick(ticker): + m.checkOnTick() + m.helper.eventAfterTicker() } - m.helper.eventAfterReloadByWatcher() - case <-m.helper.tick(ticker): - m.checkOnTick() - m.helper.eventAfterTicker() + } + + select { case <-ctx.Done(): // Server shutdown. return + default: + } + + if resp.CompactRevision != 0 { + // meet compacted error + log.Warn("revision has been compacted, use the compact revision", + zap.Int64("revision", m.keysRevision()), + zap.Int64("compact-revision", resp.CompactRevision)) + } else { + // other error + log.Error("encryption key watcher canceled", + errs.ZapError(errs.ErrEncryptionKeysWatcher, resp.Err())) + } + + if _, err := m.loadKeys(); err != nil { + log.Error("encryption key reload failed", errs.ZapError(err)) } } } From 09b22c75e9cd4c94e094ed9fa7fe05ed62a4fcc8 Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Wed, 15 Sep 2021 13:04:21 +0800 Subject: [PATCH 3/7] fix log Signed-off-by: HunDunDM --- server/encryptionkm/key_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/encryptionkm/key_manager.go b/server/encryptionkm/key_manager.go index b39fa0e43e8..4fb91d5559c 100644 --- a/server/encryptionkm/key_manager.go +++ b/server/encryptionkm/key_manager.go @@ -255,12 +255,12 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { if resp.CompactRevision != 0 { // meet compacted error - log.Warn("revision has been compacted, use the compact revision", + log.Warn("revision has been compacted, the watcher will watch again", zap.Int64("revision", m.keysRevision()), zap.Int64("compact-revision", resp.CompactRevision)) } else { // other error - log.Error("encryption key watcher canceled", + log.Error("encryption key watcher canceled, the watcher will watch again", errs.ZapError(errs.ErrEncryptionKeysWatcher, resp.Err())) } From b288ef18cc5327bb0f8909e08365352f0d505d84 Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Thu, 16 Sep 2021 14:32:27 +0800 Subject: [PATCH 4/7] add comment Signed-off-by: HunDunDM --- server/encryptionkm/key_manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/encryptionkm/key_manager.go b/server/encryptionkm/key_manager.go index 4fb91d5559c..e0031354283 100644 --- a/server/encryptionkm/key_manager.go +++ b/server/encryptionkm/key_manager.go @@ -227,6 +227,8 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { select { case resp, ok = <-rch: if !ok || resp.CompactRevision != 0 || resp.Canceled { + // If chan is closed or canceled + // Ref https://etcd.io/docs/v3.4/learning/api/#watch-streams break keyWatchLoop } for _, event := range resp.Events { From 34e1abf64d45741d1e11f66b4c0131de782874be Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Thu, 16 Sep 2021 14:33:24 +0800 Subject: [PATCH 5/7] add comment Signed-off-by: HunDunDM --- server/encryptionkm/key_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/encryptionkm/key_manager.go b/server/encryptionkm/key_manager.go index e0031354283..afdd466e572 100644 --- a/server/encryptionkm/key_manager.go +++ b/server/encryptionkm/key_manager.go @@ -227,7 +227,7 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { select { case resp, ok = <-rch: if !ok || resp.CompactRevision != 0 || resp.Canceled { - // If chan is closed or canceled + // If chan is closed or canceled, exit watch loop // Ref https://etcd.io/docs/v3.4/learning/api/#watch-streams break keyWatchLoop } From bd520e5829894dc5c8830275ea18c8ed084b0c28 Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Fri, 24 Sep 2021 18:22:11 +0800 Subject: [PATCH 6/7] address comment Signed-off-by: HunDunDM --- server/encryptionkm/key_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/encryptionkm/key_manager.go b/server/encryptionkm/key_manager.go index afdd466e572..68c60498b30 100644 --- a/server/encryptionkm/key_manager.go +++ b/server/encryptionkm/key_manager.go @@ -226,7 +226,7 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { for { select { case resp, ok = <-rch: - if !ok || resp.CompactRevision != 0 || resp.Canceled { + if !ok || resp.Err() != nil { // If chan is closed or canceled, exit watch loop // Ref https://etcd.io/docs/v3.4/learning/api/#watch-streams break keyWatchLoop From 70f219b6d38fe18fc1d9114247c5f3a8dfcc570e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=B7=E6=B2=8CDM?= Date: Mon, 27 Sep 2021 18:36:48 +0800 Subject: [PATCH 7/7] *: use SerialSuites to reduce the servers in tests at the same time (#4171) Signed-off-by: HunDunDM --- server/encryptionkm/key_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/encryptionkm/key_manager_test.go b/server/encryptionkm/key_manager_test.go index 25f93f2ab2e..c6460b4f5d9 100644 --- a/server/encryptionkm/key_manager_test.go +++ b/server/encryptionkm/key_manager_test.go @@ -42,7 +42,7 @@ func TestKeyManager(t *testing.T) { type testKeyManagerSuite struct{} -var _ = Suite(&testKeyManagerSuite{}) +var _ = SerialSuites(&testKeyManagerSuite{}) const ( testMasterKey = "8fd7e3e917c170d92f3e51a981dd7bc8fba11f3df7d8df994842f6e86f69b530"