From 9ff9d33f8cb040b0df961b70e2ffde6c46585dff Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Mon, 15 Mar 2021 10:11:03 +0800 Subject: [PATCH 1/3] [cluster utilization] get the complete list of jobs in 7 days --- .../src/cluster-utilization/send_alert.py | 38 +++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/src/alert-manager/src/cluster-utilization/send_alert.py b/src/alert-manager/src/cluster-utilization/send_alert.py index 6c08d41372..c2e438db90 100644 --- a/src/alert-manager/src/cluster-utilization/send_alert.py +++ b/src/alert-manager/src/cluster-utilization/send_alert.py @@ -1,4 +1,4 @@ -from datetime import timezone, datetime +from datetime import timezone, datetime, timedelta import logging import os import requests @@ -14,7 +14,7 @@ ALERT_PREFIX = "/alert-manager/api/v1/alerts" # only the jobs that are running or completed within 7d should be included # currently, we just set the limit to max -REST_JOB_API_PREFIX = "/rest-server/api/v2/jobs?limit=50000" +REST_JOB_API_PREFIX = "/rest-server/api/v2/jobs?order=completionTime,DESC" TOKEN = os.environ.get('PAI_BEARER_TOKEN') PROMETHEUS_SCRAPE_INTERVAL = int(os.environ.get('PROMETHEUS_SCRAPE_INTERVAL')) @@ -49,15 +49,39 @@ def datetime_to_hours(dt): return dt.days * 24 + dt.seconds / 3600 +def check_timestamp_within_7d(timestamp): + """ + check if a timestamp is within 7 days + """ + return datetime.fromtimestamp(int(timestamp/1000), timezone.utc) + timedelta(days=7) < datetime.now(timezone.utc) + + +def get_jobs_in_7d(rest_url): + """ + Returns all jobs within 7 days + """ + jobs_in_7d = [] + + offset = 0 + limit = 5000 + headers = {'Authorization': "Bearer {}".format(TOKEN)} + while True: + resp = requests.get(rest_url+"limit={}&offset={}".format(limit, offset), headers=headers) + resp.raise_for_status() + jobs = resp.json() + jobs_in_7d += jobs + if not jobs or jobs[-1]["completedTime"] is not None and check_timestamp_within_7d(jobs[-1]["completedTime"]) : + break + offset += limit + + return jobs_in_7d + + @enable_request_debug_log def get_usage_info(job_gpu_percent, job_gpu_hours, user_usage_result, rest_url): job_infos = {} user_infos = {} - # get all jobs - headers = {'Authorization': "Bearer {}".format(TOKEN)} - resp = requests.get(rest_url, headers=headers) - resp.raise_for_status() - job_list = resp.json() + job_list = get_jobs_in_7d(rest_url) for v in user_usage_result["data"]["result"]: user_infos[v["metric"]["username"]] = { From 21e8825c04c9e39aa49f148d0477ad11a354e9bf Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Mon, 15 Mar 2021 13:50:09 +0800 Subject: [PATCH 2/3] refine --- .../quick-start/services-configuration.yaml.template | 1 - src/alert-manager/src/cluster-utilization/send_alert.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/contrib/kubespray/quick-start/services-configuration.yaml.template b/contrib/kubespray/quick-start/services-configuration.yaml.template index 27df33f3e4..aa73221c65 100644 --- a/contrib/kubespray/quick-start/services-configuration.yaml.template +++ b/contrib/kubespray/quick-start/services-configuration.yaml.template @@ -263,7 +263,6 @@ authentication: # summary: "{% raw %}{{$labels.job_name}}{% endraw %} has a job gpu percent lower than 30% for 1 hour" # description: Monitor job level gpu utilization in certain virtual clusters. - # uncomment following if you want to change customize grafana # grafana: # port: 3000 diff --git a/src/alert-manager/src/cluster-utilization/send_alert.py b/src/alert-manager/src/cluster-utilization/send_alert.py index c2e438db90..bce1f75d1b 100644 --- a/src/alert-manager/src/cluster-utilization/send_alert.py +++ b/src/alert-manager/src/cluster-utilization/send_alert.py @@ -53,7 +53,7 @@ def check_timestamp_within_7d(timestamp): """ check if a timestamp is within 7 days """ - return datetime.fromtimestamp(int(timestamp/1000), timezone.utc) + timedelta(days=7) < datetime.now(timezone.utc) + return datetime.fromtimestamp(int(timestamp/1000), timezone.utc) > datetime.now(timezone.utc) - timedelta(days=7) def get_jobs_in_7d(rest_url): @@ -70,7 +70,7 @@ def get_jobs_in_7d(rest_url): resp.raise_for_status() jobs = resp.json() jobs_in_7d += jobs - if not jobs or jobs[-1]["completedTime"] is not None and check_timestamp_within_7d(jobs[-1]["completedTime"]) : + if not jobs or (jobs[-1]["completedTime"] is not None and not check_timestamp_within_7d(jobs[-1]["completedTime"])) : break offset += limit From fc045b1b4ac78d493df1cdf8feb64a09ccd0694a Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Fri, 19 Mar 2021 13:44:53 +0800 Subject: [PATCH 3/3] refine --- .../services-configuration.yaml.template | 1 + .../src/cluster-utilization/send_alert.py | 22 +++++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/contrib/kubespray/quick-start/services-configuration.yaml.template b/contrib/kubespray/quick-start/services-configuration.yaml.template index aa73221c65..27df33f3e4 100644 --- a/contrib/kubespray/quick-start/services-configuration.yaml.template +++ b/contrib/kubespray/quick-start/services-configuration.yaml.template @@ -263,6 +263,7 @@ authentication: # summary: "{% raw %}{{$labels.job_name}}{% endraw %} has a job gpu percent lower than 30% for 1 hour" # description: Monitor job level gpu utilization in certain virtual clusters. + # uncomment following if you want to change customize grafana # grafana: # port: 3000 diff --git a/src/alert-manager/src/cluster-utilization/send_alert.py b/src/alert-manager/src/cluster-utilization/send_alert.py index bce1f75d1b..b6ebf14cb9 100644 --- a/src/alert-manager/src/cluster-utilization/send_alert.py +++ b/src/alert-manager/src/cluster-utilization/send_alert.py @@ -56,11 +56,18 @@ def check_timestamp_within_7d(timestamp): return datetime.fromtimestamp(int(timestamp/1000), timezone.utc) > datetime.now(timezone.utc) - timedelta(days=7) -def get_jobs_in_7d(rest_url): +def get_related_jobs(rest_url): """ - Returns all jobs within 7 days + Returns all related jobs + + Returns: + -------- + list + All the jobs completed within 7 days will be included in the list. + Jobs completed before 7 days may also be included. + The list may contain duplicated jobs. """ - jobs_in_7d = [] + jobs_related = [] offset = 0 limit = 5000 @@ -69,19 +76,20 @@ def get_jobs_in_7d(rest_url): resp = requests.get(rest_url+"limit={}&offset={}".format(limit, offset), headers=headers) resp.raise_for_status() jobs = resp.json() - jobs_in_7d += jobs + jobs_related += jobs + # no more jobs or the last job in the list completed before 7 days if not jobs or (jobs[-1]["completedTime"] is not None and not check_timestamp_within_7d(jobs[-1]["completedTime"])) : break offset += limit - return jobs_in_7d + return jobs_related @enable_request_debug_log def get_usage_info(job_gpu_percent, job_gpu_hours, user_usage_result, rest_url): job_infos = {} user_infos = {} - job_list = get_jobs_in_7d(rest_url) + jobs_related = get_related_jobs(rest_url) for v in user_usage_result["data"]["result"]: user_infos[v["metric"]["username"]] = { @@ -92,7 +100,7 @@ def get_usage_info(job_gpu_percent, job_gpu_hours, user_usage_result, rest_url): job_name = v["metric"]["job_name"] matched_job = list( filter(lambda job: "{}~{}".format(job["username"], job["name"]) == job_name, - job_list)) + jobs_related)) # ingore unfounded jobs if not matched_job: logging.warning("Job %s not found.", job_name)