diff --git a/server/cgmon.go b/pkg/cgroup/cgmon.go similarity index 55% rename from server/cgmon.go rename to pkg/cgroup/cgmon.go index 5debd85495d..407e50f50c7 100644 --- a/server/cgmon.go +++ b/pkg/cgroup/cgmon.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package server +package cgroup import ( "context" @@ -24,7 +24,6 @@ import ( "github.com/pingcap/log" "github.com/shirou/gopsutil/v3/mem" bs "github.com/tikv/pd/pkg/basicserver" - "github.com/tikv/pd/pkg/cgroup" "go.uber.org/zap" ) @@ -32,7 +31,8 @@ const ( refreshInterval = 10 * time.Second ) -type cgroupMonitor struct { +// Monitor is used to monitor the cgroup. +type Monitor struct { started bool ctx context.Context cancel context.CancelFunc @@ -42,70 +42,85 @@ type cgroupMonitor struct { lastMemoryLimit uint64 } -// StartCgroupMonitor uses to start the cgroup monitoring. +// StartMonitor is used to start the cgroup monitoring. // WARN: this function is not thread-safe. -func (cgmon *cgroupMonitor) startCgroupMonitor(ctx context.Context) { - if cgmon.started { +func (m *Monitor) StartMonitor(ctx context.Context) { + if m.started { return } - cgmon.started = true - // Get configured maxprocs. - cgmon.cfgMaxProcs = runtime.GOMAXPROCS(0) - cgmon.ctx, cgmon.cancel = context.WithCancel(ctx) - cgmon.wg.Add(1) - go cgmon.refreshCgroupLoop() + m.started = true + if runtime.GOOS != "linux" { + return + } + m.ctx, m.cancel = context.WithCancel(ctx) + m.wg.Add(1) + go m.refreshCgroupLoop() log.Info("cgroup monitor started") } -// StopCgroupMonitor uses to stop the cgroup monitoring. +// StopMonitor is used to stop the cgroup monitoring. // WARN: this function is not thread-safe. 
-func (cgmon *cgroupMonitor) stopCgroupMonitor() { - if !cgmon.started { +func (m *Monitor) StopMonitor() { + if !m.started { + return + } + if runtime.GOOS != "linux" { return } - cgmon.started = false - if cgmon.cancel != nil { - cgmon.cancel() + m.started = false + if m.cancel != nil { + m.cancel() } - cgmon.wg.Wait() + m.wg.Wait() log.Info("cgroup monitor stopped") } -func (cgmon *cgroupMonitor) refreshCgroupLoop() { +func (m *Monitor) refreshCgroupLoop() { ticker := time.NewTicker(refreshInterval) defer func() { if r := recover(); r != nil { log.Error("[pd] panic in the recoverable goroutine", - zap.String("funcInfo", "refreshCgroupLoop"), + zap.String("func-info", "refreshCgroupLoop"), zap.Reflect("r", r), zap.Stack("stack")) } - cgmon.wg.Done() + m.wg.Done() ticker.Stop() }() - cgmon.refreshCgroupCPU() - cgmon.refreshCgroupMemory() + err := m.refreshCgroupCPU() + if err != nil { + log.Warn("failed to get cgroup cpu quota", zap.Error(err)) + } + err = m.refreshCgroupMemory() + if err != nil { + log.Warn("failed to get cgroup memory limit", zap.Error(err)) + } for { select { - case <-cgmon.ctx.Done(): + case <-m.ctx.Done(): return case <-ticker.C: - cgmon.refreshCgroupCPU() - cgmon.refreshCgroupMemory() + err = m.refreshCgroupCPU() + if err != nil { + log.Debug("failed to get cgroup cpu quota", zap.Error(err)) + } + err = m.refreshCgroupMemory() + if err != nil { + log.Debug("failed to get cgroup memory limit", zap.Error(err)) + } } } } -func (cgmon *cgroupMonitor) refreshCgroupCPU() { +func (m *Monitor) refreshCgroupCPU() error { // Get the number of CPUs. quota := runtime.NumCPU() // Get CPU quota from cgroup. 
- cpuPeriod, cpuQuota, err := cgroup.GetCPUPeriodAndQuota() + cpuPeriod, cpuQuota, err := GetCPUPeriodAndQuota() if err != nil { - log.Warn("failed to get cgroup cpu quota", zap.Error(err)) - return + return err } if cpuPeriod > 0 && cpuQuota > 0 { ratio := float64(cpuQuota) / float64(cpuPeriod) @@ -114,35 +129,34 @@ func (cgmon *cgroupMonitor) refreshCgroupCPU() { } } - if quota != cgmon.lastMaxProcs && quota < cgmon.cfgMaxProcs { - runtime.GOMAXPROCS(quota) + if quota != m.lastMaxProcs { log.Info("set the maxprocs", zap.Int("quota", quota)) bs.ServerMaxProcsGauge.Set(float64(quota)) - cgmon.lastMaxProcs = quota - } else if cgmon.lastMaxProcs == 0 { - log.Info("set the maxprocs", zap.Int("cfgMaxProcs", cgmon.cfgMaxProcs)) - bs.ServerMaxProcsGauge.Set(float64(cgmon.cfgMaxProcs)) - cgmon.lastMaxProcs = cgmon.cfgMaxProcs + m.lastMaxProcs = quota + } else if m.lastMaxProcs == 0 { + log.Info("set the maxprocs", zap.Int("maxprocs", m.cfgMaxProcs)) + bs.ServerMaxProcsGauge.Set(float64(m.cfgMaxProcs)) + m.lastMaxProcs = m.cfgMaxProcs } + return nil } -func (cgmon *cgroupMonitor) refreshCgroupMemory() { - memLimit, err := cgroup.GetMemoryLimit() +func (m *Monitor) refreshCgroupMemory() error { + memLimit, err := GetMemoryLimit() if err != nil { - log.Warn("failed to get cgroup memory limit", zap.Error(err)) - return + return err } vmem, err := mem.VirtualMemory() if err != nil { - log.Warn("failed to get system memory size", zap.Error(err)) - return + return err } if memLimit > vmem.Total { memLimit = vmem.Total } - if memLimit != cgmon.lastMemoryLimit { - log.Info("set the memory limit", zap.Uint64("memLimit", memLimit)) + if memLimit != m.lastMemoryLimit { + log.Info("set the memory limit", zap.Uint64("mem-limit", memLimit)) bs.ServerMemoryLimit.Set(float64(memLimit)) - cgmon.lastMemoryLimit = memLimit + m.lastMemoryLimit = memLimit } + return nil } diff --git a/pkg/cgroup/cgroup_cpu_linux.go b/pkg/cgroup/cgroup_cpu_linux.go index fa7a8e84efa..5dc9ea4c285 100644 --- 
a/pkg/cgroup/cgroup_cpu_linux.go +++ b/pkg/cgroup/cgroup_cpu_linux.go @@ -25,9 +25,9 @@ import ( // GetCgroupCPU returns the CPU usage and quota for the current cgroup. func GetCgroupCPU() (CPUUsage, error) { - cpuusage, err := getCgroupCPUHelper("/") - cpuusage.NumCPU = runtime.NumCPU() - return cpuusage, err + cpuUsage, err := getCgroupCPUHelper("/") + cpuUsage.NumCPU = runtime.NumCPU() + return cpuUsage, err } // CPUQuotaToGOMAXPROCS converts the CPU quota applied to the calling process diff --git a/server/server.go b/server/server.go index 68c69b38e53..5e8d3dea03b 100644 --- a/server/server.go +++ b/server/server.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/sysutil" "github.com/tikv/pd/pkg/audit" bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/cgroup" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" @@ -237,7 +238,7 @@ type Server struct { schedulingPrimaryWatcher *etcdutil.LoopWatcher // Cgroup Monitor - cgmon cgroupMonitor + cgMonitor cgroup.Monitor } // HandlerBuilder builds a server HTTP handler. @@ -545,7 +546,7 @@ func (s *Server) Close() { log.Info("closing server") - s.cgmon.stopCgroupMonitor() + s.cgMonitor.StopMonitor() s.stopServerLoop() if s.IsAPIServiceMode() { @@ -622,7 +623,7 @@ func (s *Server) Run() error { return err } - s.cgmon.startCgroupMonitor(s.ctx) + s.cgMonitor.StartMonitor(s.ctx) failpoint.Inject("delayStartServerLoop", func() { time.Sleep(2 * time.Second)