diff --git a/etcdserver/api/rafthttp/http.go b/etcdserver/api/rafthttp/http.go index 18e9c53f24c9..d0e0c81e2090 100644 --- a/etcdserver/api/rafthttp/http.go +++ b/etcdserver/api/rafthttp/http.go @@ -258,6 +258,11 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + snapshotReceiveInflights.WithLabelValues(from).Inc() + defer func() { + snapshotReceiveInflights.WithLabelValues(from).Dec() + }() + if h.lg != nil { h.lg.Info( "receiving database snapshot", diff --git a/etcdserver/api/rafthttp/metrics.go b/etcdserver/api/rafthttp/metrics.go index ce51248d8875..02fff84be7c4 100644 --- a/etcdserver/api/rafthttp/metrics.go +++ b/etcdserver/api/rafthttp/metrics.go @@ -80,6 +80,15 @@ var ( []string{"To"}, ) + snapshotSendInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_send_inflights_total", + Help: "Total number of inflight snapshot sends", + }, + []string{"To"}, + ) + snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "etcd", Subsystem: "network", @@ -111,6 +120,15 @@ var ( []string{"From"}, ) + snapshotReceiveInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_receive_inflights_total", + Help: "Total number of inflight snapshot receives", + }, + []string{"From"}, + ) + snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "etcd", Subsystem: "network", @@ -156,9 +174,11 @@ func init() { prometheus.MustRegister(recvFailures) prometheus.MustRegister(snapshotSend) + prometheus.MustRegister(snapshotSendInflights) prometheus.MustRegister(snapshotSendFailures) prometheus.MustRegister(snapshotSendSeconds) prometheus.MustRegister(snapshotReceive) + prometheus.MustRegister(snapshotReceiveInflights) prometheus.MustRegister(snapshotReceiveFailures) prometheus.MustRegister(snapshotReceiveSeconds) diff --git a/etcdserver/api/rafthttp/snapshot_sender.go b/etcdserver/api/rafthttp/snapshot_sender.go index 85abaeaa4d1f..62efb0cdc3d8 100644 --- a/etcdserver/api/rafthttp/snapshot_sender.go +++ b/etcdserver/api/rafthttp/snapshot_sender.go @@ -90,6 +90,11 @@ func (s *snapshotSender) send(merged snap.Message) { plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To)) } + snapshotSendInflights.WithLabelValues(to).Inc() + defer func() { + snapshotSendInflights.WithLabelValues(to).Dec() + }() + err := s.post(req) defer merged.CloseWithError(err) if err != nil { @@ -139,7 +144,6 @@ func (s *snapshotSender) send(merged snap.Message) { } sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize)) - snapshotSend.WithLabelValues(to).Inc() snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds()) } diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go index a868331fb4c3..e7f4a3747b34 100644 --- a/etcdserver/metrics.go +++ b/etcdserver/metrics.go @@ -76,6 +76,12 @@ var ( Name: "slow_apply_total", Help: "The total number of slow apply requests (likely overloaded from slow disk).", }) + applySnapshotInProgress = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "snapshot_apply_in_progress_total", + Help: "1 if the server is applying the incoming snapshot. 0 if none.", + }) proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "etcd", Subsystem: "server", @@ -153,6 +159,7 @@ func init() { prometheus.MustRegister(leaderChanges) prometheus.MustRegister(heartbeatSendFailures) prometheus.MustRegister(slowApplies) + prometheus.MustRegister(applySnapshotInProgress) prometheus.MustRegister(proposalsCommitted) prometheus.MustRegister(proposalsApplied) prometheus.MustRegister(proposalsPending) diff --git a/etcdserver/server.go b/etcdserver/server.go index 81ed45afee85..76b802b2e720 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1113,6 +1113,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { if raft.IsEmptySnap(apply.snapshot) { return } + applySnapshotInProgress.Inc() lg := s.getLogger() if lg != nil { @@ -1138,6 +1139,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } else { plog.Infof("finished applying incoming snapshot at index %d", ep.snapi) } + applySnapshotInProgress.Dec() }() if apply.snapshot.Metadata.Index <= ep.appliedi { diff --git a/integration/v3_watch_restore_test.go b/integration/v3_watch_restore_test.go index c5dcb12c152f..b2835a2b5f37 100644 --- a/integration/v3_watch_restore_test.go +++ b/integration/v3_watch_restore_test.go @@ -71,7 +71,25 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { // trigger snapshot send from leader to this slow follower // which then calls watchable store Restore clus.Members[0].RecoverPartition(t, clus.Members[1:]...) - clus.WaitLeader(t) + lead := clus.WaitLeader(t) + + sends, err := clus.Members[lead].Metric("etcd_network_snapshot_send_inflights_total") + if err != nil { + t.Fatal(err) + } + if sends != "0" && sends != "1" { + // 0 if already sent, 1 if sending + t.Fatalf("inflight snapshot sends expected 0 or 1, got %q", sends) + } + receives, err := clus.Members[(lead+1)%3].Metric("etcd_network_snapshot_receive_inflights_total") + if err != nil { + t.Fatal(err) + } + if receives != "0" && receives != "1" { + // 0 if already received, 1 if receiving + t.Fatalf("inflight snapshot receives expected 0 or 1, got %q", receives) + } + time.Sleep(2 * time.Second) // slow follower now applies leader snapshot