diff --git a/custodian/gaussian/handlers.py b/custodian/gaussian/handlers.py index 74f5e5b0..0983d213 100644 --- a/custodian/gaussian/handlers.py +++ b/custodian/gaussian/handlers.py @@ -69,7 +69,7 @@ class GaussianErrorHandler(ErrorHandler): error_patt = re.compile("|".join(list(error_defs))) recom_mem_patt = re.compile( - r"Use %mem=([0-9]+)MW to provide the minimum " r"amount of memory required to complete this " r"step." + r"Use %mem=([0-9]+)MW to provide the minimum amount of memory required to complete this step." ) conv_critera = { "max_force": re.compile(r"\s+(Maximum Force)\s+(-?\d+.?\d*|.*)\s+(-?\d+.?\d*)"), @@ -453,7 +453,7 @@ def _monitor_convergence(data: dict[str, dict[str, Any]], directory: str = "./") import matplotlib.pyplot as plt from matplotlib.ticker import MaxNLocator - fig, ax = plt.subplots(ncols=2, nrows=2, figsize=(12, 10)) + _fig, ax = plt.subplots(ncols=2, nrows=2, figsize=(12, 10)) for i, (k, v) in enumerate(data["values"].items()): row = int(np.floor(i / 2)) col = i % 2 @@ -629,9 +629,7 @@ def correct(self, directory: str = "./"): # restart the job at the point it stopped while forcing Gaussian # to rebuild the set of redundant internals if not list(filter(re.compile(r"%[Cc][Hh][Kk]").match, self.gin.link0_parameters.keys())): - raise KeyError( - "This remedy reads coords from checkpoint " "file. Consider adding CHK to link0_parameters" - ) + raise KeyError("This remedy reads coords from checkpoint file. Consider adding CHK to link0_parameters") self.gin = GaussianInput( mol=None, charge=self.gin.charge, @@ -661,7 +659,7 @@ def correct(self, directory: str = "./"): self.gin.input_parameters.update({"surface": "SAS"}) actions.append({"surface": "SAS"}) else: - self.logger.info("Not sure how to fix " "solute_solvent_surface_error if surface is " "already SAS!") + self.logger.info("Not sure how to fix solute_solvent_surface_error if surface is already SAS!") return {"errors": [self.errors], "actions": None} elif "internal_coords" in self.errors: @@ -700,22 +698,22 @@ def correct(self, directory: str = "./"): if set(last_lines) != {"\n"}: # if the required blank lines at the end of the input file are # missing, just rewrite the file - self.logger.info("Missing blank line at the end of the input " "file.") + self.logger.info("Missing blank line at the end of the input file.") actions.append({"blank_lines": "rewrite_input_file"}) else: - self.logger.info("Not sure how to fix zmatrix error. " "Check manually?") + self.logger.info("Not sure how to fix zmatrix error. Check manually?") return {"errors": [self.errors], "actions": None} elif "coords" in self.errors: if "connectivity" in self.gin.route_parameters.get("geom"): - self.logger.info("Explicit atom bonding is requested but no " "such input is provided") + self.logger.info("Explicit atom bonding is requested but no such input is provided") if isinstance(self.gin.route_parameters["geom"], dict) and len(self.gin.route_parameters["geom"]) > 1: self.gin.route_parameters["geom"].pop("connectivity", None) else: del self.gin.route_parameters["geom"] actions.append({"coords": "remove_connectivity"}) else: - self.logger.info("Missing connectivity info. Not sure how to " "fix this error. Exiting!") + self.logger.info("Missing connectivity info. Not sure how to fix this error. Exiting!") return {"errors": [self.errors], "actions": None} elif "found_coords" in self.errors: @@ -728,9 +726,7 @@ def correct(self, directory: str = "./"): self.gin._mol = None actions.append({"mol": "remove_from_input"}) else: - self.logger.info( - "Not sure why atom specifications should not " "be found in the input. Examine manually!" - ) + self.logger.info("Not sure why atom specifications should not be found in the input. Examine manually!") return {"errors": [self.errors], "actions": None} elif "coord_inputs" in self.errors: @@ -764,20 +760,18 @@ def correct(self, directory: str = "./"): # initial guess be read from the checkpoint file but forgot to # take the geom from the checkpoint file, add geom=check if not glob.glob("*.[Cc][Hh][Kk]"): - raise FileNotFoundError( - "This remedy reads geometry from " "checkpoint file. This file is " "missing!" - ) + raise FileNotFoundError("This remedy reads geometry from checkpoint file. This file is missing!") GaussianErrorHandler._update_route_params(self.gin.route_parameters, "geom", "check") self.gin.route_parameters["geom"] = "check" actions.append({"mol": "get_from_checkpoint"}) else: # error cannot be fixed automatically. Return None for actions - self.logger.info("Molecule is not found in the input file. " "Fix manually!") + self.logger.info("Molecule is not found in the input file. Fix manually!") # TODO: check if logger.info is enough here or return is needed return {"errors": list(self.errors), "actions": None} elif any(err in self.errors for err in ["empty_file", "bad_file"]): - self.logger.error("Required checkpoint file is bad. Fix " "manually!") + self.logger.error("Required checkpoint file is bad. Fix manually!") return {"errors": list(self.errors), "actions": None} elif "missing_file" in self.errors: @@ -786,7 +780,7 @@ def correct(self, directory: str = "./"): elif "syntax" in self.errors: # error cannot be fixed automatically. Return None for actions - self.logger.info("A syntax error was detected in the input file. " "Fix manually!") + self.logger.info("A syntax error was detected in the input file. Fix manually!") return {"errors": list(self.errors), "actions": None} elif "insufficient_mem" in self.errors: @@ -797,11 +791,11 @@ def correct(self, directory: str = "./"): self.gin.link0_parameters[mem_key] = f"{mem}MB" actions.append({"memory": "increase_to_gaussian_recommendation"}) else: - self.logger.info("Check job memory requirements manually and " "set as needed.") + self.logger.info("Check job memory requirements manually and set as needed.") return {"errors": list(self.errors), "actions": None} else: - self.logger.info("Must have gotten an error that is parsed but not " "handled yet. Fix manually!") + self.logger.info("Must have gotten an error that is parsed but not handled yet. Fix manually!") return {"errors": list(self.errors), "actions": None} os.rename( @@ -894,6 +888,6 @@ def correct(self, directory: str = "./") -> dict: f.write("\n".join(input_str)) return {"errors": ["wall_time_limit"], "actions": None} self.logger.info( - "Wall time handler requires a read-write gaussian " "file to be available. No such file is found." + "Wall time handler requires a read-write gaussian file to be available. No such file is found." ) return {"errors": ["Walltime reached but not rwf file found"], "actions": None} diff --git a/tests/gaussian/test_handlers.py b/tests/gaussian/test_handlers.py index 9ca95854..ea4ea5ba 100644 --- a/tests/gaussian/test_handlers.py +++ b/tests/gaussian/test_handlers.py @@ -29,7 +29,7 @@ def gunzip_file(gauss_file): class TestGaussianErrorHandler(TestCase): def setUp(self): - os.makedirs(SCR_DIR) + os.makedirs(SCR_DIR, exist_ok=True) os.chdir(SCR_DIR) def test_opt_steps_cycles(self): @@ -316,7 +316,7 @@ def tearDown(self): class TestWallTimeErrorHandler(TestCase): def setUp(self): - os.makedirs(SCR_DIR) + os.makedirs(SCR_DIR, exist_ok=True) os.chdir(SCR_DIR) os.environ.pop("JOB_START_TIME", None) gunzip_file(f"{TEST_DIR}/walltime.out.gz") @@ -331,7 +331,7 @@ def test_walltime_init(self): output_file="wall_time.out", ) init_time = handler.init_time - assert os.environ.get("JOB_START_TIME") == init_time.strftime("%a %b %d %H:%M:%S UTC %Y") + assert os.environ.get("JOB_START_TIME") == f"{init_time:%a %b %d %H:%M:%S UTC %Y}" # Test that walltime persists if new handler is created handler = WallTimeErrorHandler( wall_time=3600, @@ -339,7 +339,7 @@ def test_walltime_init(self): input_file="walltime.com", output_file="walltime.out", ) - assert os.environ.get("JOB_START_TIME") == init_time.strftime("%a %b %d %H:%M:%S UTC %Y") + assert os.environ.get("JOB_START_TIME") == f"{init_time:%a %b %d %H:%M:%S UTC %Y}" def test_walltime_check_and_correct(self): # Try a 1 hr wall time with a 5 mins buffer diff --git a/tests/gaussian/test_jobs.py b/tests/gaussian/test_jobs.py index 978f2477..e69a10e9 100644 --- a/tests/gaussian/test_jobs.py +++ b/tests/gaussian/test_jobs.py @@ -29,7 +29,7 @@ def setUp(self): self.backup = True self.directory = SCR_DIR - os.makedirs(SCR_DIR) + os.makedirs(SCR_DIR, exist_ok=True) shutil.copyfile(f"{TEST_DIR}/mol_opt.com", f"{SCR_DIR}/test.com") os.chdir(SCR_DIR) @@ -41,7 +41,7 @@ def tearDown(self): os.remove(file_path) def test_normal(self): - g = GaussianJob( + job = GaussianJob( self.gaussian_cmd, self.input_file, self.output_file, @@ -49,17 +49,17 @@ def test_normal(self): self.suffix, self.backup, ) - g.setup() + job.setup() assert os.path.exists(f"{SCR_DIR}/test.com.orig") with gzip.open(f"{TEST_DIR}/mol_opt.out.gz", "rb") as f_in, open(f"{TEST_DIR}/mol_opt.out", "wb") as f_out: shutil.copyfileobj(f_in, f_out) shutil.copy(f"{TEST_DIR}/mol_opt.out", f"{SCR_DIR}/test.out") - g.postprocess() + job.postprocess() assert os.path.exists(f"{SCR_DIR}/test.com{self.suffix}") assert os.path.exists(f"{SCR_DIR}/test.out{self.suffix}") def test_better_guess(self): - g = GaussianJob.generate_better_guess( + job_gen = GaussianJob.generate_better_guess( self.gaussian_cmd, self.input_file, self.output_file, @@ -68,7 +68,7 @@ def test_better_guess(self): True, self.directory, ) - jobs = list(g) + jobs = list(job_gen) assert len(jobs) == 1, "One job should be generated under normal conditions." jobs[0].setup() assert os.path.exists(f"{SCR_DIR}/test.com.orig") diff --git a/tests/vasp/test_handlers.py b/tests/vasp/test_handlers.py index b0702fdf..ad0f3059 100644 --- a/tests/vasp/test_handlers.py +++ b/tests/vasp/test_handlers.py @@ -685,7 +685,7 @@ def test_check_correct_electronic(self): assert handler.check() dct = handler.correct() assert dct["errors"] == ["Unconverged"] - assert [{"dict": "INCAR", "action": {"_set": {"ALGO": "Damped", "TIME": 0.5}}}] == dct["actions"] + assert dct["actions"] == [{"dict": "INCAR", "action": {"_set": {"ALGO": "Damped", "TIME": 0.5}}}] def test_check_correct_electronic_repeat(self): shutil.copy("vasprun.xml.electronic2", "vasprun.xml") @@ -714,7 +714,7 @@ def test_amin(self): handler = UnconvergedErrorHandler() assert handler.check() dct = handler.correct() - assert [{"dict": "INCAR", "action": {"_set": {"AMIN": 0.01}}}] == dct["actions"] + assert dct["actions"] == [{"dict": "INCAR", "action": {"_set": {"AMIN": 0.01}}}] def test_as_from_dict(self): handler = UnconvergedErrorHandler("random_name.xml")