replication: additional fields for metrics (#1874)
poornas authored Aug 21, 2023
1 parent 11ae9b4 commit 1eac111
Showing 1 changed file with 120 additions and 66 deletions.
186 changes: 120 additions & 66 deletions pkg/replication/replication.go
@@ -689,49 +689,63 @@ func (e ExistingObjectReplication) Validate() error {
// TargetMetrics represents inline replication metrics
// such as pending, failed and completed bytes in total for a bucket remote target
type TargetMetrics struct {
// Pending size in bytes
PendingSize uint64 `json:"pendingReplicationSize,omitempty"`
// Completed count
ReplicatedCount uint64 `json:"replicationCount,omitempty"`
// Completed size in bytes
ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"`
// Total Replica size in bytes
ReplicaSize uint64 `json:"replicaSize,omitempty"`
// Failed size in bytes
FailedSize uint64 `json:"failedReplicationSize,omitempty"`
// Total number of pending operations including metadata updates
PendingCount uint64 `json:"pendingReplicationCount,omitempty"`
// Total number of failed operations including metadata updates
FailedCount uint64 `json:"failedReplicationCount,omitempty"`
// Bandwidth limit in bytes/sec for this target
BandWidthLimitInBytesPerSecond int64 `json:"limitInBits,omitempty"`
// Current bandwidth used in bytes/sec for this target
CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth,omitempty"`
// transfer rate for large uploads
XferRateLrg XferStats `json:"largeTransferRate"`
// transfer rate for small uploads
XferRateSml XferStats `json:"smallTransferRate"`
// errors seen in replication in last minute, hour and total
Failed TimedErrStats `json:"failed,omitempty"`
}

// Metrics represents inline replication metrics for a bucket.
type Metrics struct {
Stats map[string]TargetMetrics
// Total Pending size in bytes across targets
PendingSize uint64 `json:"pendingReplicationSize,omitempty"`
// Completed size in bytes across targets
ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"`
// Total Replica size in bytes across targets
ReplicaSize uint64 `json:"replicaSize,omitempty"`
// Failed size in bytes across targets
FailedSize uint64 `json:"failedReplicationSize,omitempty"`
// Total number of pending operations including metadata updates across targets
PendingCount uint64 `json:"pendingReplicationCount,omitempty"`
// Total number of failed operations including metadata updates across targets
FailedCount uint64 `json:"failedReplicationCount,omitempty"`
// Total Replica counts
ReplicaCount int64 `json:"replicaCount,omitempty"`
// Total Replicated count
ReplicatedCount int64 `json:"replicationCount,omitempty"`
// errors seen in replication in last minute, hour and total
Errors TimedErrStats `json:"failed,omitempty"`
// Total number of entries that are queued for replication
QStats InQueueMetric `json:"queued"`
}

// RStat - has count and bytes for replication metrics
type RStat struct {
Count float64 `json:"count"`
Bytes int64 `json:"bytes"`
}

// Add two RStat
func (r RStat) Add(r1 RStat) RStat {
return RStat{
Count: r.Count + r1.Count,
Bytes: r.Bytes + r1.Bytes,
}
}

// TimedErrStats holds error stats for a time period
type TimedErrStats struct {
LastMinute RStat `json:"lastMinute"`
LastHour RStat `json:"lastHour"`
Totals RStat `json:"totals"`
}

// Add two TimedErrStats
func (te TimedErrStats) Add(o TimedErrStats) TimedErrStats {
return TimedErrStats{
LastMinute: te.LastMinute.Add(o.LastMinute),
LastHour: te.LastHour.Add(o.LastHour),
Totals: te.Totals.Add(o.Totals),
}
}
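// Illustrative roll-up, not part of this file: per-target error stats
// exposed via TargetMetrics.Failed can be summed with Add, for example
// across the Stats map of a Metrics value (the helper name below is hypothetical).
func totalReplicationErrors(m Metrics) TimedErrStats {
var total TimedErrStats
for _, tgt := range m.Stats {
total = total.Add(tgt.Failed)
}
return total
}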

// ResyncTargetsInfo provides replication target information to resync replicated data.
@@ -767,10 +781,30 @@ type XferStats struct {
CurrRate float64 `json:"currRate"`
}

// Merge two XferStats
func (x *XferStats) Merge(x1 XferStats) {
x.AvgRate += x1.AvgRate
x.PeakRate += x1.PeakRate
x.CurrRate += x1.CurrRate
}

// QStat holds count and bytes for objects in replication queue
type QStat struct {
Count float64 `json:"count"`
Bytes float64 `json:"bytes"`
}

// Add 2 QStat entries
func (q *QStat) Add(q1 QStat) {
q.Count += q1.Count
q.Bytes += q1.Bytes
}

// InQueueMetric holds stats for objects in replication queue
type InQueueMetric struct {
Curr QStat `json:"curr" msg:"cq"`
Avg QStat `json:"avg" msg:"aq"`
Max QStat `json:"peak" msg:"pq"`
}

// MetricName name of replication metric
@@ -785,16 +819,34 @@ const (
Total MetricName = "Total"
)

// WorkerStat has stats on number of replication workers
type WorkerStat struct {
Curr int32 `json:"curr"`
Avg float32 `json:"avg"`
Max int32 `json:"max"`
}

// ReplMRFStats holds stats of MRF backlog saved to disk in the last 5 minutes
// and number of entries that failed replication after 3 retries
type ReplMRFStats struct {
LastFailedCount uint64 `json:"failedCount_last5min"`
// Count of unreplicated entries that were dropped after MRF retry limit reached since cluster start.
TotalDroppedCount uint64 `json:"droppedCount_since_uptime"`
// Bytes of unreplicated entries that were dropped after MRF retry limit reached since cluster start.
TotalDroppedBytes uint64 `json:"droppedBytes_since_uptime"`
}

// ReplQNodeStats holds stats for a node in replication queue
type ReplQNodeStats struct {
NodeName string `json:"nodeName"`
Uptime int64 `json:"uptime"`
Workers WorkerStat `json:"activeWorkers"`

XferStats map[MetricName]XferStats `json:"transferSummary"`
TgtXferStats map[string]map[MetricName]XferStats `json:"tgtTransferStats"`

QStats InQueueMetric `json:"queueStats"`
MRFStats ReplMRFStats `json:"mrfStats"`
}

// ReplQueueStats holds stats for replication queue across nodes
Expand All @@ -803,33 +855,54 @@ type ReplQueueStats struct {
}

// Workers returns number of workers across all nodes
func (q ReplQueueStats) Workers() (tot WorkerStat) {
for _, node := range q.Nodes {
tot.Avg += node.Workers.Avg
tot.Curr += node.Workers.Curr
if tot.Max < node.Workers.Max {
tot.Max = node.Workers.Max
}
}
if len(q.Nodes) > 0 {
tot.Avg /= float32(len(q.Nodes))
tot.Curr /= int32(len(q.Nodes))
}
return tot
}

// qStatSummary returns cluster level stats for objects in replication queue
func (q ReplQueueStats) qStatSummary() InQueueMetric {
m := InQueueMetric{}
for _, v := range q.Nodes {
m.Avg.Add(v.QStats.Avg)
m.Curr.Add(v.QStats.Curr)
if m.Max.Count < v.QStats.Max.Count {
m.Max.Add(v.QStats.Max)
}
}
return m
}

// ReplQStats holds stats for objects in replication queue
type ReplQStats struct {
Uptime int64 `json:"uptime"`
Workers WorkerStat `json:"workers"`

XferStats map[MetricName]XferStats `json:"xferStats"`
TgtXferStats map[string]map[MetricName]XferStats `json:"tgtXferStats"`

QStats InQueueMetric `json:"qStats"`
MRFStats ReplMRFStats `json:"mrfStats"`
}

// QStats returns cluster level stats for objects in replication queue
func (q ReplQueueStats) QStats() (r ReplQStats) {
r.QStats = q.qStatSummary()
r.XferStats = make(map[MetricName]XferStats)
r.TgtXferStats = make(map[string]map[MetricName]XferStats)
r.Workers = q.Workers()

for _, node := range q.Nodes {
for arn := range node.TgtXferStats {
xmap, ok := node.TgtXferStats[arn]
if !ok {
@@ -859,39 +932,20 @@ func (q ReplQueueStats) QStats() (r ReplQStats) {
st.PeakRate = math.Max(st.PeakRate, v.PeakRate)
r.XferStats[k] = st
}
r.MRFStats.LastFailedCount += node.MRFStats.LastFailedCount
r.MRFStats.TotalDroppedCount += node.MRFStats.TotalDroppedCount
r.MRFStats.TotalDroppedBytes += node.MRFStats.TotalDroppedBytes
r.Uptime += node.Uptime
}
if len(q.Nodes) > 0 {
for k := range r.XferStats {
st := r.XferStats[k]
st.AvgRate /= float64(len(q.Nodes))
st.CurrRate /= float64(len(q.Nodes))
r.XferStats[k] = st
}
for arn := range r.TgtXferStats {
for m, v := range r.TgtXferStats[arn] {
v.AvgRate /= float64(len(q.Nodes))
v.CurrRate /= float64(len(q.Nodes))
r.TgtXferStats[arn][m] = v
}
}
r.Uptime /= int64(len(q.Nodes)) // average uptime
}

return
}

// MetricsV2 represents replication metrics for a bucket.
type MetricsV2 struct {
Uptime int64 `json:"uptime"`
CurrentStats Metrics `json:"currStats"`
QueueStats ReplQueueStats `json:"queueStats"`
}
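As a rough usage sketch (not part of this commit), the snippet below walks a MetricsV2 value and prints the cluster-level queue summary exposed by the new types. How the MetricsV2 value is obtained is left to whichever client or admin call returns bucket replication metrics in your minio-go version, and the helper name is hypothetical.

package main

import (
    "fmt"

    "github.com/minio/minio-go/v7/pkg/replication"
)

// printQueueSummary prints the aggregated replication queue stats carried by
// a replication.MetricsV2 value (sketch only; not part of replication.go).
func printQueueSummary(m replication.MetricsV2) {
    qs := m.QueueStats.QStats()
    fmt.Printf("average node uptime: %ds\n", qs.Uptime)
    fmt.Printf("workers: curr=%d avg=%.1f max=%d\n", qs.Workers.Curr, qs.Workers.Avg, qs.Workers.Max)
    fmt.Printf("queued now: %.0f objects, %.0f bytes\n", qs.QStats.Curr.Count, qs.QStats.Curr.Bytes)
    fmt.Printf("mrf: failed last 5min=%d, dropped=%d (%d bytes)\n",
        qs.MRFStats.LastFailedCount, qs.MRFStats.TotalDroppedCount, qs.MRFStats.TotalDroppedBytes)
}

func main() {
    // With a real client you would fetch metrics first; a zero value keeps
    // this sketch self-contained and runnable.
    printQueueSummary(replication.MetricsV2{})
}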
