-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcircleci.py
112 lines (84 loc) · 2.71 KB
/
circleci.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env python3
import json
import os
import time
import requests
# CircleCI API token, sent as the "Circle-Token" header by api_get and api_post.
# Read at import time: a missing CIRCLECI_TOKEN variable raises KeyError.
token = os.environ["CIRCLECI_TOKEN"]
# Marker key that api_get adds (set to True) to responses it serves from the
# on-disk cache instead of from the network.
CACHE_KEY = "__cached"
def parse_time(s):
    """Convert a ``YYYY-MM-DDTHH:MM:SSZ`` timestamp to seconds since the epoch.

    The trailing ``Z`` marks the timestamp as UTC, so the parsed fields are
    converted with ``calendar.timegm`` (which treats them as UTC) rather than
    ``time.mktime`` (which would treat them as local time and shift the
    result by the machine's UTC offset).

    Args:
        s: Timestamp string in exactly the format ``%Y-%m-%dT%H:%M:%SZ``.

    Returns:
        The POSIX timestamp as a float.

    Raises:
        ValueError: If ``s`` does not match the expected format.
    """
    import calendar

    parsed = time.strptime(s, "%Y-%m-%dT%H:%M:%SZ")
    # float() keeps the return type that time.mktime used to produce.
    return float(calendar.timegm(parsed))
def api_get(
    url,
    *args,
    _version="2",
    _cache_name=None,
    _cache_filter=None,
    headers=None,
    **kwargs,
):
    """GET a CircleCI API endpoint, with optional on-disk caching and retries.

    Args:
        url: Endpoint path appended to ``https://circleci.com/api/v{_version}/``.
        *args: Extra positional arguments forwarded to ``requests.get``.
        _version: API version segment used in the URL.
        _cache_name: If truthy, the response is looked up in (and may be
            stored to) ``cache/<_cache_name>.json``.
        _cache_filter: Optional predicate called with the decoded response;
            the response is written to the cache only when it returns a
            truthy value. With no filter, every response is cached.
        headers: Optional extra request headers. The dict is copied, so the
            caller's object is never modified.
        **kwargs: Extra keyword arguments forwarded to ``requests.get``.

    Returns:
        The decoded JSON response. When served from the cache, the cached
        mapping is returned with an extra ``CACHE_KEY: True`` entry.

    Raises:
        requests.HTTPError: On a non-OK response without a ``Retry-After``
            header.
    """
    fn = os.path.join("cache", _cache_name + ".json") if _cache_name else None
    if fn and os.path.exists(fn):
        with open(fn) as f:
            return {CACHE_KEY: True, **json.load(f)}

    # Work on a copy: adding the token must not leak into the caller's dict.
    headers = dict(headers) if headers is not None else {}
    headers["Circle-Token"] = token

    while True:
        r = requests.get(
            f"https://circleci.com/api/v{_version}/{url}", *args, headers=headers, **kwargs
        )
        if r.ok:
            j = r.json()
            break
        if "Retry-After" in r.headers:
            # The server asked us to back off; wait the requested number of
            # seconds (if positive) and then try the same request again.
            retry_after = int(r.headers["Retry-After"])
            if retry_after > 0:
                print(f"retrying {url} after {retry_after}")
                time.sleep(retry_after)
        else:
            r.raise_for_status()

    if fn and (not _cache_filter or _cache_filter(j)):
        # Create the cache directory on demand so a successful request is not
        # lost to FileNotFoundError on first use.
        os.makedirs(os.path.dirname(fn), exist_ok=True)
        with open(fn, "w") as f:
            json.dump(j, f)
    return j
def api_post(url, *args, _version="2", headers=None, **kwargs):
    """POST to a CircleCI API endpoint and return the decoded JSON response.

    Args:
        url: Endpoint path appended to ``https://circleci.com/api/v{_version}/``.
        *args: Extra positional arguments forwarded to ``requests.post``.
        _version: API version segment used in the URL.
        headers: Optional extra request headers. The dict is copied, so the
            caller's object is never modified.
        **kwargs: Extra keyword arguments forwarded to ``requests.post``.

    Returns:
        The decoded JSON body of the response. The HTTP status is not
        checked here, so error responses are returned as-is.
    """
    # Work on a copy: adding the token must not leak into the caller's dict.
    headers = dict(headers) if headers is not None else {}
    headers["Circle-Token"] = token
    r = requests.post(
        f"https://circleci.com/api/v{_version}/{url}", *args, headers=headers, **kwargs
    )
    return r.json()
def pipelines(org_slug, page_token=None):
    """Fetch one page of the ``pipeline`` endpoint for an organization.

    Args:
        org_slug: Value sent as the ``org-slug`` query parameter.
        page_token: Value sent as the ``page-token`` query parameter
            (``None`` by default).

    Returns:
        The decoded JSON response from ``api_get``.
    """
    # "mine" is always sent as the string "false".
    query = {"org-slug": org_slug, "page-token": page_token, "mine": "false"}
    return api_get("pipeline", params=query)
def project_pipelines(slug, branch, page_token=None):
    """Fetch one page of a project's pipelines for a given branch.

    Args:
        slug: Project slug interpolated into the endpoint path.
        branch: Value sent as the ``branch`` query parameter.
        page_token: Value sent as the ``page-token`` query parameter
            (``None`` by default).

    Returns:
        The decoded JSON response from ``api_get``.
    """
    endpoint = f"project/{slug}/pipeline"
    query = {"page-token": page_token, "branch": branch}
    return api_get(endpoint, params=query)
def pipeline(slug, num):
    """Fetch a single pipeline of a project.

    Args:
        slug: Project slug interpolated into the endpoint path.
        num: Pipeline identifier appended after ``pipeline/`` in the path.

    Returns:
        The decoded JSON response from ``api_get``.
    """
    endpoint = f"project/{slug}/pipeline/{num}"
    return api_get(endpoint)
def pipeline_workflows(uuid, page_token=None):
    """Fetch one page of the workflows belonging to a pipeline.

    Args:
        uuid: Pipeline id interpolated into the endpoint path.
        page_token: Value sent as the ``page-token`` query parameter
            (``None`` by default).

    Returns:
        The decoded JSON response from ``api_get``.
    """
    endpoint = f"pipeline/{uuid}/workflow"
    query = {"page-token": page_token}
    return api_get(endpoint, params=query)
def workflow(uuid):
    """Fetch a workflow, caching the response once its status is final.

    The response is cached under ``workflow-<uuid>`` only when its
    ``status`` is one of ``success``, ``failed`` or ``canceled``; responses
    with any other status are always re-fetched.

    Args:
        uuid: Workflow id interpolated into the endpoint path.

    Returns:
        The decoded JSON response from ``api_get``.
    """
    return api_get(
        f"workflow/{uuid}",
        _cache_name=f"workflow-{uuid}",
        # The membership test already yields a bool; wrapping it in all()
        # would raise TypeError because a bool is not iterable.
        _cache_filter=lambda r: r["status"] in {"success", "failed", "canceled"},
    )
def workflow_jobs(uuid, page_token=None):
    """Fetch one page of a workflow's jobs, caching it once all jobs are final.

    A page is cached only when every entry in its ``items`` list has a
    ``status`` of ``success``, ``failed`` or ``canceled``.

    Args:
        uuid: Workflow id interpolated into the endpoint path.
        page_token: Value sent as the ``page-token`` query parameter
            (``None`` by default).

    Returns:
        The decoded JSON response from ``api_get``.
    """
    import hashlib

    # Each page needs its own cache entry; otherwise a cached first page
    # would be returned for every later page request. The first page keeps
    # the original cache name so existing cache files remain valid.
    cache_name = f"workflow_jobs-{uuid}"
    if page_token is not None:
        # Hash the token so the file name stays short and filesystem-safe.
        digest = hashlib.sha256(str(page_token).encode("utf-8")).hexdigest()[:16]
        cache_name = f"{cache_name}-{digest}"

    return api_get(
        f"workflow/{uuid}/job",
        params={"page-token": page_token},
        _cache_name=cache_name,
        _cache_filter=lambda r: all(
            j["status"] in {"success", "failed", "canceled"} for j in r["items"]
        ),
    )
def workflow_rerun(uuid, jobs=None, from_failed=False):
    """Request a rerun of a workflow.

    Args:
        uuid: Workflow id interpolated into the endpoint path.
        jobs: List sent as the ``jobs`` field of the request body. Defaults
            to an empty list (a fresh one per call, not a shared default).
        from_failed: Value sent as the ``from_failed`` field of the body.

    Returns:
        The decoded JSON response from ``api_post``.
    """
    payload = {"jobs": [] if jobs is None else jobs, "from_failed": from_failed}
    return api_post(f"workflow/{uuid}/rerun", json=payload)