-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathjobs.py
348 lines (277 loc) · 12.8 KB
/
jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
import itertools
import json
import operator
import structlog
from django.db.models import Q
from django.http import Http404
from django.utils import timezone
from rest_framework import serializers
from rest_framework.authentication import SessionAuthentication
from rest_framework.generics import ListAPIView
from rest_framework.response import Response
from rest_framework.views import APIView
from interactive import issues
from interactive.slacks import notify_tech_support_of_failed_analysis
from jobserver.api.authentication import get_backend_from_token
from jobserver.emails import send_finished_notification
from jobserver.github import _get_github_api
from jobserver.models import JobRequest, Stats, User, Workspace
COMPLETED_STATES = {"failed", "succeeded"}
logger = structlog.get_logger(__name__)
class CoercingCharFieldSerializer(serializers.CharField):
    """CharField variant that coerces null/empty input to an empty string.

    Using default="" would make the field optional, which we don't want,
    and to_internal_value only fires for values not classed as empty, so
    null/None would bypass any coercion there.  Overriding run_validation
    is therefore the only place the coercion can live.
    """

    def run_validation(self, data=serializers.empty):
        is_empty, data = self.validate_empty_values(data)

        # Coerce null/None (and anything else DRF classes as empty) to "".
        if is_empty:
            return ""

        internal = self.to_internal_value(data)
        self.run_validators(internal)
        return internal
def update_backend_state(backend, request):
    """Record that we heard from `backend` and persist any state it sent up."""
    # Track when we last heard from this backend (per URL), for availability
    # reporting purposes.
    Stats.objects.update_or_create(
        backend=backend,
        url=request.path,
        defaults={"api_last_seen": timezone.now()},
    )

    # job-runner sends its state up in the Flags header.  We might rename
    # that header at some point but for now this is good enough.
    flags = request.headers.get("Flags", "")
    if flags:
        backend.jobrunner_state = json.loads(flags)
        backend.save(update_fields=["jobrunner_state"])
class JobAPIUpdate(APIView):
    """Receive the full set of Job states pushed up by job-runner.

    job-runner POSTs every Job it knows about, grouped here by JobRequest;
    we mirror that state locally (creating, updating, and deleting Job rows)
    and fire notifications for jobs and JobRequests that newly complete.
    """

    authentication_classes = [SessionAuthentication]

    get_github_api = staticmethod(_get_github_api)

    class serializer_class(serializers.Serializer):
        job_request_id = serializers.CharField()
        identifier = serializers.CharField()
        action = serializers.CharField(allow_blank=True)
        run_command = CoercingCharFieldSerializer(allow_blank=True, allow_null=True)
        status = serializers.CharField()
        status_code = serializers.CharField(allow_blank=True)
        status_message = serializers.CharField(allow_blank=True)
        created_at = serializers.DateTimeField()
        updated_at = serializers.DateTimeField(allow_null=True)
        started_at = serializers.DateTimeField(allow_null=True)
        completed_at = serializers.DateTimeField(allow_null=True)
        trace_context = serializers.JSONField(allow_null=True, required=False)
        metrics = serializers.JSONField(allow_null=True, required=False)

    def initial(self, request, *args, **kwargs):
        token = request.headers.get("Authorization")

        # require auth for all requests
        self.backend = get_backend_from_token(token)

        return super().initial(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        serializer = self.serializer_class(data=request.data, many=True)
        serializer.is_valid(raise_exception=True)

        # get JobRequest instances based on the identifiers in the payload
        incoming_job_request_ids = {
            j["job_request_id"] for j in serializer.validated_data
        }
        job_requests = JobRequest.objects.filter(
            identifier__in=incoming_job_request_ids
        )
        job_request_lut = {jr.identifier: jr for jr in job_requests}

        # sort the incoming data by JobRequest identifier to ensure the
        # subsequent groupby call works correctly.
        sorted_job_data = sorted(
            serializer.validated_data, key=operator.itemgetter("job_request_id")
        )

        # group Jobs by their JobRequest ID.
        # Fix: itertools.groupby only groups *consecutive* items with equal
        # keys, so it must consume the sorted list.  Previously it consumed
        # the unsorted validated_data, which could split one JobRequest's
        # jobs across multiple groups for out-of-order payloads.
        jobs_by_request = itertools.groupby(
            sorted_job_data, key=operator.itemgetter("job_request_id")
        )
        created_job_ids = []
        updated_job_ids = []
        for jr_identifier, jobs in jobs_by_request:
            jobs = list(jobs)

            # get the JobRequest for this identifier
            job_request = job_request_lut.get(jr_identifier)
            if job_request is None:
                # we don't expect this to happen under normal circumstances, but it's
                # now no longer a protocol violation for job-runner to tell us about
                # JobRequests we didn't ask about, so we shouldn't error here
                logger.info(
                    "Ignoring unrecognised JobRequest", job_request_id=jr_identifier
                )
                continue

            # grab the status before any updates so we can track updates to it
            # after saving the incoming changes
            initial_status = job_request.status

            # bind the job request ID to further logs so looking them up in the UI is easier
            structlog.contextvars.bind_contextvars(job_request=job_request.id)

            database_jobs = job_request.jobs.all()

            # get the current Jobs for the JobRequest, keyed on their identifier
            jobs_by_identifier = {j.identifier: j for j in database_jobs}

            payload_identifiers = {j["identifier"] for j in jobs}

            # delete local jobs not in the payload
            identifiers_to_delete = set(jobs_by_identifier.keys()) - payload_identifiers
            if identifiers_to_delete:
                job_request.jobs.filter(identifier__in=identifiers_to_delete).delete()

            for job_data in jobs:
                # remove this value from the data, it's going to be set by
                # creating/updating Job instances via the JobRequest instances
                # related Jobs manager (ie job_request.jobs)
                job_data.pop("job_request_id")

                job, created = job_request.jobs.get_or_create(
                    identifier=job_data["identifier"],
                    defaults={**job_data},
                )

                if created:
                    created_job_ids.append(str(job.id))
                    # For newly created jobs we can't tell if they've just transitioned
                    # to completed so we assume they have to avoid missing notifications
                    newly_completed = job_data["status"] in COMPLETED_STATES
                else:
                    updated_job_ids.append(str(job.id))

                    # check to see if the Job is about to transition to completed
                    # (failed or succeeded) so we can notify after the update
                    newly_completed = (
                        job.status not in COMPLETED_STATES
                        and job_data["status"] in COMPLETED_STATES
                    )

                    # update Job "manually" so we can make the check above for
                    # status transition
                    for key, value in job_data.items():
                        setattr(job, key, value)
                    job.save()

                # round trip the Job to the db so all fields are converted to
                # their python representations
                job.refresh_from_db()

                # We only send notifications or alerts for newly completed jobs
                if newly_completed:
                    handle_job_notifications(job_request, job)

            # refresh the JobRequest instance so we can get an updated status
            job_request.refresh_from_db()

            current_status = job_request.status
            if current_status != initial_status and current_status in COMPLETED_STATES:
                handle_job_request_notifications(
                    job_request, current_status, self.get_github_api()
                )

        logger.info(
            "Created or updated Jobs",
            created_job_ids=",".join(created_job_ids),
            updated_job_ids=",".join(updated_job_ids),
        )

        update_backend_state(self.backend, request)

        return Response({"status": "success"}, status=200)
def handle_job_notifications(job_request, job):
    """Email the requesting user about their finished job, if they opted in."""
    # Nothing to do unless the user asked to be notified.
    if not job_request.will_notify:
        return

    send_finished_notification(
        job_request.created_by.email,
        job,
    )
    logger.info(
        "Notified requesting user of completed job",
        user_id=job_request.created_by_id,
    )
def handle_job_request_notifications(job_request, status, github_api):
    """Fire completion handling for JobRequests backing an analysis request."""
    # Only JobRequests attached to an interactive AnalysisRequest get this
    # treatment; everything else is a no-op.
    if not hasattr(job_request, "analysis_request"):
        return

    if status == "succeeded":
        issues.create_output_checking_request(job_request, github_api)
    elif status == "failed":
        notify_tech_support_of_failed_analysis(job_request)
class WorkspaceSerializer(serializers.ModelSerializer):
    # Flatten related objects to plain strings; default=None keeps
    # serialization working when the relation is absent.
    created_by = serializers.CharField(source="created_by.username", default=None)
    repo = serializers.CharField(source="repo.url", default=None)

    class Meta:
        fields = [
            "name",
            "repo",
            "branch",
            "created_by",
            "created_at",
        ]
        model = Workspace
class JobRequestAPIList(ListAPIView):
    """List JobRequests with unfinished (or no) jobs.

    Works with or without a backend Authorization token; when a token is
    supplied, results are scoped to that backend and the backend's
    last-seen state is recorded on successful responses.
    """

    authentication_classes = []

    class serializer_class(serializers.ModelSerializer):
        backend = serializers.CharField(source="backend.slug")
        created_by = serializers.CharField(source="created_by.username")
        workspace = WorkspaceSerializer()
        project = serializers.CharField(source="workspace.project.slug")
        orgs = serializers.SerializerMethodField()

        class Meta:
            fields = [
                "backend",
                "sha",
                "identifier",
                "force_run_dependencies",
                "requested_actions",
                "cancelled_actions",
                "created_by",
                "created_at",
                "workspace",
                "database_name",
                "project",
                "orgs",
                "codelists_ok",
            ]
            model = JobRequest

        def get_orgs(self, obj):
            # Org slugs for the workspace's project.
            slugs = obj.workspace.project.orgs.values_list("slug", flat=True)
            return list(slugs)

    def initial(self, request, *args, **kwargs):
        # if there's an Auth token then try to authenticate with that otherwise
        # ignore since this endpoint can be used either way.
        token = request.headers.get("Authorization")
        self.backend = get_backend_from_token(token) if token else None
        return super().initial(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        response = super().get(request, *args, **kwargs)

        # only update state when authenticated and response is 2xx
        if self.backend and 200 <= response.status_code < 300:
            update_backend_state(self.backend, request)

        return response

    def get_queryset(self):
        # JobRequests that still have pending/running jobs, or no jobs at all.
        queryset = (
            JobRequest.objects.filter(
                Q(jobs__status__in=["pending", "running"]) | Q(jobs=None),
            )
            .select_related(
                "backend",
                "created_by",
                "workspace",
                "workspace__created_by",
                "workspace__project",
                "workspace__repo",
            )
            .prefetch_related("workspace__project__orgs")
            .order_by("-created_at")
            .distinct()
        )

        # Scope to the authenticated backend, when we have one with a slug.
        slug = getattr(self.backend, "slug", None)
        if slug is not None:
            queryset = queryset.filter(backend__slug=slug)

        return queryset
class UserAPIDetail(APIView):
    """Expose a user's permissions and roles to authenticated backends."""

    authentication_classes = []

    def initial(self, request, *args, **kwargs):
        # require auth for all requests
        self.backend = get_backend_from_token(request.headers.get("Authorization"))
        return super().initial(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        username = self.kwargs["username"]
        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            raise Http404

        return Response(
            {
                "permissions": user.get_all_permissions(),
                "roles": user.get_all_roles(),
            },
            status=200,
        )
class WorkspaceStatusesAPI(APIView):
    """Return the per-action status lookup table for a workspace."""

    authentication_classes = [SessionAuthentication]
    permission_classes = []

    def get(self, request, *args, **kwargs):
        name = self.kwargs["name"]
        try:
            workspace = Workspace.objects.get(name=name)
        except Workspace.DoesNotExist:
            return Response(status=404)

        # Optionally scope statuses to a single backend via ?backend=...
        backend = request.GET.get("backend", None)
        return Response(workspace.get_action_status_lut(backend), status=200)