Implement sensible partitioning (#724)
* Added calculate_n_partitions function

* Implement calculate_n_partitions function in parallel_groupby

* Implement calculate_n_partitions function in parallel_get_rms_measurements

* Implement calculate_n_partitions function in calculate_measurement_pair_metrics

* Implement calculate_n_partitions function in parallel_groupby

* Set index

* Missing commit

* Fixed df name

* Fixed df name

* Fixed df name

* Fix partition size var name

* Correct calculate_n_partitions import

* Fixed circular import

* Updated changelog
ddobie authored Jul 12, 2024
1 parent 6209e0d commit a6a7c91
Showing 6 changed files with 50 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 #### Added
 
+- Added `calculate_n_partitions` function for sensible dask dataframe partitioning [#724](https://github.com/askap-vast/vast-pipeline/pull/724)
 - Added support for compressed FITS files [#694](https://github.com/askap-vast/vast-pipeline/pull/694)
 - Added links to Data Central DAS and the Fink Broker to the source page [#697](https://github.com/askap-vast/vast-pipeline/pull/697/)
 - Added `n_new_sources` column to run model to store the number of new sources in a pipeline run [#676](https://github.com/askap-vast/vast-pipeline/pull/676).
@@ -76,6 +77,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 #### Fixed
 
+- Implemented sensible dask dataframe partitioning [#724](https://github.com/askap-vast/vast-pipeline/pull/724)
 - Fixed outdated `runpipeline` section on CLI docs page [#685](https://github.com/askap-vast/vast-pipeline/pull/685).
 - Fixed link to JupyterHub [#676](https://github.com/askap-vast/vast-pipeline/pull/676).
 - Ensure Image models are not created if the catalogue ingest fails [#648](https://github.com/askap-vast/vast-pipeline/pull/648).
@@ -114,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 #### List of PRs
 
+- [#724](https://github.com/askap-vast/vast-pipeline/pull/724): fix, feat: Implemented sensible dask dataframe partitioning
 - [#694](https://github.com/askap-vast/vast-pipeline/pull/694): feat: Handle compressed fits files.
 - [#702](https://github.com/askap-vast/vast-pipeline/pull/702): fix: Downgrade ci-docs to python 3.8.
 - [#701](https://github.com/askap-vast/vast-pipeline/pull/701): fix: Update Gr1N poetry to v8, force python 3.8.10.
8 changes: 5 additions & 3 deletions vast_pipeline/pipeline/association.py
@@ -20,7 +20,7 @@
     reconstruct_associtaion_dfs
 )
 from vast_pipeline.pipeline.config import PipelineConfig
-from vast_pipeline.utils.utils import StopWatch
+from vast_pipeline.utils.utils import StopWatch, calculate_n_partitions
 
 
 logger = logging.getLogger(__name__)
@@ -1588,10 +1588,12 @@ def parallel_association(
     id_incr_par_assoc = max(done_source_ids) if add_mode else 0
 
     n_cpu = cpu_count() - 1
 
+    logger.debug(f"Running association with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(images_df, n_cpu)
+
     # pass each skyreg_group through the normal association process.
     results = (
-        dd.from_pandas(images_df, n_cpu)
+        dd.from_pandas(images_df.set_index('skyreg_group'), npartitions=n_partitions)
         .groupby('skyreg_group')
         .apply(
             association,
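Context for the hunk above: in `dd.from_pandas` the second positional argument is `npartitions`, so the old call `dd.from_pandas(images_df, n_cpu)` always requested one partition per CPU regardless of how large the dataframe was. A minimal sketch of the before/after pattern, using a toy dataframe rather than real pipeline data:

```python
import dask.dataframe as dd
import pandas as pd

# Toy stand-in for images_df; the real frame is far larger.
images_df = pd.DataFrame({'skyreg_group': [0, 0, 1, 1], 'val': range(4)})

# Before: partition count tied to CPU count, however big the data
# (the second positional argument of dd.from_pandas is npartitions).
before = dd.from_pandas(images_df, 2)

# After: partition count sized from memory usage, and the groupby key
# set as a sorted index so dask's known divisions can keep each
# skyreg_group together rather than shuffling it across partitions.
after = dd.from_pandas(images_df.set_index('skyreg_group'), npartitions=2)

print(before.npartitions, after.npartitions)
```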
6 changes: 4 additions & 2 deletions vast_pipeline/pipeline/new_sources.py
@@ -12,7 +12,7 @@
 )
 
 from vast_pipeline.models import Image, Run
-from vast_pipeline.utils.utils import StopWatch
+from vast_pipeline.utils.utils import StopWatch, calculate_n_partitions
 from vast_pipeline.image.utils import open_fits
 
 
@@ -219,9 +219,11 @@ def parallel_get_rms_measurements(
     }
 
     n_cpu = cpu_count() - 1
+    logger.debug(f"Running get_image_rms_measurements with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(out, n_cpu)
 
     out = (
-        dd.from_pandas(out, n_cpu)
+        dd.from_pandas(out, npartitions=n_partitions)
         .groupby('img_diff_rms_path')
         .apply(
             get_image_rms_measurements,
6 changes: 5 additions & 1 deletion vast_pipeline/pipeline/pairs.py
@@ -6,6 +6,8 @@
 import pandas as pd
 from psutil import cpu_count
 
+from vast_pipeline.utils.utils import calculate_n_partitions
+
 
 logger = logging.getLogger(__name__)
 
@@ -65,6 +67,8 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame:
         m_peak, m_int - variability modulation index
     """
     n_cpu = cpu_count() - 1
+    logger.debug(f"Calculating measurement pair metrics with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(df.set_index('source'), n_cpu)
 
     """Create a DataFrame containing all measurement ID combinations per source.
     Resultant DataFrame will have a MultiIndex(["source", RangeIndex]) where "source" is
@@ -86,7 +90,7 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame:
     11128 0   6216  23534
     """
     measurement_combinations = (
-        dd.from_pandas(df, n_cpu)
+        dd.from_pandas(df, npartitions=n_partitions)
         .groupby("source")["id"]
         .apply(
             lambda x: pd.DataFrame(list(combinations(x, 2))), meta={0: "i", 1: "i"},)
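To see what the docstring above describes, here is a toy pandas-only version of the combination step (source and measurement IDs are made up); it reproduces the MultiIndex(["source", RangeIndex]) shape:

```python
from itertools import combinations

import pandas as pd

# Hypothetical measurements: two sources with a few measurement IDs each.
df = pd.DataFrame({
    'source': [1, 1, 1, 2, 2],
    'id': [10, 11, 12, 20, 21],
})

# For each source, list every unordered pair of its measurement IDs.
pairs = (
    df.groupby('source')['id']
    .apply(lambda x: pd.DataFrame(list(combinations(x, 2))))
)
print(pairs)
#            0   1
# source
# 1      0  10  11
#        1  10  12
#        2  11  12
# 2      0  20  21
```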
13 changes: 10 additions & 3 deletions vast_pipeline/pipeline/utils.py
@@ -24,7 +24,8 @@
 from vast_pipeline.image.main import FitsImage, SelavyImage
 from vast_pipeline.image.utils import open_fits
 from vast_pipeline.utils.utils import (
-    eq_to_cart, StopWatch, optimize_ints, optimize_floats
+    eq_to_cart, StopWatch, optimize_ints, optimize_floats,
+    calculate_n_partitions
 )
 from vast_pipeline.models import (
     Band, Image, Run, SkyRegion
@@ -704,7 +705,10 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame:
         'related_list': 'O'
     }
     n_cpu = cpu_count() - 1
-    out = dd.from_pandas(df, n_cpu)
+    logger.debug(f"Running parallel_groupby with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(df, n_cpu)
+
+    out = dd.from_pandas(df.set_index('source'), npartitions=n_partitions)
     out = (
         out.groupby('source')
         .apply(
@@ -763,7 +767,10 @@ def parallel_groupby_coord(df: pd.DataFrame) -> pd.DataFrame:
         'wavg_dec': 'f',
     }
     n_cpu = cpu_count() - 1
-    out = dd.from_pandas(df, n_cpu)
+    logger.debug(f"Running parallel_groupby_coord with {n_cpu} CPUs")
+    n_partitions = calculate_n_partitions(df, n_cpu)
+
+    out = dd.from_pandas(df.set_index('source'), npartitions=n_partitions)
     out = (
         out.groupby('source')
         .apply(calc_ave_coord, meta=col_dtype)
23 changes: 23 additions & 0 deletions vast_pipeline/utils/utils.py
@@ -378,3 +378,26 @@ def dict_merge(
 
 def timeStamped(fname, fmt="%Y-%m-%d-%H-%M-%S_{fname}"):
     return datetime.now().strftime(fmt).format(fname=fname)
+
+def calculate_n_partitions(df, n_cpu, partition_size_mb=100):
+    """
+    Calculate how many partitions a dataframe should be split into.
+
+    Args:
+        df: The pandas dataframe to be partitioned.
+        n_cpu: The number of available CPUs.
+        partition_size_mb: The optimal partition size in MB.
+
+    Returns:
+        The optimal number of partitions.
+    """
+    mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6
+    n_partitions = int(np.ceil(mem_usage_mb / partition_size_mb))
+
+    # n_partitions should be >= n_cpu for optimal parallel processing
+    if n_partitions < n_cpu:
+        n_partitions = n_cpu
+
+    logger.debug(f"Using {n_partitions} partitions of {partition_size_mb}MB")
+
+    return n_partitions
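A short usage sketch of the new helper, mirroring how the pipeline now calls it (the dataframe and column names here are made up):

```python
import dask.dataframe as dd
import numpy as np
import pandas as pd
from psutil import cpu_count

from vast_pipeline.utils.utils import calculate_n_partitions

# Made-up dataframe standing in for a pipeline table.
df = pd.DataFrame({
    'source': np.random.randint(0, 1000, 1_000_000),
    'flux_peak': np.random.rand(1_000_000),
})

n_cpu = cpu_count() - 1
# At least one partition per CPU, each targeting ~100MB.
n_partitions = calculate_n_partitions(df, n_cpu)

ddf = dd.from_pandas(df.set_index('source'), npartitions=n_partitions)
result = ddf.groupby('source')['flux_peak'].mean().compute()
```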
