From ff3b738d3b86f5f8b2dd5807a659dd21041beb99 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 3 May 2021 11:34:33 -0400 Subject: [PATCH] Add option to record websocket received time --- awxkit/awxkit/ws.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/awxkit/awxkit/ws.py b/awxkit/awxkit/ws.py index 41380d406e12..d56fccf71982 100644 --- a/awxkit/awxkit/ws.py +++ b/awxkit/awxkit/ws.py @@ -3,6 +3,7 @@ import atexit import json import ssl +import datetime from queue import Queue, Empty from urllib.parse import urlparse @@ -50,7 +51,7 @@ class WSClient(object): # Subscription group types - def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None): + def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None, add_received_time=False): # delay this import, because this is an optional dependency import websocket @@ -90,6 +91,7 @@ def __init__(self, token=None, hostname='', port=443, secure=True, session_id=No self._message_cache = [] self._should_subscribe_to_pending_job = False self._pending_unsubscribe = threading.Event() + self._add_received_time = add_received_time def connect(self): wst = threading.Thread(target=self._ws_run_forever, args=(self.ws, {"cert_reqs": ssl.CERT_NONE})) @@ -195,6 +197,8 @@ def unsubscribe(self, wait=True, timeout=10): def _on_message(self, message): message = json.loads(message) log.debug('received message: {}'.format(message)) + if self._add_received_time: + message['received_time'] = datetime.datetime.utcnow() if all([message.get('group_name') == 'jobs', message.get('status') == 'pending', message.get('unified_job_id'), self._should_subscribe_to_pending_job]): if bool(message.get('project_id')) == (self._should_subscribe_to_pending_job['events'] == 'project_update_events'):