Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay update of artifacts until final job save #11832

Merged
merged 3 commits into from
May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions awx/main/tasks/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

# AWX
from awx.main.redact import UriCleaner
from awx.main.constants import MINIMAL_EVENTS
from awx.main.constants import MINIMAL_EVENTS, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE
from awx.main.utils.update_model import update_model
from awx.main.queue import CallbackQueueDispatcher

Expand All @@ -33,6 +33,7 @@ def __init__(self, model=None):
self.model = model
self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
self.wrapup_event_dispatched = False
self.extra_update_fields = {}

def update_model(self, pk, _attempt=0, **updates):
    """Update the ``self.model`` row identified by ``pk`` through the shared ``update_model`` helper.

    :param pk: primary key passed straight through to the helper.
    :param _attempt: attempt counter forwarded to the helper (defaults to 0).
    :param updates: field values forwarded to the helper as keyword arguments.
    :return: whatever the module-level ``update_model`` helper returns.
    """
    # Forward the caller-supplied attempt counter instead of a hard-coded 0, so the
    # parameter in this signature is actually honored. The retry ceiling comes from
    # ``self.update_attempts``.
    return update_model(self.model, pk, _attempt=_attempt, _max_attempts=self.update_attempts, **updates)
Expand All @@ -45,6 +46,26 @@ def wrapup_event_type(self):
def event_data_key(self):
    """Return the ``JOB_REFERENCE`` attribute of the event class of ``self.instance``."""
    return self.instance.event_class.JOB_REFERENCE

def delay_update(self, skip_if_already_set=False, **kwargs):
    """Stash fields that should be saved along with the job status change.

    Values are collected in ``self.extra_update_fields``:

    * a key that has not been stashed yet is stored as given;
    * with ``skip_if_already_set=True`` an already-stashed key is left untouched;
    * for ``job_explanation`` and ``result_traceback`` a new value is appended to the
      stashed text on a new line, unless its text is already contained in it;
    * any other already-stashed key is overwritten.

    :param skip_if_already_set: when true, never modify a key that is already stashed.
    :param kwargs: field names mapped to the values to stash.
    """
    # Fields whose earlier content must not be lost when a later value arrives.
    combined_fields = ('job_explanation', 'result_traceback')

    for key, value in kwargs.items():
        if key not in self.extra_update_fields:
            self.extra_update_fields[key] = value
            continue
        if skip_if_already_set:
            continue
        if key not in combined_fields:
            self.extra_update_fields[key] = value
            continue

        previous = self.extra_update_fields[key]
        # Normalize once: the stashed value may be None or a non-str object, and a
        # membership test against such a value would raise TypeError.
        previous_text = '' if previous is None else str(previous)
        if not previous_text:
            # Nothing meaningful stashed yet; store the value without a leading blank line.
            self.extra_update_fields[key] = value
            continue

        new_text = str(value)
        if new_text in previous_text:
            continue  # if already set, avoid duplicating messages
        # In the case of these fields, we do not want to lose any prior information, so combine values
        self.extra_update_fields[key] = '\n'.join([previous_text, new_text])

def get_delayed_update_fields(self):
    """Return finalized dict of all fields that should be saved along with the job status change.

    Records ``self.event_ct`` under ``emitted_events`` and, when the stashed
    ``result_traceback`` mentions an unexpected keyword argument, stashes
    ``ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE`` as an additional ``result_traceback`` value.

    :return: ``self.extra_update_fields`` (the same dict object, not a copy).
    """
    self.extra_update_fields['emitted_events'] = self.event_ct

    stashed_traceback = self.extra_update_fields.get('result_traceback')
    # The key may be present but hold None or a non-str object; a bare ``in`` test
    # against such a value would raise TypeError, so guard and coerce first.
    if stashed_traceback is not None and 'got an unexpected keyword argument' in str(stashed_traceback):
        self.delay_update(result_traceback=ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE)
    return self.extra_update_fields

def event_handler(self, event_data):
#
# ⚠️ D-D-D-DANGER ZONE ⚠️
Expand Down Expand Up @@ -149,8 +170,7 @@ def event_handler(self, event_data):
Handle artifacts
'''
if event_data.get('event_data', {}).get('artifact_data', {}):
self.instance.artifacts = event_data['event_data']['artifact_data']
self.instance.save(update_fields=['artifacts'])
self.delay_update(artifacts=event_data['event_data']['artifact_data'])

return False

Expand Down Expand Up @@ -218,10 +238,7 @@ def status_handler(self, status_data, runner_config):
elif status_data['status'] == 'error':
result_traceback = status_data.get('result_traceback', None)
if result_traceback:
from awx.main.signals import disable_activity_stream # Circular import

with disable_activity_stream():
self.instance = self.update_model(self.instance.pk, result_traceback=result_traceback)
self.delay_update(result_traceback=result_traceback)


class RunnerCallbackForProjectUpdate(RunnerCallback):
Expand Down
22 changes: 6 additions & 16 deletions awx/main/tasks/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
JOB_FOLDER_PREFIX,
MAX_ISOLATED_PATH_COLON_DELIMITER,
CONTAINER_VOLUMES_MOUNT_TYPES,
ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE,
)
from awx.main.models import (
Instance,
Expand Down Expand Up @@ -411,7 +410,6 @@ def run(self, pk, **kwargs):
self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords
self.instance.websocket_emit_status("running")
status, rc = 'error', None
extra_update_fields = {}
fact_modification_times = {}
self.runner_callback.event_ct = 0

Expand Down Expand Up @@ -546,18 +544,14 @@ def run(self, pk, **kwargs):
rc = res.rc

if status in ('timeout', 'error'):
job_explanation = f"Job terminated due to {status}"
self.instance.job_explanation = self.instance.job_explanation or job_explanation
self.runner_callback.delay_update(skip_if_already_set=True, job_explanation=f"Job terminated due to {status}")
if status == 'timeout':
status = 'failed'

extra_update_fields['job_explanation'] = self.instance.job_explanation

except ReceptorNodeNotFound as exc:
extra_update_fields['job_explanation'] = str(exc)
self.runner_callback.delay_update(job_explanation=str(exc))
except Exception:
# this could catch programming or file system errors
extra_update_fields['result_traceback'] = traceback.format_exc()
self.runner_callback.delay_update(result_traceback=traceback.format_exc())
logger.exception('%s Exception occurred while running task', self.instance.log_format)
finally:
logger.debug('%s finished running, producing %s events.', self.instance.log_format, self.runner_callback.event_ct)
Expand All @@ -567,18 +561,14 @@ def run(self, pk, **kwargs):
except PostRunError as exc:
if status == 'successful':
status = exc.status
extra_update_fields['job_explanation'] = exc.args[0]
self.runner_callback.delay_update(job_explanation=exc.args[0])
if exc.tb:
extra_update_fields['result_traceback'] = exc.tb
self.runner_callback.delay_update(result_traceback=exc.tb)
except Exception:
logger.exception('{} Post run hook errored.'.format(self.instance.log_format))

# We really shouldn't get into this one but just in case....
if 'got an unexpected keyword argument' in extra_update_fields.get('result_traceback', ''):
extra_update_fields['result_traceback'] = "{}\n\n{}".format(extra_update_fields['result_traceback'], ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE)

self.instance = self.update_model(pk)
self.instance = self.update_model(pk, status=status, emitted_events=self.runner_callback.event_ct, select_for_update=True, **extra_update_fields)
self.instance = self.update_model(pk, status=status, select_for_update=True, **self.runner_callback.get_delayed_update_fields())

# Field host_status_counts is used as a metric to check if event processing is finished
# we send notifications if it is, if not, callback receiver will send them
Expand Down
43 changes: 18 additions & 25 deletions awx/main/tasks/receptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
parse_yaml_or_json,
cleanup_new_process,
)
from awx.main.constants import (
MAX_ISOLATED_PATH_COLON_DELIMITER,
ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE,
)
from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER

# Receptorctl
from receptorctl.socket_interface import ReceptorControl
Expand Down Expand Up @@ -350,6 +347,11 @@ def _run_internal(self, receptor_ctl):
resultsock.shutdown(socket.SHUT_RDWR)
resultfile.close()
elif res.status == 'error':
# If ansible-runner ran, but an error occurred at runtime, the traceback information
# is saved via the status_handler passed in to the processor.
if 'result_traceback' in self.task.runner_callback.extra_update_fields:
return res

try:
unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}')
detail = unit_status.get('Detail', None)
Expand All @@ -365,28 +367,19 @@ def _run_internal(self, receptor_ctl):
logger.warning(f"Could not launch pod for {log_name}. Exceeded quota.")
self.task.update_model(self.task.instance.pk, status='pending')
return
# If ansible-runner ran, but an error occured at runtime, the traceback information
# is saved via the status_handler passed in to the processor.
if state_name == 'Succeeded':
return res
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an incongruence here. Yes, if the status_handler received this information the receptor output should be this... but it's an imperfect metric. These two things are associated, but other events could cause one to be true but not the other.

so I forced it by doing this

diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py
index ab27ce9f36..fd720ac985 100644
--- a/awx/main/tasks/callback.py
+++ b/awx/main/tasks/callback.py
@@ -220,6 +220,8 @@ class RunnerCallback:
             result_traceback = status_data.get('result_traceback', None)
             if result_traceback:
                 self.delay_update(result_traceback=result_traceback)
+        elif status_data['status'] == 'successful':
+            self.delay_update(result_traceback='alan set this!')
 
 
 class RunnerCallbackForProjectUpdate(RunnerCallback):

And this works as I expected. The new if condition is accurate. I like this also because it avoids running additional receptor commands when they are not necessary. That's something the if here was accomplishing before, and I didn't want to lose that.


if not self.task.instance.result_traceback:
try:
resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
lines = resultsock.readlines()
receptor_output = b"".join(lines).decode()
if receptor_output:
self.task.instance.result_traceback = receptor_output
if 'got an unexpected keyword argument' in receptor_output:
self.task.instance.result_traceback = "{}\n\n{}".format(receptor_output, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE)
self.task.instance.save(update_fields=['result_traceback'])
elif detail:
self.task.instance.result_traceback = detail
self.task.instance.save(update_fields=['result_traceback'])
else:
logger.warning(f'No result details or output from {self.task.instance.log_format}, status:\n{state_name}')
except Exception:
raise RuntimeError(detail)
try:
resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
lines = resultsock.readlines()
receptor_output = b"".join(lines).decode()
if receptor_output:
self.task.runner_callback.delay_update(result_traceback=receptor_output)
elif detail:
self.task.runner_callback.delay_update(result_traceback=detail)
else:
logger.warning(f'No result details or output from {self.task.instance.log_format}, status:\n{state_name}')
except Exception:
raise RuntimeError(detail)

return res

Expand Down
52 changes: 52 additions & 0 deletions awx/main/tests/unit/tasks/test_runner_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from awx.main.tasks.callback import RunnerCallback
from awx.main.constants import ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE

from django.utils.translation import ugettext_lazy as _


def test_delay_update(mock_me):
    """Ordinary fields are overwritten by later values and show up in the finalized dict."""
    callback = RunnerCallback()

    callback.delay_update(foo='bar')
    assert callback.extra_update_fields == {'foo': 'bar'}

    # A later value for an ordinary field replaces the earlier one
    callback.delay_update(foo='foobar')
    assert callback.extra_update_fields == {'foo': 'foobar'}

    callback.delay_update(bar='foo')
    finalized = callback.get_delayed_update_fields()
    assert finalized == {'foo': 'foobar', 'bar': 'foo', 'emitted_events': 0}


def test_delay_update_skip_if_set(mock_me):
    """With skip_if_already_set, the first stashed value is kept."""
    callback = RunnerCallback()
    for attempted_value in ('bar', 'foobar'):
        callback.delay_update(foo=attempted_value, skip_if_already_set=True)
        # Only the first attempt is stored; the second one is ignored
        assert callback.extra_update_fields == {'foo': 'bar'}


def test_delay_update_failure_fields(mock_me):
    """job_explanation and result_traceback accumulate values instead of overwriting them."""
    callback = RunnerCallback()
    expected = {}

    callback.delay_update(job_explanation='1')
    callback.delay_update(job_explanation=_('2'))
    expected['job_explanation'] = '1\n2'
    assert callback.extra_update_fields == expected

    callback.delay_update(result_traceback='1')
    callback.delay_update(result_traceback=_('2'))
    # The third value must be dropped because the field is already set
    callback.delay_update(result_traceback=_('3'), skip_if_already_set=True)
    expected['result_traceback'] = '1\n2'
    assert callback.extra_update_fields == expected


def test_duplicate_updates(mock_me):
    """Stashing the same message repeatedly stores it only once."""
    callback = RunnerCallback()
    summary = 'really long summary...'
    for _repeat in range(3):
        callback.delay_update(job_explanation=summary)
    assert callback.extra_update_fields == {'job_explanation': summary}


def test_special_ansible_runner_message(mock_me):
    """The ansible-runner update hint is appended once after the collected tracebacks."""
    callback = RunnerCallback()
    tracebacks = [
        'Traceback:\ngot an unexpected keyword argument\nFile: foo.py',
        'Traceback:\ngot an unexpected keyword argument\nFile: bar.py',
    ]
    for traceback_text in tracebacks:
        callback.delay_update(result_traceback=traceback_text)

    finalized_traceback = callback.get_delayed_update_fields().get('result_traceback')
    expected = '\n'.join([*tracebacks, f'{ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE}'])
    assert finalized_traceback == expected