Skip to content

Commit

Permalink
Fix unnecessary serialisation of PassManager in serial contexts (#1…
Browse files Browse the repository at this point in the history
…2410)

* Fix unnecessary serialisation of `PassManager` in serial contexts

This exposes the interal decision in `parallel_map` of whether to
actually run in serial or not.  If not, there's no need for
`PassManager` to side-car its `dill` serialisation onto the side of the
IPC (we use `dill` because we need to pickle lambdas), which can be an
unfortunately huge cost for certain IBM pulse-enabled backends.

* Remove new function from public API

This makes the patch series safe for backport to 1.1.

(cherry picked from commit b12e9ec)

# Conflicts:
#	qiskit/utils/__init__.py
#	qiskit/utils/parallel.py
  • Loading branch information
jakelishman authored and mergify[bot] committed Jun 4, 2024
1 parent 7da7bf4 commit 5befb88
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 20 deletions.
22 changes: 11 additions & 11 deletions qiskit/passmanager/passmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import dill

from qiskit.utils.parallel import parallel_map
from qiskit.utils.parallel import parallel_map, should_run_in_parallel
from .base_tasks import Task, PassManagerIR
from .exceptions import PassManagerError
from .flow_controllers import FlowControllerLinear
Expand Down Expand Up @@ -220,16 +220,16 @@ def callback_func(**kwargs):
in_programs = [in_programs]
is_list = False

if len(in_programs) == 1:
out_program = _run_workflow(
program=in_programs[0],
pass_manager=self,
callback=callback,
**kwargs,
)
if is_list:
return [out_program]
return out_program
# If we're not going to run in parallel, we want to avoid spending time `dill` serialising
# ourselves, since that can be quite expensive.
if len(in_programs) == 1 or not should_run_in_parallel(num_processes):
out = [
_run_workflow(program=program, pass_manager=self, callback=callback, **kwargs)
for program in in_programs
]
if len(in_programs) == 1 and not is_list:
return out[0]
return out

del callback
del kwargs
Expand Down
10 changes: 10 additions & 0 deletions qiskit/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
.. autofunction:: detach_prefix
.. autofunction:: wrap_method
<<<<<<< HEAD
Algorithm Utilities
===================
Expand Down Expand Up @@ -58,6 +59,10 @@
``ProcessPoolExecutor``. Tasks can be executed in parallel using this function.
It has a built-in event publisher to show the progress of the parallel
tasks.
=======
A helper function for calling a custom function with Python
:class:`~concurrent.futures.ProcessPoolExecutor`. Tasks can be executed in parallel using this function.
>>>>>>> b12e9ec3c (Fix unnecessary serialisation of `PassManager` in serial contexts (#12410))
.. autofunction:: parallel_map
Expand All @@ -84,13 +89,17 @@

from . import optionals

<<<<<<< HEAD
from .circuit_utils import summarize_circuits
from .entangler_map import get_entangler_map, validate_entangler_map
from .backend_utils import has_ibmq, has_aer
from .name_unnamed_args import name_args
from .algorithm_globals import algorithm_globals

from .parallel import parallel_map
=======
from .parallel import parallel_map, should_run_in_parallel
>>>>>>> b12e9ec3c (Fix unnecessary serialisation of `PassManager` in serial contexts (#12410))


__all__ = [
Expand All @@ -114,4 +123,5 @@
"is_main_process",
"apply_prefix",
"parallel_map",
"should_run_in_parallel",
]
36 changes: 27 additions & 9 deletions qiskit/utils/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
from the multiprocessing library.
"""

from __future__ import annotations

import os
from concurrent.futures import ProcessPoolExecutor
import sys
Expand Down Expand Up @@ -103,6 +105,21 @@ def _task_wrapper(param):
return task(value, *task_args, **task_kwargs)


def should_run_in_parallel(num_processes: int | None = None) -> bool:
"""Return whether the current parallelisation configuration suggests that we should run things
like :func:`parallel_map` in parallel (``True``) or degrade to serial (``False``).
Args:
num_processes: the number of processes requested for use (if given).
"""
num_processes = CPU_COUNT if num_processes is None else num_processes
return (
num_processes > 1
and os.getenv("QISKIT_IN_PARALLEL", "FALSE") == "FALSE"
and CONFIG.get("parallel_enabled", PARALLEL_DEFAULT)
)


def parallel_map( # pylint: disable=dangerous-default-value
task, values, task_args=(), task_kwargs={}, num_processes=CPU_COUNT
):
Expand All @@ -112,21 +129,20 @@ def parallel_map( # pylint: disable=dangerous-default-value
result = [task(value, *task_args, **task_kwargs) for value in values]
On Windows this function defaults to a serial implementation to avoid the
overhead from spawning processes in Windows.
This will parallelise the results if the number of ``values`` is greater than one, and the
current system configuration permits parallelization.
Args:
task (func): Function that is to be called for each value in ``values``.
values (array_like): List or array of values for which the ``task``
function is to be evaluated.
values (array_like): List or array of values for which the ``task`` function is to be
evaluated.
task_args (list): Optional additional arguments to the ``task`` function.
task_kwargs (dict): Optional additional keyword argument to the ``task`` function.
num_processes (int): Number of processes to spawn.
Returns:
result: The result list contains the value of
``task(value, *task_args, **task_kwargs)`` for
each value in ``values``.
result: The result list contains the value of ``task(value, *task_args, **task_kwargs)`` for
each value in ``values``.
Raises:
QiskitError: If user interrupts via keyboard.
Expand All @@ -147,6 +163,7 @@ def func(_):
if len(values) == 1:
return [task(values[0], *task_args, **task_kwargs)]

<<<<<<< HEAD
Publisher().publish("terra.parallel.start", len(values))
nfinished = [0]

Expand All @@ -160,6 +177,9 @@ def _callback(_):
and os.getenv("QISKIT_IN_PARALLEL") == "FALSE"
and CONFIG.get("parallel_enabled", PARALLEL_DEFAULT)
):
=======
if should_run_in_parallel(num_processes):
>>>>>>> b12e9ec3c (Fix unnecessary serialisation of `PassManager` in serial contexts (#12410))
os.environ["QISKIT_IN_PARALLEL"] = "TRUE"
try:
results = []
Expand All @@ -183,8 +203,6 @@ def _callback(_):
os.environ["QISKIT_IN_PARALLEL"] = "FALSE"
return results

# Cannot do parallel on Windows , if another parallel_map is running in parallel,
# or len(values) == 1.
results = []
for _, value in enumerate(values):
result = task(value, *task_args, **task_kwargs)
Expand Down
5 changes: 5 additions & 0 deletions releasenotes/notes/parallel-check-8186a8f074774a1f.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
:meth:`.PassManager.run` will no longer waste time serializing itself when given multiple inputs
if it is only going to work in serial.

0 comments on commit 5befb88

Please sign in to comment.