This repository has been archived by the owner on Oct 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 183
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
MongoDB Atlas monitor implementation (#1322)
* MongoDB Atlas monitor implementation Issue: https://signalfuse.atlassian.net/browse/INT-771
- Loading branch information
1 parent
2a304e6
commit ce9f1ea
Showing
13 changed files
with
2,475 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
package atlas | ||
|
||
// metricsMap maps Atlas measurement names to metrics. | ||
var metricsMap = map[string]string{ | ||
"ASSERT_REGULAR": assertsRegular, | ||
"ASSERT_WARNING": assertsWarning, | ||
"ASSERT_MSG": assertsMsg, | ||
"ASSERT_USER": assertsUser, | ||
"CACHE_BYTES_READ_INTO": cacheBytesReadInto, | ||
"CACHE_BYTES_WRITTEN_FROM": cacheBytesWrittenFrom, | ||
"CACHE_USED_BYTES": cacheUsedBytes, | ||
"CACHE_DIRTY_BYTES": cacheDirtyBytes, | ||
"OPCOUNTER_CMD": opcounterCommand, | ||
"OPCOUNTER_DELETE": opcounterDelete, | ||
"OPCOUNTER_GETMORE": opcounterGetmore, | ||
"OPCOUNTER_INSERT": opcounterInsert, | ||
"OPCOUNTER_QUERY": opcounterQuery, | ||
"OPCOUNTER_UPDATE": opcounterUpdate, | ||
"OPCOUNTER_REPL_CMD": opcounterReplCommand, | ||
"OPCOUNTER_REPL_DELETE": opcounterReplDelete, | ||
"OPCOUNTER_REPL_INSERT": opcounterReplInsert, | ||
"OPCOUNTER_REPL_UPDATE": opcounterReplUpdate, | ||
"CONNECTIONS": connectionsCurrent, | ||
"CURSORS_TOTAL_OPEN": cursorsTotalOpen, | ||
"CURSORS_TOTAL_TIMED_OUT": cursorsTimedOut, | ||
"DB_STORAGE_TOTAL": storageSize, | ||
"DB_DATA_SIZE_TOTAL": dataSize, | ||
"DB_INDEX_SIZE_TOTAL": indexSize, | ||
"DOCUMENT_METRICS_RETURNED": documentMetricsReturned, | ||
"DOCUMENT_METRICS_INSERTED": documentMetricsInserted, | ||
"DOCUMENT_METRICS_UPDATED": documentMetricsUpdated, | ||
"DOCUMENT_METRICS_DELETED": documentMetricsDeleted, | ||
"MEMORY_RESIDENT": memResident, | ||
"MEMORY_VIRTUAL": memVirtual, | ||
"MEMORY_MAPPED": memMapped, | ||
"NETWORK_BYTES_IN": networkBytesIn, | ||
"NETWORK_BYTES_OUT": networkBytesOut, | ||
"NETWORK_NUM_REQUESTS": networkNumRequests, | ||
"OP_EXECUTION_TIME_READS": opExecutionTimeReads, | ||
"OP_EXECUTION_TIME_WRITES": opExecutionTimeWrites, | ||
"OP_EXECUTION_TIME_COMMANDS": opExecutionTimeCommands, | ||
"OPLOG_RATE_GB_PER_HOUR": oplogRate, | ||
"OPLOG_MASTER_LAG_TIME_DIFF": oplogMasterLagTimeDiff, | ||
"OPLOG_SLAVE_LAG_MASTER_TIME": oplogSlaveLagMasterTime, | ||
"OPLOG_MASTER_TIME": oplogMasterTime, | ||
"EXTRA_INFO_PAGE_FAULTS": extraInfoPageFaults, | ||
"GLOBAL_LOCK_CURRENT_QUEUE_TOTAL": globalLockCurrentQueueTotal, | ||
"GLOBAL_LOCK_CURRENT_QUEUE_READERS": globalLockCurrentQueueReaders, | ||
"GLOBAL_LOCK_CURRENT_QUEUE_WRITERS": globalLockCurrentQueueWriters, | ||
"QUERY_EXECUTOR_SCANNED": queryExecutorScanned, | ||
"QUERY_EXECUTOR_SCANNED_OBJECTS": queryExecutorScannedObjects, | ||
"QUERY_TARGETING_SCANNED_OBJECTS_PER_RETURNED": queryTargetingScannedObjectsPerReturned, | ||
"QUERY_TARGETING_SCANNED_PER_RETURNED": queryTargetingScannedPerReturned, | ||
"TICKETS_AVAILABLE_READS": ticketsAvailableReads, | ||
"TICKETS_AVAILABLE_WRITE": ticketsAvailableWrite, | ||
"DISK_PARTITION_IOPS_READ": diskPartitionIopsRead, | ||
"DISK_PARTITION_IOPS_WRITE": diskPartitionIopsWrite, | ||
"DISK_PARTITION_IOPS_TOTAL": diskPartitionIopsTotal, | ||
"DISK_PARTITION_LATENCY_READ": diskPartitionLatencyRead, | ||
"DISK_PARTITION_LATENCY_WRITE": diskPartitionLatencyWrite, | ||
"DISK_PARTITION_SPACE_FREE": diskPartitionSpaceFree, | ||
"DISK_PARTITION_SPACE_PERCENT_FREE": diskPartitionSpacePercentFree, | ||
"DISK_PARTITION_SPACE_USED": diskPartitionSpaceUsed, | ||
"DISK_PARTITION_SPACE_PERCENT_USED": diskPartitionSpacePercentUsed, | ||
"DISK_PARTITION_UTILIZATION": diskPartitionUtilization, | ||
"PROCESS_CPU_USER": processCPUUser, | ||
"PROCESS_CPU_KERNEL": processCPUKernel, | ||
"PROCESS_NORMALIZED_CPU_USER": processNormalizedCPUUser, | ||
"PROCESS_NORMALIZED_CPU_KERNEL": processNormalizedCPUKernel, | ||
"PROCESS_NORMALIZED_CPU_CHILDREN_USER": processNormalizedCPUChildrenUser, | ||
"PROCESS_NORMALIZED_CPU_CHILDREN_KERNEL": processNormalizedCPUChildrenKernel, | ||
"SYSTEM_CPU_USER": systemCPUUser, | ||
"SYSTEM_CPU_KERNEL": systemCPUKernel, | ||
"SYSTEM_CPU_NICE": systemCPUNice, | ||
"SYSTEM_CPU_IOWAIT": systemCPUIowait, | ||
"SYSTEM_CPU_IRQ": systemCPUIrq, | ||
"SYSTEM_CPU_SOFTIRQ": systemCPUSoftirq, | ||
"SYSTEM_CPU_GUEST": systemCPUGuest, | ||
"SYSTEM_CPU_STEAL": systemCPUSteal, | ||
"SYSTEM_NORMALIZED_CPU_USER": systemNormalizedCPUUser, | ||
"SYSTEM_NORMALIZED_CPU_KERNEL": systemNormalizedCPUKernel, | ||
"SYSTEM_NORMALIZED_CPU_NICE": systemNormalizedCPUNice, | ||
"SYSTEM_NORMALIZED_CPU_IOWAIT": systemNormalizedCPUIowait, | ||
"SYSTEM_NORMALIZED_CPU_IRQ": systemNormalizedCPUIrq, | ||
"SYSTEM_NORMALIZED_CPU_SOFTIRQ": systemNormalizedCPUSoftirq, | ||
"SYSTEM_NORMALIZED_CPU_GUEST": systemNormalizedCPUGuest, | ||
"SYSTEM_NORMALIZED_CPU_STEAL": systemNormalizedCPUSteal, | ||
"OPERATIONS_SCAN_AND_ORDER": operationsScanAndOrder, | ||
"BACKGROUND_FLUSH_AVG": backgroundFlushAvg, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package measurements | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/mongodb/go-client-mongodb-atlas/mongodbatlas" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
// Process is the MongoDB Process identified by the host and port on which the Process is running. | ||
type Process struct { | ||
ID string // The hostname and port of the Atlas MongoDB process in hostname:port format. | ||
ProjectID string // A string value that uniquely identifies a Atlas project. | ||
Host string // Name of the host in which the MongoDB Process is running. | ||
Port int // Port number on which the MongoDB Process is running. | ||
ShardName string // Name of the shard this process belongs to. Only present if this process is part of a sharded cluster. | ||
ReplicaSetName string // Name of the replica set this process belongs to. Only present if this process is part of a replica set. | ||
TypeName string // Type for this Atlas MongoDB process. | ||
} | ||
|
||
// nextPage gets the next page for pagination request. | ||
func nextPage(resp *mongodbatlas.Response) (bool, int) { | ||
if resp == nil || len(resp.Links) == 0 || resp.IsLastPage() { | ||
return false, -1 | ||
} | ||
|
||
currentPage, err := resp.CurrentPage() | ||
|
||
if err != nil { | ||
log.WithError(err).Error("failed to get the next page") | ||
return false, -1 | ||
} | ||
|
||
return true, currentPage + 1 | ||
} | ||
|
||
func errorMsg(err error, resp *mongodbatlas.Response) (string, error) { | ||
if err != nil { | ||
return "request for getting %s failed (Atlas project: %s, host: %s, port: %d)", err | ||
} | ||
|
||
if resp == nil { | ||
return "response for getting %s returned empty (Atlas project: %s, host: %s, port: %d)", fmt.Errorf("empty response") | ||
} | ||
|
||
if err := mongodbatlas.CheckResponse(resp.Response); err != nil { | ||
return "response for getting %s returned error (Atlas project: %s, host: %s, port: %d)", err | ||
} | ||
|
||
return "", nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
package measurements | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/mongodb/go-client-mongodb-atlas/mongodbatlas" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
// DisksMeasurements are the metric measurements of a particular disk partition in a MongoDB process host. | ||
type DisksMeasurements map[Process]struct { | ||
PartitionName string | ||
Measurements []*mongodbatlas.Measurements | ||
} | ||
|
||
// DisksGetter is for fetching metric measurements of disk partitions in the MongoDB processes hosts. | ||
type DisksGetter interface { | ||
GetMeasurements(ctx context.Context, timeout time.Duration, processes []Process) DisksMeasurements | ||
} | ||
|
||
// disksGetter implements DisksGetter | ||
type disksGetter struct { | ||
projectID string | ||
granularity string | ||
period string | ||
client *mongodbatlas.Client | ||
enableCache bool | ||
mutex *sync.Mutex | ||
measurementsCache *atomic.Value | ||
disksCache *atomic.Value | ||
} | ||
|
||
// NewProcessesGetter returns a new ProcessesGetter. | ||
func NewDisksGetter(projectID string, granularity string, period string, client *mongodbatlas.Client, enableCache bool) DisksGetter { | ||
return &disksGetter{ | ||
projectID: projectID, | ||
granularity: granularity, | ||
period: period, | ||
client: client, | ||
enableCache: enableCache, | ||
mutex: new(sync.Mutex), | ||
measurementsCache: new(atomic.Value), | ||
disksCache: new(atomic.Value), | ||
} | ||
} | ||
|
||
// GetMeasurements gets metric measurements of disk partitions in the hosts of the given MongoDB processes. | ||
func (getter *disksGetter) GetMeasurements(ctx context.Context, timeout time.Duration, processes []Process) DisksMeasurements { | ||
var measurements = make(DisksMeasurements) | ||
|
||
partitions := getter.getPartitions(ctx, timeout, processes) | ||
|
||
var wg1 sync.WaitGroup | ||
|
||
wg1.Add(1) | ||
|
||
go func() { | ||
defer wg1.Done() | ||
|
||
var wg2 sync.WaitGroup | ||
for process, partitionNames := range partitions { | ||
for _, partitionName := range partitionNames { | ||
wg2.Add(1) | ||
|
||
go func(process Process, partitionName string) { | ||
defer wg2.Done() | ||
|
||
var ctx, cancel = context.WithTimeout(ctx, timeout) | ||
defer cancel() | ||
|
||
getter.setMeasurements(ctx, measurements, process, partitionName, 1) | ||
}(process, partitionName) | ||
} | ||
} | ||
wg2.Wait() | ||
|
||
if getter.enableCache { | ||
getter.measurementsCache.Store(measurements) | ||
} | ||
}() | ||
|
||
if getter.measurementsCache.Load() != nil && getter.enableCache { | ||
return getter.measurementsCache.Load().(DisksMeasurements) | ||
} | ||
|
||
wg1.Wait() | ||
|
||
return measurements | ||
} | ||
|
||
// getPartitions is a helper function for fetching the names of disk partitions is the hosts of given MongoDB processes. | ||
func (getter *disksGetter) getPartitions(ctx context.Context, timeout time.Duration, processes []Process) map[Process][]string { | ||
var partitions = make(map[Process][]string) | ||
|
||
var wg1 sync.WaitGroup | ||
|
||
wg1.Add(1) | ||
|
||
go func() { | ||
defer wg1.Done() | ||
|
||
var wg2 sync.WaitGroup | ||
for _, process := range processes { | ||
wg2.Add(1) | ||
|
||
go func(process Process) { | ||
defer wg2.Done() | ||
|
||
var ctx, cancel = context.WithTimeout(ctx, timeout) | ||
defer cancel() | ||
|
||
partitionNames := getter.getPartitionNames(ctx, process, 1) | ||
|
||
getter.mutex.Lock() | ||
defer getter.mutex.Unlock() | ||
partitions[process] = partitionNames | ||
}(process) | ||
} | ||
wg2.Wait() | ||
|
||
if getter.enableCache { | ||
getter.disksCache.Store(partitions) | ||
} | ||
}() | ||
|
||
if getter.disksCache.Load() != nil && getter.enableCache { | ||
return getter.disksCache.Load().(map[Process][]string) | ||
} | ||
|
||
wg1.Wait() | ||
|
||
return partitions | ||
} | ||
|
||
// getPartitionNames is a helper function of function getPartitions. | ||
func (getter *disksGetter) getPartitionNames(ctx context.Context, process Process, page int) (names []string) { | ||
list, resp, err := getter.client.ProcessDisks.List(ctx, getter.projectID, process.Host, process.Port, &mongodbatlas.ListOptions{PageNum: page}) | ||
|
||
if msg, err := errorMsg(err, resp); err != nil { | ||
log.WithError(err).Errorf(msg, "disk partition names", getter.projectID, process.Host, process.Port) | ||
return names | ||
} | ||
|
||
if ok, next := nextPage(resp); ok { | ||
names = append(names, getter.getPartitionNames(ctx, process, next)...) | ||
} | ||
|
||
for _, r := range list.Results { | ||
names = append(names, r.PartitionName) | ||
} | ||
|
||
return names | ||
} | ||
|
||
// setMeasurements is a helper function of method GetMeasurements. | ||
func (getter *disksGetter) setMeasurements(ctx context.Context, disksMeasurements DisksMeasurements, process Process, partitionName string, pageNum int) { | ||
var opts = &mongodbatlas.ProcessMeasurementListOptions{ListOptions: &mongodbatlas.ListOptions{PageNum: pageNum}, Granularity: getter.granularity, Period: getter.period} | ||
|
||
list, resp, err := getter.client.ProcessDiskMeasurements.List(ctx, getter.projectID, process.Host, process.Port, partitionName, opts) | ||
|
||
if msg, err := errorMsg(err, resp); err != nil { | ||
log.WithError(err).Errorf(msg, "disk measurements", getter.projectID, process.Host, process.Port) | ||
return | ||
} | ||
|
||
if ok, next := nextPage(resp); ok { | ||
getter.setMeasurements(ctx, disksMeasurements, process, partitionName, next) | ||
} | ||
|
||
getter.mutex.Lock() | ||
defer getter.mutex.Unlock() | ||
|
||
disksMeasurements[process] = struct { | ||
PartitionName string | ||
Measurements []*mongodbatlas.Measurements | ||
}{PartitionName: partitionName, Measurements: list.Measurements} | ||
} |
Oops, something went wrong.