-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcronjob_crud.py
128 lines (108 loc) · 4 KB
/
cronjob_crud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import json
import time
from kubernetes import client, config
# Module-level side effect: the kubeconfig is loaded as soon as this file is
# imported or run, before any of the functions below build an API client.
config.load_kube_config()
def create_namespaced_cron_job(namespace='default', body=None):
    """Create a CronJob in ``namespace`` unless one with the same name exists.

    Args:
        namespace: Namespace the CronJob is created in.
        body: CronJob manifest as a dict. ``body['metadata']['name']`` is read
            to check for an already existing CronJob.

    Raises:
        SystemExit: With exit status 1 when ``body`` is None.
    """
    if body is None:
        print('body is required!')
        # A missing manifest is an error: exit non-zero, and raise SystemExit
        # directly rather than relying on the site-provided ``exit`` helper.
        raise SystemExit(1)

    name = body['metadata']['name']
    if judge_crontab_exists(namespace, name):
        print(f'{name} exists, please do not repeat!')
        return

    batch_api = client.BatchV1Api()
    # The raw response is requested (_preload_content=False); its JSON payload
    # is decoded from ``.data`` below.
    ret = batch_api.create_namespaced_cron_job(namespace=namespace, body=body, pretty=True,
                                               _preload_content=False, async_req=False)
    ret_dict = json.loads(ret.data)
    print(f'create succeed\n{json.dumps(ret_dict)}')
def delete_namespaced_cron_job(namespace='default', name=None):
    """Delete the CronJob called ``name`` from ``namespace`` if it exists.

    Args:
        namespace: Namespace the CronJob lives in.
        name: Name of the CronJob to delete.

    Raises:
        SystemExit: With exit status 1 when ``name`` is None.
    """
    if name is None:
        print('name is required!')
        # A missing name is an error: exit non-zero, and raise SystemExit
        # directly rather than relying on the site-provided ``exit`` helper.
        raise SystemExit(1)

    if not judge_crontab_exists(namespace, name):
        print(f"{name} doesn't exists, please enter a new one!")
        return

    batch_api = client.BatchV1Api()
    # The raw response is requested (_preload_content=False); its JSON payload
    # is decoded from ``.data`` below.
    ret = batch_api.delete_namespaced_cron_job(name=name, namespace=namespace,
                                               _preload_content=False, async_req=False)
    ret_dict = json.loads(ret.data)
    print(f'delete succeed\n{json.dumps(ret_dict)}')
def patch_namespaced_cron_job(namespace='default', body=None):
    """Patch an existing CronJob in ``namespace`` with the manifest ``body``.

    Args:
        namespace: Namespace the CronJob lives in.
        body: CronJob manifest as a dict. ``body['metadata']['name']`` selects
            the CronJob to patch.

    Raises:
        SystemExit: With exit status 1 when ``body`` is None.
    """
    if body is None:
        print('body is required!')
        # A missing manifest is an error: exit non-zero, and raise SystemExit
        # directly rather than relying on the site-provided ``exit`` helper.
        raise SystemExit(1)

    name = body['metadata']['name']
    if not judge_crontab_exists(namespace, name):
        print(f"{name} doesn't exists, please enter a new one!")
        return

    batch_api = client.BatchV1Api()
    # The raw response is requested (_preload_content=False); its JSON payload
    # is decoded from ``.data`` below.
    ret = batch_api.patch_namespaced_cron_job(name=name, namespace=namespace, body=body,
                                              _preload_content=False, async_req=False)
    ret_dict = json.loads(ret.data)
    print(f'patch succeed\n{json.dumps(ret_dict)}')
def get_cronjob_list(namespace='default'):
    """Return the ``items`` of the CronJob listing for ``namespace``.

    Also prints how many CronJobs the listing contains.
    """
    response = client.BatchV1Api().list_namespaced_cron_job(
        namespace=namespace,
        pretty=True,
        _preload_content=False,
    )
    # The raw response body is JSON; only its "items" array is of interest.
    items = json.loads(response.data)["items"]
    print(f'cronjob number={len(items)}')
    return items
def judge_crontab_exists(namespace, name):
    """Return True when a CronJob called ``name`` is listed in ``namespace``."""
    return any(
        name == entry['metadata']['name']
        for entry in get_cronjob_list(namespace)
    )
def get_cronjob_body(namespace, name, command, *, schedule="*/1 * * * *", image="busybox:1.35"):
    """Build a ``batch/v1`` CronJob manifest with a single container.

    Args:
        namespace: Value written to ``metadata.namespace``.
        name: Used both as the CronJob name and as the container name.
        command: Container command, stored in the manifest as given.
        schedule: Cron expression for ``spec.schedule``. The default runs the
            job every minute.
        image: Container image. Defaults to ``busybox:1.35``.

    Returns:
        The manifest as a plain dict.
    """
    container = {
        "name": name,
        "image": image,
        "command": command,
    }
    pod_spec = {
        "containers": [container],
        "restartPolicy": "Never",
    }
    return {
        "apiVersion": "batch/v1",
        "kind": "CronJob",
        "metadata": {
            "name": name,
            "namespace": namespace,
        },
        "spec": {
            "schedule": schedule,
            "concurrencyPolicy": "Allow",
            "suspend": False,
            "jobTemplate": {
                "spec": {
                    "template": {
                        "spec": pod_spec,
                    },
                },
            },
            "successfulJobsHistoryLimit": 3,
            "failedJobsHistoryLimit": 1,
        },
    }
if __name__ == '__main__':
    # Demo run: list the CronJobs, then delete, re-create and patch one
    # called 'hostname' in the 'default' namespace.
    demo_namespace = 'default'
    demo_name = 'hostname'
    shell_prefix = ["/bin/sh", "-c"]

    # get
    cronjob_list = get_cronjob_list()

    # delete
    delete_namespaced_cron_job(demo_namespace, demo_name)
    # Presumably gives the deletion time to complete before re-creating.
    time.sleep(2)

    # create
    container_command = shell_prefix + ["date; echo Hello from the Kubernetes cluster; hostname"]
    hostname_json = get_cronjob_body(demo_namespace, demo_name, container_command)
    create_namespaced_cron_job(demo_namespace, hostname_json)

    # update
    container_command = shell_prefix + ["date; echo this is patch; hostname"]
    hostname_json = get_cronjob_body(demo_namespace, demo_name, container_command)
    patch_namespaced_cron_job(demo_namespace, hostname_json)