Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intentionally drop job event websocket messages in excess of 30 per second (configurable) #10053

Merged
merged 20 commits into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions awx/main/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,17 @@
category_slug='jobs',
)

# Register MAX_WEBSOCKET_EVENT_RATE as an integer setting in the "Jobs"
# category. It may not be negative (min_value=0) and defaults to 30; per the
# help text, a value of 0 means no limit is applied.
register(
'MAX_WEBSOCKET_EVENT_RATE',
field_class=fields.IntegerField,
min_value=0,
default=30,
label=_('Job Event Maximum Websocket Messages Per Second'),
help_text=_('Maximum number of messages to update the UI live job output with per second. Value of 0 means no limit.'),
category=_('Jobs'),
category_slug='jobs',
)

register(
'SCHEDULE_MAX_JOBS',
field_class=fields.IntegerField,
Expand Down
1 change: 1 addition & 0 deletions awx/main/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
}
CAN_CANCEL = ('new', 'pending', 'waiting', 'running')
ACTIVE_STATES = CAN_CANCEL
MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF'])
CENSOR_VALUE = '************'
ENV_BLOCKLIST = frozenset(
(
Expand Down
9 changes: 8 additions & 1 deletion awx/main/dispatch/worker/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def flush(self, force=False):
logger.exception('Database Error Saving Job Event')
duration_to_save = time.perf_counter() - duration_to_save
for e in events:
emit_event_detail(e)
if not getattr(e, '_skip_websocket_message', False):
emit_event_detail(e)
self.buff = {}
self.last_flush = time.time()
# only update metrics if we saved events
Expand Down Expand Up @@ -207,7 +208,13 @@ def perform_work(self, body):
GuidMiddleware.set_guid('')
return

skip_websocket_message = body.pop('skip_websocket_message', False)

event = cls.create_from_data(**body)

if skip_websocket_message:
event._skip_websocket_message = True

self.buff.setdefault(cls, []).append(event)

retries = 0
Expand Down
4 changes: 1 addition & 3 deletions awx/main/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from awx.main import consumers
from awx.main.managers import DeferJobCreatedManager
from awx.main.fields import JSONField
from awx.main.constants import MINIMAL_EVENTS
from awx.main.models.base import CreatedModifiedModel
from awx.main.utils import ignore_inventory_computed_fields, camelcase_to_underscore

Expand Down Expand Up @@ -57,9 +58,6 @@ def create_host_status_counts(event_data):
return dict(host_status_counts)


MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF'])


def emit_event_detail(event):
if settings.UI_LIVE_UPDATES_ENABLED is False and event.event not in MINIMAL_EVENTS:
return
Expand Down
36 changes: 34 additions & 2 deletions awx/main/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# All Rights Reserved.

# Python
from collections import OrderedDict, namedtuple
from collections import OrderedDict, namedtuple, deque
import errno
import functools
import importlib
Expand Down Expand Up @@ -57,7 +57,7 @@

# AWX
from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV, MINIMAL_EVENTS
from awx.main.access import access_registry
from awx.main.redact import UriCleaner
from awx.main.models import (
Expand Down Expand Up @@ -740,6 +740,7 @@ def __init__(self):
self.host_map = {}
self.guid = GuidMiddleware.get_guid()
self.job_created = None
self.recent_event_timings = deque(maxlen=settings.MAX_WEBSOCKET_EVENT_RATE)

def update_model(self, pk, _attempt=0, **updates):
"""Reload the model instance from the database and update the
Expand Down Expand Up @@ -1151,6 +1152,37 @@ def event_handler(self, event_data):
if 'event_data' in event_data:
event_data['event_data']['guid'] = self.guid

# To prevent overwhelming the broadcast queue, skip some websocket messages
if self.recent_event_timings:
cpu_time = time.time()
first_window_time = self.recent_event_timings[0]
last_window_time = self.recent_event_timings[-1]

if event_data.get('event') in MINIMAL_EVENTS:
should_emit = True # always send some types like playbook_on_stats
elif event_data.get('stdout') == '' and event_data['start_line'] == event_data['end_line']:
should_emit = False # exclude events with no output
else:
should_emit = any(
[
# if the 30th most recent websocket message was sent over 1 second ago
cpu_time - first_window_time > 1.0,
# if the very last websocket message came in over 1/30 seconds ago
self.recent_event_timings.maxlen * (cpu_time - last_window_time) > 1.0,
# if the queue is not yet full
len(self.recent_event_timings) != self.recent_event_timings.maxlen,
]
)

if should_emit:
self.recent_event_timings.append(cpu_time)
else:
event_data.setdefault('event_data', {})
event_data['skip_websocket_message'] = True

elif self.recent_event_timings.maxlen:
self.recent_event_timings.append(time.time())

event_data.setdefault(self.event_data_key, self.instance.id)
self.dispatcher.dispatch(event_data)
self.event_ct += 1
Expand Down
3 changes: 2 additions & 1 deletion awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ def IS_TESTING(argv=None):
# beyond this limit and the value will be removed
MAX_EVENT_RES_DATA = 700000

# Note: This setting may be overridden by database settings.
# Note: These settings may be overridden by database settings.
EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024
MAX_WEBSOCKET_EVENT_RATE = 30

# The amount of time before a stdout file is expired and removed locally
# Note that this can be recreated if the stdout is downloaded
Expand Down
83 changes: 74 additions & 9 deletions awx/ui_next/src/screens/Job/JobOutput/JobOutput.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { HostStatusBar, OutputToolbar } from './shared';
import getRowRangePageSize from './shared/jobOutputUtils';
import { getJobModel, isJobRunning } from '../../../util/jobs';
import useRequest, { useDismissableError } from '../../../util/useRequest';
import useInterval from '../../../util/useInterval';
import {
parseQueryString,
mergeParams,
Expand Down Expand Up @@ -297,8 +298,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
const listRef = useRef(null);
const previousWidth = useRef(0);
const jobSocketCounter = useRef(0);
const interval = useRef(null);
const isMounted = useIsMounted();
const scrollTop = useRef(0);
const scrollHeight = useRef(0);
const history = useHistory();
const [contentError, setContentError] = useState(null);
const [cssMap, setCssMap] = useState({});
Expand All @@ -310,6 +312,17 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
const [showCancelModal, setShowCancelModal] = useState(false);
const [remoteRowCount, setRemoteRowCount] = useState(0);
const [results, setResults] = useState({});
const [isFollowModeEnabled, setIsFollowModeEnabled] = useState(
isJobRunning(job.status)
);
const [isMonitoringWebsocket, setIsMonitoringWebsocket] = useState(false);

useInterval(
() => {
monitorJobSocketCounter();
},
isMonitoringWebsocket ? 5000 : null
);

useEffect(() => {
loadJobEvents();
Expand All @@ -330,14 +343,15 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
}
}
});
interval.current = setInterval(() => monitorJobSocketCounter(), 5000);
setIsMonitoringWebsocket(true);
}

return function cleanup() {
if (ws) {
ws.close();
}
clearInterval(interval.current);
setIsMonitoringWebsocket(false);
isMounted.current = false;
};
}, [location.search]); // eslint-disable-line react-hooks/exhaustive-deps

Expand All @@ -347,6 +361,22 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
}
}, [currentlyLoading, cssMap, remoteRowCount]);

useEffect(() => {
if (jobStatus && !isJobRunning(jobStatus)) {
if (jobSocketCounter.current > remoteRowCount && isMounted.current) {
setRemoteRowCount(jobSocketCounter.current);
}

if (isMonitoringWebsocket) {
setIsMonitoringWebsocket(false);
}

if (isFollowModeEnabled) {
setTimeout(() => setIsFollowModeEnabled(false), 1000);
}
}
}, [jobStatus]); // eslint-disable-line react-hooks/exhaustive-deps

const {
error: cancelError,
isLoading: isCancelling,
Expand Down Expand Up @@ -381,14 +411,14 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
} = useDismissableError(deleteError);

const monitorJobSocketCounter = () => {
if (jobSocketCounter.current > remoteRowCount && isMounted.current) {
setRemoteRowCount(jobSocketCounter.current);
}
if (
jobSocketCounter.current === remoteRowCount &&
!isJobRunning(job.status)
) {
clearInterval(interval.current);
}
if (jobSocketCounter.current > remoteRowCount && isMounted.current) {
setRemoteRowCount(jobSocketCounter.current);
setIsMonitoringWebsocket(false);
}
};

Expand Down Expand Up @@ -492,6 +522,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
};

const rowRenderer = ({ index, parent, key, style }) => {
if (listRef.current && isFollowModeEnabled) {
setTimeout(() => scrollToRow(remoteRowCount - 1), 0);
}
let actualLineTextHtml = [];
if (results[index]) {
const { lineTextHtml } = getLineTextHtml(results[index]);
Expand Down Expand Up @@ -584,7 +617,9 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
};

const scrollToRow = rowIndex => {
listRef.current.scrollToRow(rowIndex);
if (listRef.current) {
listRef.current.scrollToRow(rowIndex);
}
};

const handleScrollPrevious = () => {
Expand All @@ -604,7 +639,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
};

const handleScrollLast = () => {
scrollToRow(remoteRowCount);
scrollToRow(remoteRowCount - 1);
};

const handleResize = ({ width }) => {
Expand Down Expand Up @@ -657,6 +692,27 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
history.push(qs ? `${pathname}?${qs}` : pathname);
};

// Flip follow mode; when turning it on, jump straight to the last known row.
const handleFollowToggle = () => {
  const shouldFollow = !isFollowModeEnabled;
  setIsFollowModeEnabled(shouldFollow);
  if (shouldFollow) {
    scrollToRow(remoteRowCount - 1);
  }
};

// Scroll callback: turns follow mode off when the scroll position moved up
// while the scroll height stayed the same, then records the latest scroll
// position and height for the next comparison.
const handleScroll = e => {
  const movedUp = scrollTop.current > e.scrollTop;
  const sameHeight = scrollHeight.current === e.scrollHeight;

  if (isFollowModeEnabled && movedUp && sameHeight) {
    setIsFollowModeEnabled(false);
  }

  scrollTop.current = e.scrollTop;
  scrollHeight.current = e.scrollHeight;
};

const renderSearchComponent = () => (
<Search
qsConfig={QS_CONFIG}
Expand Down Expand Up @@ -763,6 +819,14 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
)}
</ToolbarItem>
</ToolbarToggleGroup>
{isJobRunning(job.status) ? (
<Button
variant={isFollowModeEnabled ? 'secondary' : 'primary'}
onClick={handleFollowToggle}
>
{isFollowModeEnabled ? t`Unfollow` : t`Follow`}
</Button>
) : null}
</SearchToolbarContent>
</SearchToolbar>
<PageControls
Expand Down Expand Up @@ -801,6 +865,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
scrollToAlignment="start"
width={width || 1}
overscanRowCount={20}
onScroll={handleScroll}
/>
)}
</>
Expand Down
6 changes: 5 additions & 1 deletion awxkit/awxkit/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import atexit
import json
import ssl
import datetime

from queue import Queue, Empty
from urllib.parse import urlparse
Expand Down Expand Up @@ -50,7 +51,7 @@ class WSClient(object):

# Subscription group types

def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None):
def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None, add_received_time=False):
# delay this import, because this is an optional dependency
import websocket

Expand Down Expand Up @@ -90,6 +91,7 @@ def __init__(self, token=None, hostname='', port=443, secure=True, session_id=No
self._message_cache = []
self._should_subscribe_to_pending_job = False
self._pending_unsubscribe = threading.Event()
self._add_received_time = add_received_time

def connect(self):
wst = threading.Thread(target=self._ws_run_forever, args=(self.ws, {"cert_reqs": ssl.CERT_NONE}))
Expand Down Expand Up @@ -195,6 +197,8 @@ def unsubscribe(self, wait=True, timeout=10):
def _on_message(self, message):
message = json.loads(message)
log.debug('received message: {}'.format(message))
if self._add_received_time:
message['received_time'] = datetime.datetime.utcnow()

if all([message.get('group_name') == 'jobs', message.get('status') == 'pending', message.get('unified_job_id'), self._should_subscribe_to_pending_job]):
if bool(message.get('project_id')) == (self._should_subscribe_to_pending_job['events'] == 'project_update_events'):
Expand Down