From 8c9732a30995b4ddaaf3f46610b962e9ccc79d1e Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Mon, 31 Oct 2022 12:19:54 -0400 Subject: [PATCH 01/12] refactor: Update gmx lysozyme example --- .../flow.gmx-lysozyme-in-water/project.py | 71 +++++++------------ 1 file changed, 24 insertions(+), 47 deletions(-) diff --git a/projects/flow.gmx-lysozyme-in-water/project.py b/projects/flow.gmx-lysozyme-in-water/project.py index d084790b..de227ef0 100644 --- a/projects/flow.gmx-lysozyme-in-water/project.py +++ b/projects/flow.gmx-lysozyme-in-water/project.py @@ -1,10 +1,8 @@ -import flow import pexpect # Used to automate interaction with GROMACS interface. import signac from flow import FlowProject gmx_exec = "gmx" # or use gmx_mpi if available -mpi_exec = "mpirun" """Define file level constants.""" @@ -96,16 +94,10 @@ def _grompp_str(op_name, gro_name, checkpoint_file=None): return workspace_command(cmd) -def _mdrun_str(op_name, np=1, nt=None, verbose=False): +def _mdrun_str(op_name, nt=None, verbose=False): """Helper function, returns mdrun command string for operation.""" num_threads = 1 if nt is None else nt - num_nodes = np // num_threads - cmd = ( - "OMP_NUM_THREADS={num_threads} {mpi_exec} -n {np} {gmx} " - "mdrun -ntomp {num_threads} {verbose} -deffnm {op}" - ).format( - np=num_nodes, - mpi_exec=mpi_exec, + cmd = ("{gmx} mdrun -ntomp {num_threads} {verbose} -deffnm {op}").format( gmx=gmx_exec, num_threads=num_threads, op=op_name, @@ -115,9 +107,8 @@ def _mdrun_str(op_name, np=1, nt=None, verbose=False): # First three steps are simple configuration -@MyProject.operation @MyProject.post.isfile(gro_file) -@flow.cmd +@MyProject.operation(cmd=True) def pdb2gmx(job): return workspace_command( "{gmx} pdb2gmx -f {pdb_file} -o {gro_file} -water {water_model} " @@ -131,10 +122,9 @@ def pdb2gmx(job): ) -@MyProject.operation @MyProject.pre.after(pdb2gmx) @MyProject.post.isfile(boxed_file) -@flow.cmd +@MyProject.operation(cmd=True) def editconf(job): return workspace_command( 
"{gmx} editconf -f {gro_file} -o {boxed_file} -c -d {edge_spacing} " @@ -148,10 +138,9 @@ def editconf(job): ) -@MyProject.operation @MyProject.pre.isfile(boxed_file) @MyProject.post.isfile(solvated_file) -@flow.cmd +@MyProject.operation(cmd=True) def solvate(job): return workspace_command( "{gmx} solvate -cp {boxed_file} -cs {solvent_configuration} " @@ -176,17 +165,16 @@ def solvate(job): # automate responding to requested std input -@MyProject.operation @MyProject.pre.isfile(solvated_file) @MyProject.post.isfile(ionization_config) -@flow.cmd +@MyProject.operation(cmd=True) def grompp_add_ions(job): - return _grompp_str("ions", solvated_file) + return _grompp_str("ions", solvated_file).format(job) -@MyProject.operation @MyProject.pre.after(grompp_add_ions) @MyProject.post(prepared_for_simulation) +@MyProject.operation def ionize(job): """Exploit the pexpect module to run.""" with job: @@ -210,73 +198,62 @@ def ionize(job): # Minimization -@MyProject.operation @MyProject.pre(prepared_for_simulation) @MyProject.post.isfile(em_op + ".tpr") -@flow.cmd +@MyProject.operation(cmd=True) def grompp_minim(job): - return _grompp_str("minim", ionized_file) + return _grompp_str("minim", ionized_file).format(job) -@MyProject.operation @MyProject.pre.after(grompp_minim) @MyProject.post.isfile(em_file) -@flow.cmd +@MyProject.operation(cmd=True) def minim(job): - return _mdrun_str("minim") + return _mdrun_str("minim").format(job) # Equilibration: NVT then NPT -@MyProject.operation @MyProject.pre.after(minim) @MyProject.post.isfile(nvt_op + ".tpr") -@flow.cmd +@MyProject.operation(cmd=True) def grompp_nvt(job): - return _grompp_str("nvt", em_file) + return _grompp_str("nvt", em_file).format(job) -@MyProject.operation @MyProject.pre.after(grompp_nvt) @MyProject.post.isfile(nvt_file) -@flow.cmd -@flow.directives(np=16) +@MyProject.operation(cmd=True, directives={"np": 16}) def nvt(job): - return _mdrun_str("nvt") + return _mdrun_str("nvt").format(job) -@MyProject.operation 
@MyProject.pre.after(nvt) @MyProject.post.isfile(npt_op + ".tpr") -@flow.cmd +@MyProject.operation(cmd=True) def grompp_npt(job): - return _grompp_str("npt", nvt_file) + return _grompp_str("npt", nvt_file).format(job) -@MyProject.operation @MyProject.pre.isfile(npt_op + ".tpr") @MyProject.post.isfile(npt_file) -@flow.cmd -@flow.directives(np=16) +@MyProject.operation(cmd=True, directives={"np": 16}) def npt(job): - return _mdrun_str("npt") + return _mdrun_str("npt").format(job) # Final run -@MyProject.operation @MyProject.pre.isfile(npt_file) @MyProject.post.isfile(production_op + ".tpr") -@flow.cmd +@MyProject.operation(cmd=True) def grompp_md(job): - return _grompp_str("md", npt_file) + return _grompp_str("md", npt_file).format(job) -@MyProject.operation @MyProject.pre.after(grompp_md) @MyProject.post(finished) -@flow.cmd -@flow.directives(np=16) +@MyProject.operation(cmd=True, directives={"np": 16}) def md(job): - return _mdrun_str("md") + return _mdrun_str("md").format(job) if __name__ == "__main__": From da5f876fff346c61158d0e1e0c8466dc474cf08c Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Mon, 31 Oct 2022 12:22:40 -0400 Subject: [PATCH 02/12] refactor: Update flow aggregation-mpi test --- projects/flow.aggregation-mpi/project.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/projects/flow.aggregation-mpi/project.py b/projects/flow.aggregation-mpi/project.py index 842a788e..89e776c4 100644 --- a/projects/flow.aggregation-mpi/project.py +++ b/projects/flow.aggregation-mpi/project.py @@ -1,4 +1,4 @@ -from flow import FlowProject, aggregator, directives +from flow import FlowProject, aggregator class Project(FlowProject): @@ -22,9 +22,10 @@ def mpi_task(job, comm): print(f"In the mpi_task function, {rank=} of {size=} has {data=}.") -@Project.operation -@directives(nranks=lambda *jobs: RANKS_PER_JOB * len(jobs)) -@aggregator.groupsof(num=JOBS_PER_AGGREGATE) +@Project.operation( + directives={"nranks": lambda *jobs: RANKS_PER_JOB * 
len(jobs)}, + aggregator=aggregator.groupsof(num=JOBS_PER_AGGREGATE), +) def do_mpi_task(*jobs): from mpi4py import MPI From f99bca9ef5febf607a6f66399ef2e2c8c263a20e Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Mon, 31 Oct 2022 12:26:15 -0400 Subject: [PATCH 03/12] refactor: Update flow.gmx-mtools example --- projects/flow.gmx-mtools/src/project.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/projects/flow.gmx-mtools/src/project.py b/projects/flow.gmx-mtools/src/project.py index 1fa78fc9..3a70b6b3 100644 --- a/projects/flow.gmx-mtools/src/project.py +++ b/projects/flow.gmx-mtools/src/project.py @@ -1,5 +1,4 @@ """Define the project's workflow logic.""" -import flow import mbuild as mb import signac from flow import FlowProject @@ -23,7 +22,7 @@ def _mdrun_str(op_name): def gromacs_command(name, gro, sys): """Simplify GROMACS operations""" - return "cd {{job.ws}} ; {} && {}".format( + return "{} && {}".format( _grompp_str(project_root_directory, name, gro, sys), _mdrun_str(name), ) @@ -67,28 +66,25 @@ def initialize(job): system.save("init.top", forcefield_name="oplsaa", overwrite=True) -@MyProject.operation @MyProject.pre(initialized) @MyProject.post(minimized) -@flow.cmd +@MyProject.operation(cmd=True, with_job=True) def em(job): - return gromacs_command(name="em", gro="init", sys="init") + return gromacs_command(name="em", gro="init", sys="init").format(job) -@MyProject.operation @MyProject.pre(minimized) @MyProject.post(equilibrated) -@flow.cmd +@MyProject.operation(cmd=True, with_job=True) def equil(job): - return gromacs_command(name="equil", gro="em", sys="init") + return gromacs_command(name="equil", gro="em", sys="init").format(job) -@MyProject.operation @MyProject.pre(equilibrated) @MyProject.post(sampled) -@flow.cmd +@MyProject.operation(cmd=True, with_job=True) def sample(job): - return gromacs_command(name="sample", gro="equil", sys="init") + return gromacs_command(name="sample", gro="equil", 
sys="init").format(job) if __name__ == "__main__": From 51318c04e82887b0ca83730db37fb8abc3ad4e2b Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Mon, 31 Oct 2022 12:30:11 -0400 Subject: [PATCH 04/12] refactor: Update flow.2D-random walk operation decorator --- projects/flow.2D-random-walk/src/project.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/projects/flow.2D-random-walk/src/project.py b/projects/flow.2D-random-walk/src/project.py index 86324aff..b8f4e638 100644 --- a/projects/flow.2D-random-walk/src/project.py +++ b/projects/flow.2D-random-walk/src/project.py @@ -124,10 +124,9 @@ def plot_mean_squared_displacement(job): @agg_analyze_and_plot -@std_aggregator @RandomWalkProject.pre(all_simulated) @RandomWalkProject.post.true("msd_analyzed") -@RandomWalkProject.operation +@RandomWalkProject.operation(aggregator=std_aggregator) def compute_mean_squared_displacement(*jobs): """Compute and store the mean squared displacement for all std.""" msd = np.zeros(jobs[0].doc.run_steps + 1) @@ -142,12 +141,15 @@ def compute_mean_squared_displacement(*jobs): # Since this uses a separate aggragator to restrict aggregates to the first 5 replicas, # we cannot assign this operation to either agg_plot or agg_analyze_and_plot -@flow.aggregator.groupby( - "standard_deviation", select=lambda job: job.sp.replica <= 4, sort_by="replica" -) + + @RandomWalkProject.pre(all_simulated) @RandomWalkProject.post.true("plotted_walks") -@RandomWalkProject.operation +@RandomWalkProject.operation( + aggregator=flow.aggregator.groupby( + "standard_deviation", select=lambda job: job.sp.replica <= 4, sort_by="replica" + ) +) def plot_walks(*jobs): """Plot the first 5 replicas random walks for each standard_deviation.""" fig, ax = plt.subplots() @@ -168,10 +170,9 @@ def plot_walks(*jobs): @agg_analyze_and_plot @agg_plot -@std_aggregator @RandomWalkProject.pre(all_simulated) @RandomWalkProject.post.true("plotted_histogram") -@RandomWalkProject.operation 
+@RandomWalkProject.operation(aggregator=std_aggregator) def plot_histogram(*jobs): """Create a 2D histogram of the final positions of random walks per std.""" final_positions = np.array( From ef4a212409e943e39a675e4902abb506485ec8ba Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Mon, 31 Oct 2022 12:31:41 -0400 Subject: [PATCH 05/12] refactor: Update flow.aggregation-plotting operation decorator --- projects/flow.aggregation-plotting/src/project.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/projects/flow.aggregation-plotting/src/project.py b/projects/flow.aggregation-plotting/src/project.py index 3be75bbe..b4bd505f 100644 --- a/projects/flow.aggregation-plotting/src/project.py +++ b/projects/flow.aggregation-plotting/src/project.py @@ -15,8 +15,7 @@ def get_pressure(crystal_name, density): return pressure -@aggregator.groupby("crystal", sort_by="density") -@Project.operation +@Project.operation(aggregator=aggregator.groupby("crystal", sort_by="density")) def plot_pressure_by_crystal(*jobs): """Plot the pressure as a function of density for each group.""" pressures = {} @@ -40,8 +39,7 @@ def plot_pressure_by_crystal(*jobs): plt.close() -@aggregator(sort_by="density") -@Project.operation +@Project.operation(aggregator=aggregator(sort_by="density")) def plot_pressure_all(*jobs): """Plot pressure for all data on the same axes.""" crystal_pressures = {"fcc": [], "bcc": []} From fa909b7fee70af787e21326c3b44c43d0220d761 Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Mon, 31 Oct 2022 12:33:29 -0400 Subject: [PATCH 06/12] refactor: Update Aggregation tutorial with new operation decorator --- notebooks/signac_301_Aggregation_Tutorial.ipynb | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/notebooks/signac_301_Aggregation_Tutorial.ipynb b/notebooks/signac_301_Aggregation_Tutorial.ipynb index 35fc6b73..bd76daa5 100644 --- a/notebooks/signac_301_Aggregation_Tutorial.ipynb +++ 
b/notebooks/signac_301_Aggregation_Tutorial.ipynb @@ -36,7 +36,7 @@ "\n", "Aggregation allows a **signac-flow** operation to act on multiple jobs, rather than one job at a time.\n", "\n", - "An aggregate is defined as a subset of the jobs in a **signac** project. Aggregates are generated when the `@flow.aggregator` decorator is applied to an operation.\n", + "An aggregate is defined as a subset of the jobs in a **signac** project. Aggregates are generated when a `flow.aggregator` object is applied to the `FlowProject.operation` directive.\n", "\n", "Please refer to the [documentation](https://docs.signac.io/en/latest/aggregation.html) for detailed instructions on how to use aggregation." ] @@ -189,8 +189,7 @@ " pass\n", "\n", "\n", - "@aggregator()\n", - "@AggregationProject.operation\n", + "@AggregationProject.operation(aggregator=aggregator=())\n", "@AggregationProject.post(lambda *jobs: project.doc.get(\"average_temperature\", False))\n", "def compute_average_temperature(*jobs):\n", " \"\"\"Compute the average temperature using the state point parameter,\n", @@ -201,8 +200,7 @@ " project.document[\"average_temperature\"] = float(average_temp)\n", "\n", "\n", - "@aggregator(sort_by=\"day\")\n", - "@AggregationProject.operation\n", + "@AggregationProject.operation(aggregator=aggregator(sort_by=\"day\"))\n", "@AggregationProject.pre.after(compute_average_temperature)\n", "def plot_daily_temperature(*jobs):\n", " \"\"\"Graph of daily temperature for the year.\"\"\"\n", @@ -224,8 +222,8 @@ " plt.show()\n", "\n", "\n", - "@aggregator(sort_by=\"day\", select=lambda job: job.sp[\"day\"] % 7 == 0)\n", - "@AggregationProject.operation\n", + "\n", + "@AggregationProject.operation(aggregator=aggregator(sort_by=\"day\", select=lambda job: job.sp[\"day\"] % 7 == 0))\n", "def plot_weekly_temperature(*jobs):\n", " \"\"\"Graph the temperature for only one day of each week.\"\"\"\n", " print(\"Generating plot of weekly temperature.\")\n", From 
4c19bf77fce8660b722e2d28d736705a298779a0 Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Thu, 10 Nov 2022 12:10:29 -0500 Subject: [PATCH 07/12] doc: Fix comment regarding aggregator use. Use a more appropriate verb. Co-authored-by: Bradley Dice --- notebooks/signac_301_Aggregation_Tutorial.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/notebooks/signac_301_Aggregation_Tutorial.ipynb b/notebooks/signac_301_Aggregation_Tutorial.ipynb index bd76daa5..172f511e 100644 --- a/notebooks/signac_301_Aggregation_Tutorial.ipynb +++ b/notebooks/signac_301_Aggregation_Tutorial.ipynb @@ -36,7 +36,7 @@ "\n", "Aggregation allows a **signac-flow** operation to act on multiple jobs, rather than one job at a time.\n", "\n", - "An aggregate is defined as a subset of the jobs in a **signac** project. Aggregates are generated when a `flow.aggregator` object is applied to the `FlowProject.operation` directive.\n", + "An aggregate is defined as a subset of the jobs in a **signac** project. Aggregates are generated when a `flow.aggregator` object is provided to the `FlowProject.operation` decorator.\n", "\n", "Please refer to the [documentation](https://docs.signac.io/en/latest/aggregation.html) for detailed instructions on how to use aggregation." 
] From 1ec7ce6c41355ff87c0c22d71a4954c37626cb72 Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Thu, 12 Oct 2023 16:37:16 -0400 Subject: [PATCH 08/12] fix: decorator ordering --- notebooks/signac_301_Aggregation_Tutorial.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/notebooks/signac_301_Aggregation_Tutorial.ipynb b/notebooks/signac_301_Aggregation_Tutorial.ipynb index 172f511e..281bc593 100644 --- a/notebooks/signac_301_Aggregation_Tutorial.ipynb +++ b/notebooks/signac_301_Aggregation_Tutorial.ipynb @@ -189,8 +189,8 @@ " pass\n", "\n", "\n", - "@AggregationProject.operation(aggregator=aggregator=())\n", "@AggregationProject.post(lambda *jobs: project.doc.get(\"average_temperature\", False))\n", + "@AggregationProject.operation(aggregator=aggregator())\n", "def compute_average_temperature(*jobs):\n", " \"\"\"Compute the average temperature using the state point parameter,\n", " \"temperature\", of all jobs present in the signac project and\n", From 4c3c3076f263039ceb2d32d2d35e2555c92b8584 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 12 Oct 2023 20:42:09 +0000 Subject: [PATCH 09/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- projects/flow.aggregation-mpi/project.py | 1 + 1 file changed, 1 insertion(+) diff --git a/projects/flow.aggregation-mpi/project.py b/projects/flow.aggregation-mpi/project.py index 6fe3cc7a..8c517660 100644 --- a/projects/flow.aggregation-mpi/project.py +++ b/projects/flow.aggregation-mpi/project.py @@ -24,6 +24,7 @@ def mpi_task(job, comm): mpi_aggregator = aggregator.groupsof(num=JOBS_PER_AGGREGATE) + @Project.operation( aggregator=mpi_aggregator, directives={"nranks": lambda *jobs: RANKS_PER_JOB * len(jobs)}, From c8cd0412d1f194fd7361eee9e690813978f9b95b Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Mon, 13 Nov 2023 18:01:01 -0500 Subject: [PATCH 10/12] fix: Remove merge 
conflict --- projects/flow.2D-random-walk/src/project.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/projects/flow.2D-random-walk/src/project.py b/projects/flow.2D-random-walk/src/project.py index 283fa1d2..f80bf907 100644 --- a/projects/flow.2D-random-walk/src/project.py +++ b/projects/flow.2D-random-walk/src/project.py @@ -147,15 +147,7 @@ def compute_mean_squared_displacement(*jobs): @RandomWalkProject.pre(all_simulated) @RandomWalkProject.post.true("plotted_walks") -<<<<<<< HEAD -@RandomWalkProject.operation( - aggregator=flow.aggregator.groupby( - "standard_deviation", select=lambda job: job.sp.replica <= 4, sort_by="replica" - ) -) -======= @RandomWalkProject.operation(aggregator=first_five_replicas) ->>>>>>> main def plot_walks(*jobs): """Plot the first 5 replicas random walks for each standard_deviation.""" fig, ax = plt.subplots() From 689fb6faf3b2f6f2784b4dc0894ce330458e4af4 Mon Sep 17 00:00:00 2001 From: Brandon Butler Date: Tue, 14 Nov 2023 14:54:10 -0500 Subject: [PATCH 11/12] fix: Add actual mpi/OMP parallelism to gmx-lysozyme --- projects/flow.gmx-lysozyme-in-water/project.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/flow.gmx-lysozyme-in-water/project.py b/projects/flow.gmx-lysozyme-in-water/project.py index 3a42ad58..7ffcba5b 100644 --- a/projects/flow.gmx-lysozyme-in-water/project.py +++ b/projects/flow.gmx-lysozyme-in-water/project.py @@ -239,9 +239,9 @@ def grompp_md(job): @MyProject.pre.after(grompp_md) @MyProject.post(finished) -@MyProject.operation(cmd=True, directives={"np": 16}, with_job=True) +@MyProject.operation(cmd=True, directives={"nranks": 4, "omp_num_threads": 4}, with_job=True) def md(job): - return _mdrun_str("md").format(job) + return _mdrun_str("md", nt=4).format(job) if __name__ == "__main__": From f4c4d8eb5bbca57ba42a6a5632d1432b4a0e3cdf Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 Nov 2023 19:55:16 
+0000 Subject: [PATCH 12/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- projects/flow.gmx-lysozyme-in-water/project.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/projects/flow.gmx-lysozyme-in-water/project.py b/projects/flow.gmx-lysozyme-in-water/project.py index 7ffcba5b..b6e757bc 100644 --- a/projects/flow.gmx-lysozyme-in-water/project.py +++ b/projects/flow.gmx-lysozyme-in-water/project.py @@ -239,7 +239,9 @@ def grompp_md(job): @MyProject.pre.after(grompp_md) @MyProject.post(finished) -@MyProject.operation(cmd=True, directives={"nranks": 4, "omp_num_threads": 4}, with_job=True) +@MyProject.operation( + cmd=True, directives={"nranks": 4, "omp_num_threads": 4}, with_job=True +) def md(job): return _mdrun_str("md", nt=4).format(job)