diff --git a/contrib/pyln-testing/pyln/testing/utils.py b/contrib/pyln-testing/pyln/testing/utils.py index ab0dba2f21ee..57445575efc7 100644 --- a/contrib/pyln-testing/pyln/testing/utils.py +++ b/contrib/pyln-testing/pyln/testing/utils.py @@ -174,13 +174,21 @@ class TailableProc(object): tail the processes and react to their output. """ - def __init__(self, outputDir=None, verbose=True): + def __init__(self, outputDir, verbose=True): self.logs = [] - self.logs_cond = threading.Condition(threading.RLock()) self.env = os.environ.copy() self.running = False self.proc = None self.outputDir = outputDir + if not os.path.exists(outputDir): + os.makedirs(outputDir) + # Create and open them. + self.stdout_filename = os.path.join(outputDir, "log") + self.stderr_filename = os.path.join(outputDir, "errlog") + self.stdout_write = open(self.stdout_filename, "wt") + self.stderr_write = open(self.stderr_filename, "wt") + self.stdout_read = open(self.stdout_filename, "rt") + self.stderr_read = open(self.stderr_filename, "rt") self.logsearch_start = 0 self.err_logs = [] self.prefix = "" @@ -192,29 +200,17 @@ def __init__(self, outputDir=None, verbose=True): # pass it to the log matcher and not print it to stdout). self.log_filter = lambda line: False - def start(self, stdin=None, stdout=None, stderr=None): + def start(self, stdin=None): """Start the underlying process and start monitoring it. """ logging.debug("Starting '%s'", " ".join(self.cmd_line)) self.proc = subprocess.Popen(self.cmd_line, stdin=stdin, - stdout=stdout if stdout else subprocess.PIPE, - stderr=stderr, + stdout=self.stdout_write, + stderr=self.stderr_write, env=self.env) - self.thread = threading.Thread(target=self.tail) - self.thread.daemon = True - self.thread.start() - self.running = True - - def save_log(self): - if self.outputDir: - logpath = os.path.join(self.outputDir, 'log') - with open(logpath, 'w') as f: - for l in self.logs: - f.write(l + '\n') def stop(self, timeout=10): - self.save_log() self.proc.terminate() # Now give it some time to react to the signal @@ -224,56 +220,32 @@ def stop(self, timeout=10): self.proc.kill() self.proc.wait() - self.thread.join() - return self.proc.returncode def kill(self): """Kill process without giving it warning.""" self.proc.kill() self.proc.wait() - self.thread.join() - - def tail(self): - """Tail the stdout of the process and remember it. - Stores the lines of output produced by the process in - self.logs and signals that a new line was read so that it can - be picked up by consumers. + def logs_catchup(self): + """Save the latest stdout / stderr contents; return true if we got anything. """ - for line in iter(self.proc.stdout.readline, ''): - if len(line) == 0: - break - - line = line.decode('UTF-8', 'replace').rstrip() - - if self.log_filter(line): - continue - - if self.verbose: - sys.stdout.write("{}: {}\n".format(self.prefix, line)) - - with self.logs_cond: - self.logs.append(line) - self.logs_cond.notifyAll() - - self.running = False - self.proc.stdout.close() - - if self.proc.stderr: - for line in iter(self.proc.stderr.readline, ''): - - if line is None or len(line) == 0: - break - - line = line.rstrip().decode('UTF-8', 'replace') - self.err_logs.append(line) - - self.proc.stderr.close() + new_stdout = self.stdout_read.readlines() + if self.verbose: + for line in new_stdout: + sys.stdout.write("{}: {}".format(self.prefix, line)) + self.logs += [l.rstrip() for l in new_stdout] + new_stderr = self.stderr_read.readlines() + if self.verbose: + for line in new_stderr: + sys.stderr.write("{}-stderr: {}".format(self.prefix, line)) + self.err_logs += [l.rstrip() for l in new_stderr] + return len(new_stdout) > 0 or len(new_stderr) > 0 def is_in_log(self, regex, start=0): """Look for `regex` in the logs.""" + self.logs_catchup() ex = re.compile(regex) for l in self.logs[start:]: if ex.search(l): @@ -286,6 +258,7 @@ def is_in_log(self, regex, start=0): def is_in_stderr(self, regex): """Look for `regex` in stderr.""" + self.logs_catchup() ex = re.compile(regex) for l in self.err_logs: if ex.search(l): @@ -311,31 +284,29 @@ def wait_for_logs(self, regexs, timeout=TIMEOUT): logging.debug("Waiting for {} in the logs".format(regexs)) exs = [re.compile(r) for r in regexs] start_time = time.time() - pos = self.logsearch_start while True: - if timeout is not None and time.time() > start_time + timeout: - print("Time-out: can't find {} in logs".format(exs)) - for r in exs: - if self.is_in_log(r): - print("({} was previously in logs!)".format(r)) - raise TimeoutError('Unable to find "{}" in logs.'.format(exs)) - - with self.logs_cond: - if pos >= len(self.logs): - if not self.running: - raise ValueError('Process died while waiting for logs') - self.logs_cond.wait(1) - continue - - for r in exs.copy(): - self.logsearch_start = pos + 1 - if r.search(self.logs[pos]): - logging.debug("Found '%s' in logs", r) - exs.remove(r) - break - if len(exs) == 0: - return self.logs[pos] - pos += 1 + if self.logsearch_start >= len(self.logs): + if not self.logs_catchup(): + time.sleep(0.25) + + if timeout is not None and time.time() > start_time + timeout: + print("Time-out: can't find {} in logs".format(exs)) + for r in exs: + if self.is_in_log(r): + print("({} was previously in logs!)".format(r)) + raise TimeoutError('Unable to find "{}" in logs.'.format(exs)) + continue + + line = self.logs[self.logsearch_start] + self.logsearch_start += 1 + for r in exs.copy(): + if r.search(line): + logging.debug("Found '%s' in logs", r) + exs.remove(r) + if len(exs) == 0: + return line + # Don't match same line with different regexs! + break def wait_for_log(self, regex, timeout=TIMEOUT): """Look for `regex` in the logs. @@ -620,10 +591,9 @@ def cmd_line(self): return self.cmd_prefix + [self.executable] + opts - def start(self, stdin=None, stdout=None, stderr=None, - wait_for_initialized=True): + def start(self, stdin=None, wait_for_initialized=True): self.opts['bitcoin-rpcport'] = self.rpcproxy.rpcport - TailableProc.start(self, stdin, stdout, stderr) + TailableProc.start(self, stdin) if wait_for_initialized: self.wait_for_log("Server started with public key") logging.info("LightningD started") @@ -852,8 +822,8 @@ def is_synced_with_bitcoin(self, info=None): info = self.rpc.getinfo() return 'warning_bitcoind_sync' not in info and 'warning_lightningd_sync' not in info - def start(self, wait_for_bitcoind_sync=True, stderr=None): - self.daemon.start(stderr=stderr) + def start(self, wait_for_bitcoind_sync=True): + self.daemon.start() # Cache `getinfo`, we'll be using it a lot self.info = self.rpc.getinfo() # This shortcut is sufficient for our simple tests. @@ -878,7 +848,6 @@ def stop(self, timeout=10): if self.rc is None: self.rc = self.daemon.stop() - self.daemon.save_log() self.daemon.cleanup() if self.rc != 0 and not self.may_fail: @@ -1417,12 +1386,7 @@ def get_node(self, node_id=None, options=None, dbfile=None, if start: try: - # Capture stderr if we're failing - if expect_fail: - stderr = subprocess.PIPE - else: - stderr = None - node.start(wait_for_bitcoind_sync, stderr=stderr) + node.start(wait_for_bitcoind_sync) except Exception: if expect_fail: return node diff --git a/tests/test_misc.py b/tests/test_misc.py index c125c65d5d92..dd3ee8ab8ef6 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -109,13 +109,13 @@ def crash_bitcoincli(r): # Ignore BROKEN log message about blocksonly mode. l2 = node_factory.get_node(start=False, expect_fail=True, allow_broken_log=True) - with pytest.raises(ValueError): - l2.start(stderr=subprocess.PIPE) + l2.daemon.start(wait_for_initialized=False) + # Will exit with failure code. + assert l2.daemon.wait() == 1 assert l2.daemon.is_in_stderr(r".*deactivating transaction relay is not" - " supported.") is not None - # wait_for_log gets upset since daemon is not running. - wait_for(lambda: l2.daemon.is_in_log('deactivating transaction' - ' relay is not supported')) + " supported.") + assert l2.daemon.is_in_log('deactivating transaction' + ' relay is not supported') def test_bitcoin_ibd(node_factory, bitcoind): @@ -1208,8 +1208,10 @@ def test_rescan(node_factory, bitcoind): l1.daemon.opts['rescan'] = -500000 l1.stop() bitcoind.generate_block(4) - with pytest.raises(ValueError): - l1.start() + l1.daemon.start(wait_for_initialized=False) + # Will exit with failure code. + assert l1.daemon.wait() == 1 + assert l1.daemon.is_in_stderr(r"bitcoind has gone backwards from 500000 to 105 blocks!") # Restarting with future absolute blockheight is fine if we can find it. l1.daemon.opts['rescan'] = -105 @@ -2057,8 +2059,9 @@ def test_new_node_is_mainnet(node_factory): del l1.daemon.opts['network'] # Wrong chain, will fail to start, but that's OK. - with pytest.raises(ValueError): - l1.start() + l1.daemon.start(wait_for_initialized=False) + # Will exit with failure code. + assert l1.daemon.wait() == 1 # Should create these assert os.path.isfile(os.path.join(netdir, "hsm_secret")) diff --git a/tests/test_wallet.py b/tests/test_wallet.py index 9c3854788298..16e2e0cee64e 100644 --- a/tests/test_wallet.py +++ b/tests/test_wallet.py @@ -1037,8 +1037,7 @@ def test_hsm_secret_encryption(node_factory): # Test we cannot restore the same wallet with another password l1.daemon.opts.update({"encrypted-hsm": None}) - l1.daemon.start(stdin=slave_fd, stderr=subprocess.STDOUT, - wait_for_initialized=False) + l1.daemon.start(stdin=slave_fd, wait_for_initialized=False) l1.daemon.wait_for_log(r'Enter hsm_secret password') write_all(master_fd, password[2:].encode("utf-8")) assert(l1.daemon.proc.wait(WAIT_TIMEOUT) == HSM_BAD_PASSWORD) @@ -1068,8 +1067,8 @@ def test_hsm_secret_encryption(node_factory): class HsmTool(TailableProc): """Helper for testing the hsmtool as a subprocess""" - def __init__(self, *args): - TailableProc.__init__(self) + def __init__(self, directory, *args): + TailableProc.__init__(self, os.path.join(directory, "hsmtool")) self.prefix = "hsmtool" assert hasattr(self, "env") if VALGRIND: @@ -1103,9 +1102,8 @@ def test_hsmtool_secret_decryption(node_factory): # We can't use a wrong password ! master_fd, slave_fd = os.openpty() - hsmtool = HsmTool("decrypt", hsm_path) - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "decrypt", hsm_path) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Enter hsm_secret password:") write_all(master_fd, "A wrong pass\n\n".encode("utf-8")) hsmtool.proc.wait(WAIT_TIMEOUT) @@ -1113,8 +1111,7 @@ def test_hsmtool_secret_decryption(node_factory): # Decrypt it with hsmtool master_fd, slave_fd = os.openpty() - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Enter hsm_secret password:") write_all(master_fd, password.encode("utf-8")) assert hsmtool.proc.wait(WAIT_TIMEOUT) == 0 @@ -1127,9 +1124,8 @@ def test_hsmtool_secret_decryption(node_factory): # Test we can encrypt it offline master_fd, slave_fd = os.openpty() - hsmtool = HsmTool("encrypt", hsm_path) - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "encrypt", hsm_path) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Enter hsm_secret password:") write_all(master_fd, password.encode("utf-8")) hsmtool.wait_for_log(r"Confirm hsm_secret password:") @@ -1142,7 +1138,7 @@ def test_hsmtool_secret_decryption(node_factory): l1.daemon.opts.update({"encrypted-hsm": None}) master_fd, slave_fd = os.openpty() - l1.daemon.start(stdin=slave_fd, stderr=subprocess.STDOUT, + l1.daemon.start(stdin=slave_fd, wait_for_initialized=False) l1.daemon.wait_for_log(r'The hsm_secret is encrypted') @@ -1154,9 +1150,8 @@ def test_hsmtool_secret_decryption(node_factory): # And finally test that we can also decrypt if encrypted with hsmtool master_fd, slave_fd = os.openpty() - hsmtool = HsmTool("decrypt", hsm_path) - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "decrypt", hsm_path) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Enter hsm_secret password:") write_all(master_fd, password.encode("utf-8")) assert hsmtool.proc.wait(WAIT_TIMEOUT) == 0 @@ -1166,9 +1161,8 @@ def test_hsmtool_secret_decryption(node_factory): # We can roundtrip encryption and decryption using a password provided # through stdin. - hsmtool = HsmTool("encrypt", hsm_path) - hsmtool.start(stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "encrypt", hsm_path) + hsmtool.start(stdin=subprocess.PIPE) hsmtool.proc.stdin.write(password.encode("utf-8")) hsmtool.proc.stdin.write(password.encode("utf-8")) hsmtool.proc.stdin.flush() @@ -1176,9 +1170,8 @@ def test_hsmtool_secret_decryption(node_factory): assert hsmtool.proc.wait(WAIT_TIMEOUT) == 0 master_fd, slave_fd = os.openpty() - hsmtool = HsmTool("decrypt", hsm_path) - hsmtool.start(stdin=slave_fd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + hsmtool = HsmTool(node_factory.directory, "decrypt", hsm_path) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log("Enter hsm_secret password:") write_all(master_fd, password.encode("utf-8")) hsmtool.wait_for_log("Successfully decrypted") @@ -1236,19 +1229,17 @@ def test_hsmtool_generatehsm(node_factory): hsm_path = os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, "hsm_secret") - hsmtool = HsmTool("generatehsm", hsm_path) + hsmtool = HsmTool(node_factory.directory, "generatehsm", hsm_path) # You cannot re-generate an already existing hsm_secret master_fd, slave_fd = os.openpty() - hsmtool.start(stdin=slave_fd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + hsmtool.start(stdin=slave_fd) assert hsmtool.proc.wait(WAIT_TIMEOUT) == 2 os.remove(hsm_path) # We can generate a valid hsm_secret from a wordlist and a "passphrase" master_fd, slave_fd = os.openpty() - hsmtool.start(stdin=slave_fd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + hsmtool.start(stdin=slave_fd) hsmtool.wait_for_log(r"Select your language:") write_all(master_fd, "0\n".encode("utf-8")) hsmtool.wait_for_log(r"Introduce your BIP39 word list")