From b7205ed32aae6531595fef78cf8a5d6101b84881 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Fri, 27 May 2022 05:15:34 +0930 Subject: [PATCH] pyln-testing: use files for stdout and stderr, not threads. Some flakes are caused by weird races in this code. Plus, if we get things to write straight to files, we might see things in there on post-mortem which happen after the python runner exits. It's a bit less efficient, but much simpler. Let's see if it helps! Signed-off-by: Rusty Russell --- contrib/pyln-testing/pyln/testing/utils.py | 146 ++++++++------------- tests/test_misc.py | 23 ++-- tests/test_wallet.py | 45 +++---- 3 files changed, 86 insertions(+), 128 deletions(-) diff --git a/contrib/pyln-testing/pyln/testing/utils.py b/contrib/pyln-testing/pyln/testing/utils.py index ab0dba2f21ee..57445575efc7 100644 --- a/contrib/pyln-testing/pyln/testing/utils.py +++ b/contrib/pyln-testing/pyln/testing/utils.py @@ -174,13 +174,21 @@ class TailableProc(object): tail the processes and react to their output. """ - def __init__(self, outputDir=None, verbose=True): + def __init__(self, outputDir, verbose=True): self.logs = [] - self.logs_cond = threading.Condition(threading.RLock()) self.env = os.environ.copy() self.running = False self.proc = None self.outputDir = outputDir + if not os.path.exists(outputDir): + os.makedirs(outputDir) + # Create and open them. + self.stdout_filename = os.path.join(outputDir, "log") + self.stderr_filename = os.path.join(outputDir, "errlog") + self.stdout_write = open(self.stdout_filename, "wt") + self.stderr_write = open(self.stderr_filename, "wt") + self.stdout_read = open(self.stdout_filename, "rt") + self.stderr_read = open(self.stderr_filename, "rt") self.logsearch_start = 0 self.err_logs = [] self.prefix = "" @@ -192,29 +200,17 @@ def __init__(self, outputDir=None, verbose=True): # pass it to the log matcher and not print it to stdout). self.log_filter = lambda line: False - def start(self, stdin=None, stdout=None, stderr=None): + def start(self, stdin=None): """Start the underlying process and start monitoring it. """ logging.debug("Starting '%s'", " ".join(self.cmd_line)) self.proc = subprocess.Popen(self.cmd_line, stdin=stdin, - stdout=stdout if stdout else subprocess.PIPE, - stderr=stderr, + stdout=self.stdout_write, + stderr=self.stderr_write, env=self.env) - self.thread = threading.Thread(target=self.tail) - self.thread.daemon = True - self.thread.start() - self.running = True - - def save_log(self): - if self.outputDir: - logpath = os.path.join(self.outputDir, 'log') - with open(logpath, 'w') as f: - for l in self.logs: - f.write(l + '\n') def stop(self, timeout=10): - self.save_log() self.proc.terminate() # Now give it some time to react to the signal @@ -224,56 +220,32 @@ def stop(self, timeout=10): self.proc.kill() self.proc.wait() - self.thread.join() - return self.proc.returncode def kill(self): """Kill process without giving it warning.""" self.proc.kill() self.proc.wait() - self.thread.join() - - def tail(self): - """Tail the stdout of the process and remember it. - Stores the lines of output produced by the process in - self.logs and signals that a new line was read so that it can - be picked up by consumers. + def logs_catchup(self): + """Save the latest stdout / stderr contents; return true if we got anything. """ - for line in iter(self.proc.stdout.readline, ''): - if len(line) == 0: - break - - line = line.decode('UTF-8', 'replace').rstrip() - - if self.log_filter(line): - continue - - if self.verbose: - sys.stdout.write("{}: {}\n".format(self.prefix, line)) - - with self.logs_cond: - self.logs.append(line) - self.logs_cond.notifyAll() - - self.running = False - self.proc.stdout.close() - - if self.proc.stderr: - for line in iter(self.proc.stderr.readline, ''): - - if line is None or len(line) == 0: - break - - line = line.rstrip().decode('UTF-8', 'replace') - self.err_logs.append(line) - - self.proc.stderr.close() + new_stdout = self.stdout_read.readlines() + if self.verbose: + for line in new_stdout: + sys.stdout.write("{}: {}".format(self.prefix, line)) + self.logs += [l.rstrip() for l in new_stdout] + new_stderr = self.stderr_read.readlines() + if self.verbose: + for line in new_stderr: + sys.stderr.write("{}-stderr: {}".format(self.prefix, line)) + self.err_logs += [l.rstrip() for l in new_stderr] + return len(new_stdout) > 0 or len(new_stderr) > 0 def is_in_log(self, regex, start=0): """Look for `regex` in the logs.""" + self.logs_catchup() ex = re.compile(regex) for l in self.logs[start:]: if ex.search(l): @@ -286,6 +258,7 @@ def is_in_log(self, regex, start=0): def is_in_stderr(self, regex): """Look for `regex` in stderr.""" + self.logs_catchup() ex = re.compile(regex) for l in self.err_logs: if ex.search(l): @@ -311,31 +284,29 @@ def wait_for_logs(self, regexs, timeout=TIMEOUT): logging.debug("Waiting for {} in the logs".format(regexs)) exs = [re.compile(r) for r in regexs] start_time = time.time() - pos = self.logsearch_start while True: - if timeout is not None and time.time() > start_time + timeout: - print("Time-out: can't find {} in logs".format(exs)) - for r in exs: - if self.is_in_log(r): - print("({} was previously in logs!)".format(r)) - raise TimeoutError('Unable to find "{}" in logs.'.format(exs)) - - with self.logs_cond: - if pos >= len(self.logs): - if not self.running: - raise ValueError('Process died while waiting for logs') - self.logs_cond.wait(1) - continue - - for r in exs.copy(): - self.logsearch_start = pos + 1 - if r.search(self.logs[pos]): - logging.debug("Found '%s' in logs", r) - exs.remove(r) - break - if len(exs) == 0: - return self.logs[pos] - pos += 1 + if self.logsearch_start >= len(self.logs): + if not self.logs_catchup(): + time.sleep(0.25) + + if timeout is not None and time.time() > start_time + timeout: + print("Time-out: can't find {} in logs".format(exs)) + for r in exs: + if self.is_in_log(r): + print("({} was previously in logs!)".format(r)) + raise TimeoutError('Unable to find "{}" in logs.'.format(exs)) + continue + + line = self.logs[self.logsearch_start] + self.logsearch_start += 1 + for r in exs.copy(): + if r.search(line): + logging.debug("Found '%s' in logs", r) + exs.remove(r) + if len(exs) == 0: + return line + # Don't match same line with different regexs! + break def wait_for_log(self, regex, timeout=TIMEOUT): """Look for `regex` in the logs. @@ -620,10 +591,9 @@ def cmd_line(self): return self.cmd_prefix + [self.executable] + opts - def start(self, stdin=None, stdout=None, stderr=None, - wait_for_initialized=True): + def start(self, stdin=None, wait_for_initialized=True): self.opts['bitcoin-rpcport'] = self.rpcproxy.rpcport - TailableProc.start(self, stdin, stdout, stderr) + TailableProc.start(self, stdin) if wait_for_initialized: self.wait_for_log("Server started with public key") logging.info("LightningD started") @@ -852,8 +822,8 @@ def is_synced_with_bitcoin(self, info=None): info = self.rpc.getinfo() return 'warning_bitcoind_sync' not in info and 'warning_lightningd_sync' not in info - def start(self, wait_for_bitcoind_sync=True, stderr=None): - self.daemon.start(stderr=stderr) + def start(self, wait_for_bitcoind_sync=True): + self.daemon.start() # Cache `getinfo`, we'll be using it a lot self.info = self.rpc.getinfo() # This shortcut is sufficient for our simple tests. @@ -878,7 +848,6 @@ def stop(self, timeout=10): if self.rc is None: self.rc = self.daemon.stop() - self.daemon.save_log() self.daemon.cleanup() if self.rc != 0 and not self.may_fail: @@ -1417,12 +1386,7 @@ def get_node(self, node_id=None, options=None, dbfile=None, if start: try: - # Capture stderr if we're failing - if expect_fail: - stderr = subprocess.PIPE - else: - stderr = None - node.start(wait_for_bitcoind_sync, stderr=stderr) + node.start(wait_for_bitcoind_sync) except Exception: if expect_fail: return node diff --git a/tests/test_misc.py b/tests/test_misc.py index c125c65d5d92..dd3ee8ab8ef6 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -109,13 +109,13 @@ def crash_bitcoincli(r): # Ignore BROKEN log message about blocksonly mode. l2 = node_factory.get_node(start=False, expect_fail=True, allow_broken_log=True) - with pytest.raises(ValueError): - l2.start(stderr=subprocess.PIPE) + l2.daemon.start(wait_for_initialized=False) + # Will exit with failure code. + assert l2.daemon.wait() == 1 assert l2.daemon.is_in_stderr(r".*deactivating transaction relay is not" - " supported.") is not None - # wait_for_log gets upset since daemon is not running. - wait_for(lambda: l2.daemon.is_in_log('deactivating transaction' - ' relay is not supported')) + " supported.") + assert l2.daemon.is_in_log('deactivating transaction' + ' relay is not supported') def test_bitcoin_ibd(node_factory, bitcoind): @@ -1208,8 +1208,10 @@ def test_rescan(node_factory, bitcoind): l1.daemon.opts['rescan'] = -500000 l1.stop() bitcoind.generate_block(4) - with pytest.raises(ValueError): - l1.start() + l1.daemon.start(wait_for_initialized=False) + # Will exit with failure code. + assert l1.daemon.wait() == 1 + assert l1.daemon.is_in_stderr(r"bitcoind has gone backwards from 500000 to 105 blocks!") # Restarting with future absolute blockheight is fine if we can find it. l1.daemon.opts['rescan'] = -105 @@ -2057,8 +2059,9 @@ def test_new_node_is_mainnet(node_factory): del l1.daemon.opts['network'] # Wrong chain, will fail to start, but that's OK. - with pytest.raises(ValueError): - l1.start() + l1.daemon.start(wait_for_initialized=False) + # Will exit with failure code. + assert l1.daemon.wait() == 1 # Should create these assert os.path.isfile(os.path.join(netdir, "hsm_secret")) diff --git a/tests/test_wallet.py b/tests/test_wallet.py index 9c3854788298..16e2e0cee64e 100644 --- a/tests/test_wallet.py +++ b/tests/test_wallet.py @@ -1037,8 +1037,7 @@ def test_hsm_secret_encryption(node_factory): # Test we cannot restore the same wallet with another password l1.daemon.opts.update({"encrypted-hsm": None}) - l1.daemon.start(stdin=slave_fd, stderr=subprocess.STDOUT, - wait_for_initialized=False) + l1.daemon.start(stdin=slave_fd, wait_for_initialized=False) l1.daemon.wait_for_log(r'Enter hsm_secret password') write_all(master_fd, password[2:].encode("utf-8")) assert(l1.daemon.proc.wait(WAIT_TIMEOUT) == HSM_BAD_PASSWORD) @@ -1068,8 +1067,8 @@ def test_hsm_secret_encryption(node_factory): class HsmTool(TailableProc): """Helper for testing the hsmtool as a subprocess""" - def __init__(self, *args): - TailableProc.__init__(self) + def __init__(self, directory, *args): + TailableProc.__init__(self, os.path.join(directory, "hsmtool")) self.prefix = "hsmtool" assert hasattr(self, "env") if VALGRIND: @@ -1103,9 +1102,8 @@ def test_hsmtool_secret_decryption(node_factory): # We can't use a wrong password ! master_fd, slave_fd = os.openpty() - hsmtool = HsmTool("decrypt", hsm_path) - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "decrypt", hsm_path) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Enter hsm_secret password:") write_all(master_fd, "A wrong pass\n\n".encode("utf-8")) hsmtool.proc.wait(WAIT_TIMEOUT) @@ -1113,8 +1111,7 @@ def test_hsmtool_secret_decryption(node_factory): # Decrypt it with hsmtool master_fd, slave_fd = os.openpty() - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Enter hsm_secret password:") write_all(master_fd, password.encode("utf-8")) assert hsmtool.proc.wait(WAIT_TIMEOUT) == 0 @@ -1127,9 +1124,8 @@ def test_hsmtool_secret_decryption(node_factory): # Test we can encrypt it offline master_fd, slave_fd = os.openpty() - hsmtool = HsmTool("encrypt", hsm_path) - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "encrypt", hsm_path) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Enter hsm_secret password:") write_all(master_fd, password.encode("utf-8")) hsmtool.wait_for_log(r"Confirm hsm_secret password:") @@ -1142,7 +1138,7 @@ def test_hsmtool_secret_decryption(node_factory): l1.daemon.opts.update({"encrypted-hsm": None}) master_fd, slave_fd = os.openpty() - l1.daemon.start(stdin=slave_fd, stderr=subprocess.STDOUT, + l1.daemon.start(stdin=slave_fd, wait_for_initialized=False) l1.daemon.wait_for_log(r'The hsm_secret is encrypted') @@ -1154,9 +1150,8 @@ def test_hsmtool_secret_decryption(node_factory): # And finally test that we can also decrypt if encrypted with hsmtool master_fd, slave_fd = os.openpty() - hsmtool = HsmTool("decrypt", hsm_path) - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "decrypt", hsm_path) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Enter hsm_secret password:") write_all(master_fd, password.encode("utf-8")) assert hsmtool.proc.wait(WAIT_TIMEOUT) == 0 @@ -1166,9 +1161,8 @@ def test_hsmtool_secret_decryption(node_factory): # We can roundtrip encryption and decryption using a password provided # through stdin. - hsmtool = HsmTool("encrypt", hsm_path) - hsmtool.start(stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "encrypt", hsm_path) + hsmtool.start(stdin=subprocess.PIPE) hsmtool.proc.stdin.write(password.encode("utf-8")) hsmtool.proc.stdin.write(password.encode("utf-8")) hsmtool.proc.stdin.flush() @@ -1176,9 +1170,8 @@ def test_hsmtool_secret_decryption(node_factory): assert hsmtool.proc.wait(WAIT_TIMEOUT) == 0 master_fd, slave_fd = os.openpty() - hsmtool = HsmTool("decrypt", hsm_path) - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "decrypt", hsm_path) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log("Enter hsm_secret password:") write_all(master_fd, password.encode("utf-8")) hsmtool.wait_for_log("Successfully decrypted") @@ -1236,19 +1229,17 @@ def test_hsmtool_generatehsm(node_factory): hsm_path = os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, "hsm_secret") - hsmtool = HsmTool("generatehsm", hsm_path) + hsmtool = HsmTool(node_factory.directory, "generatehsm", hsm_path) # You cannot re-generate an already existing hsm_secret master_fd, slave_fd = os.openpty() - hsmtool.start(stdin=slave_fd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + hsmtool.start(stdin=slave_fd) assert hsmtool.proc.wait(WAIT_TIMEOUT) == 2 os.remove(hsm_path) # We can generate a valid hsm_secret from a wordlist and a "passphrase" master_fd, slave_fd = os.openpty() - hsmtool.start(stdin=slave_fd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Select your language:") write_all(master_fd, "0\n".encode("utf-8")) hsmtool.wait_for_log(r"Introduce your BIP39 word list")