Skip to content

Commit

Permalink
Merge pull request #4039 from whisperity/feat/store/timeout-message
Browse files Browse the repository at this point in the history
feat(store): Explicitly time the client out if the connection hung
  • Loading branch information
dkrupp authored Oct 13, 2023
2 parents 72bfd1a + 15af7d8 commit 0a3cc8b
Showing 1 changed file with 118 additions and 7 deletions.
125 changes: 118 additions & 7 deletions web/client/codechecker_client/cmd/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
import hashlib
import json
import os
import signal
import sys
import tempfile
import zipfile
import zlib

from collections import defaultdict, namedtuple
from concurrent.futures import ProcessPoolExecutor
from contextlib import contextmanager
from datetime import timedelta
from threading import Timer
from typing import Dict, Iterable, List, Set, Tuple

from codechecker_api.codeCheckerDBAccess_v6.ttypes import StoreLimitKind
Expand Down Expand Up @@ -713,6 +717,70 @@ def storing_analysis_statistics(client, inputs, run_name):
os.remove(zip_file)


class WatchdogError(Exception):
"""
Custom exception type thrown by '_timeout_watchdog'. This is used instead
of the built-in 'TimeoutError' because TimeoutError <: OSError and other
handlers in the call chain during a timeout might catch the more generic
OSError instead, resulting in not running the appropriate handler.
"""
def __init__(self, timeout: timedelta, *args, **kwargs):
super().__init__(*args, **kwargs)
self.timeout = timeout


@contextmanager
def _timeout_watchdog(timeout: timedelta, trap: int):
"""
Sets up a context with a 'threading.Timer' which will signal 'trap' the
current process if 'timeout' passes. If and only if this 'trap' signal
arrives, a 'WatchdogError' is thrown for appropriate handling outside the
context.
The timer is started immediately as this function returns the context.
This is done in order to prevent the current process from hanging
indefinitely in case the stored data is too big and the server spends more
time processing it that what the network connection would stay alive for.
The fact that this works is **NOT GUARANTEED**! In some cases, based on
likely the kernel version, its implementation, and the exact network
topology, the client may enter an *UNINTERRUPTIBLE* sleep ('D') at which
point signals *MAY* not be accurately received. However, manual testing
revealbed that resurrection of hung clients is possible even if the process
is shown as "uninterruptible" by simply killing it from the outside.
This is a **TEMPORARY** measure to alleviate hangs until the proper
"pollable asynchronous store" feature is implemented, see:
http://github.com/Ericsson/codechecker/issues/3672
"""
def _signal_handler(sig: int, frame):
if sig == trap:
signal.signal(trap, signal.SIG_DFL)
raise WatchdogError(timeout,
"Timeout watchdog hit %d seconds (%s)"
% (timeout.total_seconds(), str(timeout)))

pid = os.getpid()
timer = Timer(timeout.total_seconds(),
lambda: os.kill(pid, trap))
LOG.debug("Set up timer for %d seconds (%s) for PID %d",
timeout.total_seconds(), str(timeout), pid)
try:
signal.signal(trap, _signal_handler)
timer.start()
yield timer
except Exception as e:
LOG.debug("External exception within timeout-sensing context: %s",
str(e))
raise e
finally:
timer.cancel()
signal.signal(trap, signal.SIG_DFL)
LOG.debug("Timeout timer of %d seconds (%s) for PID %d stopped.",
timeout.total_seconds(), str(timeout), pid)


def main(args):
"""
Store the defect results in the specified input list as bug reports in the
Expand Down Expand Up @@ -798,13 +866,56 @@ def main(args):

LOG.info("Storing results to the server...")

client.massStoreRun(args.name,
args.tag if 'tag' in args else None,
str(context.version),
b64zip,
'force' in args,
trim_path_prefixes,
description)
try:
with _timeout_watchdog(timedelta(hours=1),
signal.SIGUSR1):
client.massStoreRun(args.name,
args.tag if 'tag' in args else None,
str(context.version),
b64zip,
'force' in args,
trim_path_prefixes,
description)
except WatchdogError as we:
LOG.warning("%s", str(we))

# Showing parts of the exception stack is important here.
# We **WANT** to see that the timeout happened during a wait on
# Thrift reading from the TCP connection (something deep in the
# Python library code at "sock.recv_into").
import traceback
_, _, tb = sys.exc_info()
frames = traceback.extract_tb(tb)
first, last = frames[0], frames[-2]
formatted_frames = traceback.format_list([first, last])
fmt_first, fmt_last = formatted_frames[0], formatted_frames[1]
LOG.info("Timeout was triggered during:\n%s", fmt_first)
LOG.info("Timeout interrupted this low-level operation:\n%s",
fmt_last)

LOG.error("Timeout!"
"\n\tThe server's reply did not arrive after "
"%d seconds (%s) elapsed since the server-side "
"processing began."
"\n\n\tThis does *NOT* mean that there was an issue "
"with the run you were storing!"
"\n\tThe server might still be processing the results..."
"\n\tHowever, it is more likely that the "
"server had already finished, but the client did not "
"receive a response."
"\n\tUsually, this is caused by the underlying TCP "
"connection failing to signal a low-level disconnect."
"\n\tClients potentially hanging indefinitely in these "
"scenarios is an unfortunate and known issue."
"\n\t\tSee http://github.com/Ericsson/codechecker/"
"issues/3672 for details!"
"\n\n\tThis error here is a temporary measure to ensure "
"an infinite hang is replaced with a well-explained "
"timeout."
"\n\tA more proper solution will be implemented in a "
"subsequent version of CodeChecker.",
we.timeout.total_seconds(), str(we.timeout))
sys.exit(1)

# Storing analysis statistics if the server allows them.
if client.allowsStoringAnalysisStatistics():
Expand Down

0 comments on commit 0a3cc8b

Please sign in to comment.