From a6a7c91a18eff61d52eb1f014a155c035259a3b7 Mon Sep 17 00:00:00 2001
From: Dougal Dobie
Date: Fri, 12 Jul 2024 11:46:08 +1000
Subject: [PATCH] Implement sensible partitioning (#724)

* Added calculate_n_partitions function
* Implement calculate_n_partitions function in parallel_groupby
* Implement calculate_n_partitions function in parallel_get_rms_measurements
* Implement calculate_n_partitions function in calculate_measurement_pair_metrics
* Implement calculate_n_partitions function in parallel_groupby
* Set index
* Missing commit
* Fixed df name
* Fixed df name
* Fixed df name
* Fix partition size var name
* Correct calculate_n_partitions import
* Fixed circular import
* Updated changelog
---
 CHANGELOG.md                          |  3 +++
 vast_pipeline/pipeline/association.py |  8 +++++---
 vast_pipeline/pipeline/new_sources.py |  6 ++++--
 vast_pipeline/pipeline/pairs.py       |  6 +++++-
 vast_pipeline/pipeline/utils.py       | 13 ++++++++++---
 vast_pipeline/utils/utils.py          | 23 +++++++++++++++++++++++
 6 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d86dd369..e241e028d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 #### Added
 
+- Added `calculate_n_partitions` function for sensible dask dataframe partitioning [#724](https://github.com/askap-vast/vast-pipeline/pull/724)
 - Added support for compressed FITS files [#694](https://github.com/askap-vast/vast-pipeline/pull/694)
 - Added links to Data Central DAS and the Fink Broker to the source page [#697](https://github.com/askap-vast/vast-pipeline/pull/697/)
 - Added `n_new_sources` column to run model to store the number of new sources in a pipeline run [#676](https://github.com/askap-vast/vast-pipeline/pull/676).
@@ -76,6 +77,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 #### Fixed
 
+- Implemented sensible dask dataframe partitioning [#724](https://github.com/askap-vast/vast-pipeline/pull/724)
 - Fixed outdated `runpipeline` section on CLI docs page [#685](https://github.com/askap-vast/vast-pipeline/pull/685).
 - Fixed link to JupyterHub [#676](https://github.com/askap-vast/vast-pipeline/pull/676).
 - Ensure Image models are not created if the catalogue ingest fails [#648](https://github.com/askap-vast/vast-pipeline/pull/648).
@@ -114,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 #### List of PRs
 
+- [#724](https://github.com/askap-vast/vast-pipeline/pull/724): fix, feat: Implemented sensible dask dataframe partitioning
 - [#694](https://github.com/askap-vast/vast-pipeline/pull/694): feat: Handle compressed fits files.
 - [#702](https://github.com/askap-vast/vast-pipeline/pull/702): fix: Downgrade ci-docs to python 3.8.
 - [#701](https://github.com/askap-vast/vast-pipeline/pull/701): fix: Update Gr1N poetry to v8, force python 3.8.10.
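The substance of this patch is one call-pattern change repeated across four modules: `dd.from_pandas(df, n_cpu)` passed the CPU count as dask's positional `npartitions` argument, so every dataframe was split into the same number of partitions regardless of its size. The replacement derives the partition count from the dataframe's in-memory footprint. A minimal before/after sketch (the toy dataframe is hypothetical; it assumes `vast-pipeline`, `dask`, and `psutil` are installed):

```python
import dask.dataframe as dd
import numpy as np
import pandas as pd
from psutil import cpu_count

from vast_pipeline.utils.utils import calculate_n_partitions

# Toy stand-in for the pipeline dataframes partitioned in the diffs below.
df = pd.DataFrame({"source": np.arange(1_000_000),
                   "flux": np.random.rand(1_000_000)})
n_cpu = cpu_count() - 1

# Before: the second positional argument of dd.from_pandas is npartitions,
# so the dataframe was always split into n_cpu partitions.
ddf_old = dd.from_pandas(df, n_cpu)

# After: the partition count targets ~100 MB per partition (the default),
# floored at n_cpu so every worker still gets at least one partition.
n_partitions = calculate_n_partitions(df, n_cpu)
ddf_new = dd.from_pandas(df, npartitions=n_partitions)
```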
diff --git a/vast_pipeline/pipeline/association.py b/vast_pipeline/pipeline/association.py
index b66940ce7..d5e46cb09 100644
--- a/vast_pipeline/pipeline/association.py
+++ b/vast_pipeline/pipeline/association.py
@@ -20,7 +20,7 @@
     reconstruct_associtaion_dfs
 )
 from vast_pipeline.pipeline.config import PipelineConfig
-from vast_pipeline.utils.utils import StopWatch
+from vast_pipeline.utils.utils import StopWatch, calculate_n_partitions
 
 logger = logging.getLogger(__name__)
 
@@ -1588,10 +1588,12 @@ def parallel_association(
     id_incr_par_assoc = max(done_source_ids) if add_mode else 0
 
     n_cpu = cpu_count() - 1
-
+    logger.debug(f"Running association with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(images_df, n_cpu)
+
     # pass each skyreg_group through the normal association process.
     results = (
-        dd.from_pandas(images_df, n_cpu)
+        dd.from_pandas(images_df.set_index('skyreg_group'), npartitions=n_partitions)
         .groupby('skyreg_group')
         .apply(
             association,
diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py
index d30f490dd..459f30473 100644
--- a/vast_pipeline/pipeline/new_sources.py
+++ b/vast_pipeline/pipeline/new_sources.py
@@ -12,7 +12,7 @@
 )
 
 from vast_pipeline.models import Image, Run
-from vast_pipeline.utils.utils import StopWatch
+from vast_pipeline.utils.utils import StopWatch, calculate_n_partitions
 from vast_pipeline.image.utils import open_fits
 
 
@@ -219,9 +219,11 @@ def parallel_get_rms_measurements(
     }
 
     n_cpu = cpu_count() - 1
+    logger.debug(f"Running RMS measurements with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(out, n_cpu)
 
     out = (
-        dd.from_pandas(out, n_cpu)
+        dd.from_pandas(out, npartitions=n_partitions)
         .groupby('img_diff_rms_path')
         .apply(
             get_image_rms_measurements,
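Note the `set_index('skyreg_group')` added before `dd.from_pandas` above (and `set_index('source')` in the hunks that follow): partitioning on the groupby key lets dask split the frame on sorted index boundaries rather than arbitrary row positions, so each group's rows stay together and the subsequent `groupby(...).apply(...)` can work partition-locally instead of shuffling. A rough illustration of the pattern (hypothetical data; `dd.from_pandas` sorts by the index by default):

```python
import dask.dataframe as dd
import pandas as pd

# Hypothetical measurements table keyed by source id.
df = pd.DataFrame({
    "source": [1, 1, 2, 2, 3, 3],
    "flux": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
})

# Index on the groupby key first, then partition: rows sharing a source id
# land in the same partition, so the groupby-apply needs no shuffle.
ddf = dd.from_pandas(df.set_index("source"), npartitions=2)
mean_flux = ddf.groupby("source")["flux"].mean().compute()
```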
diff --git a/vast_pipeline/pipeline/pairs.py b/vast_pipeline/pipeline/pairs.py
index 470cbab3b..a7c5eb102 100644
--- a/vast_pipeline/pipeline/pairs.py
+++ b/vast_pipeline/pipeline/pairs.py
@@ -6,6 +6,8 @@
 import pandas as pd
 from psutil import cpu_count
 
+from vast_pipeline.utils.utils import calculate_n_partitions
+
 logger = logging.getLogger(__name__)
 
 
@@ -65,6 +67,8 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame:
             m_peak, m_int - variability modulation index
     """
     n_cpu = cpu_count() - 1
+    logger.debug(f"Running pair metrics with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(df.set_index('source'), n_cpu)
 
     """Create a DataFrame containing all measurement ID combinations per source.
     Resultant DataFrame will have a MultiIndex(["source", RangeIndex]) where "source" is
@@ -86,7 +90,7 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame:
         11128   0   6216  23534
     """
     measurement_combinations = (
-        dd.from_pandas(df, n_cpu)
+        dd.from_pandas(df, npartitions=n_partitions)
         .groupby("source")["id"]
         .apply(
             lambda x: pd.DataFrame(list(combinations(x, 2))), meta={0: "i", 1: "i"},)
diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py
index 82437141c..bca35ce4f 100644
--- a/vast_pipeline/pipeline/utils.py
+++ b/vast_pipeline/pipeline/utils.py
@@ -24,7 +24,8 @@
 from vast_pipeline.image.main import FitsImage, SelavyImage
 from vast_pipeline.image.utils import open_fits
 from vast_pipeline.utils.utils import (
-    eq_to_cart, StopWatch, optimize_ints, optimize_floats
+    eq_to_cart, StopWatch, optimize_ints, optimize_floats,
+    calculate_n_partitions
 )
 from vast_pipeline.models import (
     Band, Image, Run, SkyRegion
@@ -704,7 +705,10 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame:
         'related_list': 'O'
     }
     n_cpu = cpu_count() - 1
-    out = dd.from_pandas(df, n_cpu)
+    logger.debug(f"Running parallel groupby with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(df, n_cpu)
+
+    out = dd.from_pandas(df.set_index('source'), npartitions=n_partitions)
     out = (
         out.groupby('source')
         .apply(
@@ -763,7 +767,10 @@ def parallel_groupby_coord(df: pd.DataFrame) -> pd.DataFrame:
         'wavg_dec': 'f',
     }
     n_cpu = cpu_count() - 1
-    out = dd.from_pandas(df, n_cpu)
+    logger.debug(f"Running parallel groupby on coordinates with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(df, n_cpu)
+
+    out = dd.from_pandas(df.set_index('source'), npartitions=n_partitions)
     out = (
         out.groupby('source')
         .apply(calc_ave_coord, meta=col_dtype)
diff --git a/vast_pipeline/utils/utils.py b/vast_pipeline/utils/utils.py
index 6ea4904fe..2fccddd37 100644
--- a/vast_pipeline/utils/utils.py
+++ b/vast_pipeline/utils/utils.py
@@ -378,3 +378,26 @@
 
 def timeStamped(fname, fmt="%Y-%m-%d-%H-%M-%S_{fname}"):
     return datetime.now().strftime(fmt).format(fname=fname)
+
+def calculate_n_partitions(df, n_cpu, partition_size_mb=100):
+    """
+    Calculate how many partitions a dataframe should be split into.
+
+    Args:
+        df: The pandas dataframe to be partitioned.
+        n_cpu: The number of available CPUs.
+        partition_size_mb: The desired partition size in MB.
+
+    Returns:
+        The optimal number of partitions.
+    """
+    mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6
+    n_partitions = int(np.ceil(mem_usage_mb / partition_size_mb))
+
+    # n_partitions should be >= n_cpu for optimal parallel processing
+    if n_partitions < n_cpu:
+        n_partitions = n_cpu
+
+    logger.debug(f"Using {n_partitions} partitions of {partition_size_mb}MB")
+
+    return n_partitions
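For a sense of the arithmetic in `calculate_n_partitions`: the dataframe's memory footprint (including the index, via `memory_usage(deep=True)`) is divided by the target partition size and rounded up, then floored at `n_cpu`. A small worked example (hypothetical dataframe; assumes the patched `vast-pipeline` is installed):

```python
import numpy as np
import pandas as pd

from vast_pipeline.utils.utils import calculate_n_partitions

# ~8 MB of float64 data (1e6 rows x 8 bytes).
df = pd.DataFrame({"x": np.random.rand(1_000_000)})

# ceil(8 MB / 100 MB) = 1, which is below the CPU floor, so 4 is returned.
print(calculate_n_partitions(df, n_cpu=4))  # 4

# With a 1 MB target: ceil(~8 MB / 1 MB) = 9, because memory_usage(deep=True)
# counts the index too, nudging the total just past 8 MB.
print(calculate_n_partitions(df, n_cpu=4, partition_size_mb=1))  # 9
```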