diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 239622f9b..7ae9c5f20 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -23,7 +23,7 @@ jobs: - name: Install MQT Predictor run: pip install .[coverage] - name: Generate Report - run: pytest -v --cov --cov-config=pyproject.toml --cov-report=xml --ignore=tests/compilation/test_pretrained_models.py + run: pytest -v --cov --cov-config=pyproject.toml --cov-report=xml --ignore=tests/test_pretrained_models.py env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Upload coverage to Codecov diff --git a/pyproject.toml b/pyproject.toml index 6e9b197d5..4e95e7ee0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,11 +20,12 @@ dynamic = ["version"] dependencies = [ "mqt.bench @ git+https://github.com/cda-tum/mqt-bench.git", - "sb3_contrib>=2.0.0", - "scikit-learn>=1.3.0, <1.3.3", + "sb3_contrib>=2.0.0, <2.2.2", + "scikit-learn>=1.4.0,<1.4.2", "importlib_metadata>=4.4; python_version < '3.10'", "importlib_resources>=5.0; python_version < '3.10'", - "tensorboard>=2.11.0" + "tensorboard>=2.11.0, <2.16.3", + "bqskit>=1.1.0, <1.1.2", ] classifiers = [ @@ -99,7 +100,7 @@ implicit_reexport = true # recent versions of `gym` are typed, but stable-baselines3 pins a very old version of gym. # qiskit is not yet marked as typed, but is typed mostly. # the other libraries do not have type stubs. -module = ["qiskit.*", "joblib.*", "sklearn.*", "matplotlib.*", "gymnasium.*", "mqt.bench.*", "sb3_contrib.*"] +module = ["qiskit.*", "joblib.*", "sklearn.*", "matplotlib.*", "gymnasium.*", "mqt.bench.*", "sb3_contrib.*", "bqskit.*"] ignore_missing_imports = true [tool.ruff] diff --git a/src/mqt/predictor/ml/Predictor.py b/src/mqt/predictor/ml/Predictor.py index 02172391c..7c894c222 100644 --- a/src/mqt/predictor/ml/Predictor.py +++ b/src/mqt/predictor/ml/Predictor.py @@ -514,7 +514,7 @@ def generate_eval_all_datapoints( tmp_res = scores_filtered_sorted_accordingly[i] max_score = max(tmp_res) for j in range(len(tmp_res)): - plt.plot(i, tmp_res[j] / max_score, "b.", alpha=1.0, markersize=1.7, color=color_all) + plt.plot(i, tmp_res[j] / max_score, alpha=1.0, markersize=1.7, color=color_all) plt.plot( i, diff --git a/src/mqt/predictor/rl/Predictor.py b/src/mqt/predictor/rl/Predictor.py index ddc7e7c1a..43c23f7e4 100644 --- a/src/mqt/predictor/rl/Predictor.py +++ b/src/mqt/predictor/rl/Predictor.py @@ -23,7 +23,7 @@ def __init__(self, figure_of_merit: reward.figure_of_merit, device_name: str, lo logger.setLevel(logger_level) self.model = None - self.env = rl.PredictorEnv(figure_of_merit, device_name) + self.env = rl.PredictorEnv(reward_function=figure_of_merit, device_name=device_name) self.device_name = device_name self.figure_of_merit = figure_of_merit @@ -50,7 +50,7 @@ def compile_as_predicted( raise RuntimeError(msg) from e assert self.model - obs, _ = self.env.reset(qc) # type: ignore[unreachable] + obs, _ = self.env.reset(qc, seed=0) # type: ignore[unreachable] used_compilation_passes = [] terminated = False @@ -62,7 +62,6 @@ def compile_as_predicted( action_item = self.env.action_set[action] used_compilation_passes.append(action_item["name"]) obs, reward_val, terminated, truncated, info = self.env.step(action) - self.env.state._layout = self.env.layout # noqa: SLF001 if not self.env.error_occured: return self.env.state, used_compilation_passes @@ -94,11 +93,9 @@ def train_model( progress_bar = True logger.debug("Start training for: " + self.figure_of_merit + " on " + self.device_name) - env = rl.PredictorEnv(reward_function=self.figure_of_merit, device_name=self.device_name) - model = MaskablePPO( MaskableMultiInputActorCriticPolicy, - env, + self.env, verbose=verbose, tensorboard_log="./" + model_name + "_" + self.figure_of_merit + "_" + self.device_name, gamma=0.98, diff --git a/src/mqt/predictor/rl/PredictorEnv.py b/src/mqt/predictor/rl/PredictorEnv.py index db28979f6..74359a779 100644 --- a/src/mqt/predictor/rl/PredictorEnv.py +++ b/src/mqt/predictor/rl/PredictorEnv.py @@ -7,13 +7,14 @@ from pathlib import Path import numpy as np +from bqskit.ext import bqskit_to_qiskit, qiskit_to_bqskit from gymnasium import Env from gymnasium.spaces import Box, Dict, Discrete +from pytket.circuit import Qubit from pytket.extensions.qiskit import qiskit_to_tk, tk_to_qiskit from qiskit import QuantumCircuit -from qiskit.transpiler import CouplingMap, PassManager +from qiskit.transpiler import CouplingMap, PassManager, TranspileLayout from qiskit.transpiler.passes import CheckMap, GatesInBasis -from qiskit.transpiler.runningpassmanager import TranspileLayout from mqt.bench.devices import get_device_by_name from mqt.predictor import reward, rl @@ -35,6 +36,7 @@ def __init__( self.actions_routing_indices = [] self.actions_mapping_indices = [] self.actions_opt_indices = [] + self.actions_final_optimization_indices = [] self.used_actions: list[str] = [] self.device = get_device_by_name(device_name) @@ -60,6 +62,10 @@ def __init__( self.action_set[index] = elem self.actions_mapping_indices.append(index) index += 1 + for elem in rl.helper.get_actions_final_optimization(): + self.action_set[index] = elem + self.actions_final_optimization_indices.append(index) + index += 1 self.action_set[index] = rl.helper.get_action_terminate() self.action_terminate_index = index @@ -67,7 +73,10 @@ def __init__( self.reward_function = reward_function self.action_space = Discrete(len(self.action_set.keys())) self.num_steps = 0 - self.layout = None + self.layout: TranspileLayout | None = None + self.num_qubits_uncompiled_circuit = 0 + + self.has_parametrized_gates = False spaces = { "num_qubits": Discrete(128), @@ -113,6 +122,7 @@ def step(self, action: int) -> tuple[dict[str, Any], float, bool, bool, dict[Any if self.state.count_ops().get("unitary"): self.state = self.state.decompose(gates_to_decompose="unitary") + self.state._layout = self.layout # noqa: SLF001 obs = rl.helper.create_feature_dict(self.state) return obs, reward_val, done, False, {} @@ -160,11 +170,34 @@ def reset( self.valid_actions = self.actions_opt_indices + self.actions_synthesis_indices self.error_occured = False + + self.num_qubits_uncompiled_circuit = self.state.num_qubits + self.has_parametrized_gates = len(self.state.parameters) > 0 return rl.helper.create_feature_dict(self.state), {} def action_masks(self) -> list[bool]: """Returns a list of valid actions for the current state.""" - return [action in self.valid_actions for action in self.action_set] + action_mask = [action in self.valid_actions for action in self.action_set] + + # it is not clear how tket will handle the layout, so we remove all actions that are from "origin"=="tket" if a layout is set + if self.layout is not None: + action_mask = [ + action_mask[i] and self.action_set[i].get("origin") != "tket" for i in range(len(action_mask)) + ] + + if self.has_parametrized_gates or self.layout is not None: + # remove all actions that are from "origin"=="bqskit" because they are not supported for parametrized gates + # or after layout since using BQSKit after a layout is set may result in an error + action_mask = [ + action_mask[i] and self.action_set[i].get("origin") != "bqskit" for i in range(len(action_mask)) + ] + + # only allow VF2PostLayout if "ibm" is in the device name + if "ibm" not in self.device.name: + action_mask = [ + action_mask[i] and self.action_set[i].get("name") != "VF2PostLayout" for i in range(len(action_mask)) + ] + return action_mask def apply_action(self, action_index: int) -> QuantumCircuit | None: """Applies the given action to the current state and returns the altered state.""" @@ -172,21 +205,20 @@ def apply_action(self, action_index: int) -> QuantumCircuit | None: action = self.action_set[action_index] if action["name"] == "terminate": return self.state - if ( - action_index - in self.actions_layout_indices + self.actions_routing_indices + self.actions_mapping_indices - ): - transpile_pass = action["transpile_pass"](self.device.coupling_map) - elif action_index in self.actions_synthesis_indices: - transpile_pass = action["transpile_pass"](self.device.basis_gates) - else: + if action_index in self.actions_opt_indices: transpile_pass = action["transpile_pass"] + else: + transpile_pass = action["transpile_pass"](self.device) + if action["origin"] == "qiskit": try: if action["name"] == "QiskitO3": pm = PassManager() pm.append( - action["transpile_pass"](self.device.basis_gates, CouplingMap(self.device.coupling_map)), + action["transpile_pass"]( + self.device.basis_gates, + CouplingMap(self.device.coupling_map) if self.layout is not None else None, + ), do_while=action["do_while"], ) else: @@ -201,20 +233,55 @@ def apply_action(self, action_index: int) -> QuantumCircuit | None: self.error_occured = True return None - if action_index in self.actions_layout_indices + self.actions_mapping_indices: - assert pm.property_set["layout"] - self.layout = TranspileLayout( - initial_layout=pm.property_set["layout"], - input_qubit_mapping=pm.property_set["original_qubit_indices"], - final_layout=pm.property_set["final_layout"], - ) + if ( + action_index + in self.actions_layout_indices + + self.actions_mapping_indices + + self.actions_final_optimization_indices + ): + if action["name"] == "VF2Layout": + if pm.property_set["layout"]: + altered_qc, pm = rl.helper.postprocess_VF2Layout( + altered_qc, + pm.property_set["layout"], + pm.property_set["original_qubit_indices"], + pm.property_set["final_layout"], + self.device, + ) + elif action["name"] == "VF2PostLayout": + assert pm.property_set["VF2PostLayout_stop_reason"] is not None + post_layout = pm.property_set["post_layout"] + if post_layout: + altered_qc, pm = rl.helper.postprocess_VF2PostLayout(altered_qc, post_layout, self.layout) + else: + assert pm.property_set["layout"] + + if pm.property_set["layout"]: + self.layout = TranspileLayout( + initial_layout=pm.property_set["layout"], + input_qubit_mapping=pm.property_set["original_qubit_indices"], + final_layout=pm.property_set["final_layout"], + _output_qubit_list=altered_qc.qubits, + _input_qubit_count=self.num_qubits_uncompiled_circuit, + ) + + elif action_index in self.actions_routing_indices: + assert self.layout is not None + self.layout.final_layout = pm.property_set["final_layout"] elif action["origin"] == "tket": try: - tket_qc = qiskit_to_tk(self.state) + tket_qc = qiskit_to_tk(self.state, preserve_param_uuid=True) for elem in transpile_pass: elem.apply(tket_qc) + qbs = tket_qc.qubits + qubit_map = {qbs[i]: Qubit("q", i) for i in range(len(qbs))} + tket_qc.rename_units(qubit_map) # type: ignore[arg-type] altered_qc = tk_to_qiskit(tket_qc) + if action_index in self.actions_routing_indices: + assert self.layout is not None + self.layout.final_layout = rl.helper.final_layout_pytket_to_qiskit(tket_qc, altered_qc) + except Exception: logger.exception( "Error in executing TKET transpile pass for {action} at step {i} for {filename}".format( @@ -224,6 +291,28 @@ def apply_action(self, action_index: int) -> QuantumCircuit | None: self.error_occured = True return None + elif action["origin"] == "bqskit": + try: + bqskit_qc = qiskit_to_bqskit(self.state) + if action_index in self.actions_opt_indices + self.actions_synthesis_indices: + bqskit_compiled_qc = transpile_pass(bqskit_qc) + altered_qc = bqskit_to_qiskit(bqskit_compiled_qc) + elif action_index in self.actions_mapping_indices: + bqskit_compiled_qc, initial_layout, final_layout = transpile_pass(bqskit_qc) + altered_qc = bqskit_to_qiskit(bqskit_compiled_qc) + layout = rl.helper.final_layout_bqskit_to_qiskit( + initial_layout, final_layout, altered_qc, self.state + ) + self.layout = layout + except Exception: + logger.exception( + "Error in executing BQSKit transpile pass for {action} at step {i} for {filename}".format( + action=action["name"], i=self.num_steps, filename=self.filename + ) + ) + self.error_occured = True + return None + else: error_msg = f"Origin {action['origin']} not supported." raise ValueError(error_msg) @@ -241,16 +330,19 @@ def determine_valid_actions_for_state(self) -> list[int]: only_nat_gates = check_nat_gates.property_set["all_gates_in_basis"] if not only_nat_gates: - return self.actions_synthesis_indices + self.actions_opt_indices + actions = self.actions_synthesis_indices + self.actions_opt_indices + if self.layout is not None: + actions += self.actions_routing_indices + return actions check_mapping = CheckMap(coupling_map=CouplingMap(self.device.coupling_map)) check_mapping(self.state) mapped = check_mapping.property_set["is_swap_mapped"] if mapped and self.layout is not None: - return [self.action_terminate_index, *self.actions_opt_indices] # type: ignore[unreachable] + return [self.action_terminate_index, *self.actions_opt_indices] - if self.state._layout is not None: # noqa: SLF001 + if self.layout is not None: return self.actions_routing_indices # No layout applied yet diff --git a/src/mqt/predictor/rl/helper.py b/src/mqt/predictor/rl/helper.py index 151205400..9c25ecbdd 100644 --- a/src/mqt/predictor/rl/helper.py +++ b/src/mqt/predictor/rl/helper.py @@ -8,6 +8,7 @@ import numpy as np import requests +from bqskit import MachineModel from packaging import version from pytket.architecture import Architecture from pytket.circuit import Circuit, Node, Qubit @@ -22,7 +23,7 @@ from qiskit import QuantumCircuit from qiskit.circuit.equivalence_library import StandardEquivalenceLibrary from qiskit.circuit.library import XGate, ZGate -from qiskit.transpiler import CouplingMap +from qiskit.transpiler import CouplingMap, Layout, PassManager, TranspileLayout from qiskit.transpiler.passes import ( ApplyLayout, BasicSwap, @@ -44,14 +45,13 @@ OptimizeCliffords, RemoveDiagonalGatesBeforeMeasure, SabreLayout, - SabreSwap, Size, StochasticSwap, TrivialLayout, UnitarySynthesis, + VF2Layout, + VF2PostLayout, ) -from qiskit.transpiler.preset_passmanagers import common -from qiskit.transpiler.runningpassmanager import ConditionalController from sb3_contrib import MaskablePPO from tqdm import tqdm @@ -59,28 +59,29 @@ from mqt.predictor import reward, rl if TYPE_CHECKING: + from collections.abc import Callable + from numpy.typing import NDArray + from mqt.bench.devices import Device + + if TYPE_CHECKING or sys.version_info >= (3, 10, 0): from importlib import metadata, resources else: import importlib_metadata as metadata import importlib_resources as resources +from bqskit import compile as bqskit_compile +from bqskit.ir import gates +from qiskit import QuantumRegister +from qiskit.providers.fake_provider import FakeGuadalupe, FakeMontreal, FakeQuito, FakeWashington +from qiskit.transpiler.preset_passmanagers import common +from qiskit.transpiler.runningpassmanager import ConditionalController logger = logging.getLogger("mqt-predictor") -NUM_ACTIONS_OPT = 13 -NUM_ACTIONS_LAYOUT = 3 -NUM_ACTIONS_ROUTING = 4 -NUM_ACTIONS_SYNTHESIS = 1 -NUM_ACTIONS_TERMINATE = 1 -NUM_ACTIONS_DEVICES = 7 -NUM_ACTIONS_MAPPING = 1 -NUM_FEATURE_VECTOR_ELEMENTS = 7 - - def qcompile( qc: QuantumCircuit | str, figure_of_merit: reward.figure_of_merit | None = "expected_fidelity", @@ -178,18 +179,18 @@ def get_actions_opt() -> list[dict[str, Any]]: }, { "name": "QiskitO3", - "transpile_pass": lambda bgates, cmap: [ + "transpile_pass": lambda native_gate, coupling_map: [ Collect2qBlocks(), - ConsolidateBlocks(basis_gates=bgates), - UnitarySynthesis(basis_gates=bgates, coupling_map=cmap), - Optimize1qGatesDecomposition(basis=bgates), - CommutativeCancellation(basis_gates=bgates), - GatesInBasis(bgates), + ConsolidateBlocks(basis_gates=native_gate), + UnitarySynthesis(basis_gates=native_gate, coupling_map=coupling_map), + Optimize1qGatesDecomposition(basis=native_gate), + CommutativeCancellation(basis_gates=native_gate), + GatesInBasis(native_gate), ConditionalController( [ pass_ for x in common.generate_translation_passmanager( - target=None, basis_gates=bgates, coupling_map=cmap + target=None, basis_gates=native_gate, coupling_map=coupling_map ).passes() for pass_ in x["passes"] ], @@ -204,6 +205,25 @@ def get_actions_opt() -> list[dict[str, Any]]: "origin": "qiskit", "do_while": lambda property_set: (not property_set["optimization_loop_minimum_point"]), }, + { + "name": "BQSKitO2", + "transpile_pass": lambda circuit: bqskit_compile(circuit, optimization_level=2), + "origin": "bqskit", + }, + ] + + +def get_actions_final_optimization() -> list[dict[str, Any]]: + """Returns a list of dictionaries containing information about the optimization passes that are available.""" + return [ + { + "name": "VF2PostLayout", + "transpile_pass": lambda device: VF2PostLayout( + coupling_map=CouplingMap(device.coupling_map), + properties=get_ibm_backend_properties_by_device_name(device.name), + ), + "origin": "qiskit", + } ] @@ -212,9 +232,9 @@ def get_actions_layout() -> list[dict[str, Any]]: return [ { "name": "TrivialLayout", - "transpile_pass": lambda c: [ - TrivialLayout(coupling_map=CouplingMap(c)), - FullAncillaAllocation(coupling_map=CouplingMap(c)), + "transpile_pass": lambda device: [ + TrivialLayout(coupling_map=CouplingMap(device.coupling_map)), + FullAncillaAllocation(coupling_map=CouplingMap(device.coupling_map)), EnlargeWithAncilla(), ApplyLayout(), ], @@ -222,21 +242,21 @@ def get_actions_layout() -> list[dict[str, Any]]: }, { "name": "DenseLayout", - "transpile_pass": lambda c: [ - DenseLayout(coupling_map=CouplingMap(c)), - FullAncillaAllocation(coupling_map=CouplingMap(c)), + "transpile_pass": lambda device: [ + DenseLayout(coupling_map=CouplingMap(device.coupling_map)), + FullAncillaAllocation(coupling_map=CouplingMap(device.coupling_map)), EnlargeWithAncilla(), ApplyLayout(), ], "origin": "qiskit", }, { - "name": "SabreLayout", - "transpile_pass": lambda c: [ - SabreLayout(coupling_map=CouplingMap(c), skip_routing=True), - FullAncillaAllocation(coupling_map=CouplingMap(c)), - EnlargeWithAncilla(), - ApplyLayout(), + "name": "VF2Layout", + "transpile_pass": lambda device: [ + VF2Layout( + coupling_map=CouplingMap(device.coupling_map), + properties=get_ibm_backend_properties_by_device_name(device.name), + ), ], "origin": "qiskit", }, @@ -248,25 +268,20 @@ def get_actions_routing() -> list[dict[str, Any]]: return [ { "name": "BasicSwap", - "transpile_pass": lambda c: [BasicSwap(coupling_map=CouplingMap(c))], + "transpile_pass": lambda device: [BasicSwap(coupling_map=CouplingMap(device.coupling_map))], "origin": "qiskit", }, { "name": "RoutingPass", - "transpile_pass": lambda c: [ + "transpile_pass": lambda device: [ PreProcessTKETRoutingAfterQiskitLayout(), - RoutingPass(Architecture(c)), + RoutingPass(Architecture(device.coupling_map)), ], "origin": "tket", }, { "name": "StochasticSwap", - "transpile_pass": lambda c: [StochasticSwap(coupling_map=CouplingMap(c))], - "origin": "qiskit", - }, - { - "name": "SabreSwap", - "transpile_pass": lambda c: [SabreSwap(coupling_map=CouplingMap(c))], + "transpile_pass": lambda device: [StochasticSwap(coupling_map=CouplingMap(device.coupling_map))], "origin": "qiskit", }, ] @@ -277,11 +292,25 @@ def get_actions_mapping() -> list[dict[str, Any]]: return [ { "name": "SabreMapping", - "transpile_pass": lambda c: [ - SabreLayout(coupling_map=CouplingMap(c), skip_routing=False), + "transpile_pass": lambda device: [ + SabreLayout(coupling_map=CouplingMap(device.coupling_map), skip_routing=False), ], "origin": "qiskit", }, + { + "name": "BQSKitMapping", + "transpile_pass": lambda device: lambda bqskit_circuit: bqskit_compile( + bqskit_circuit, + model=MachineModel( + num_qudits=device.num_qubits, + gate_set=get_BQSKit_native_gates(device), + coupling_graph=[(elem[0], elem[1]) for elem in device.coupling_map], + ), + with_mapping=True, + optimization_level=2, + ), + "origin": "bqskit", + }, ] @@ -290,9 +319,20 @@ def get_actions_synthesis() -> list[dict[str, Any]]: return [ { "name": "BasisTranslator", - "transpile_pass": lambda g: [BasisTranslator(StandardEquivalenceLibrary, target_basis=g)], + "transpile_pass": lambda device: [ + BasisTranslator(StandardEquivalenceLibrary, target_basis=device.basis_gates) + ], "origin": "qiskit", }, + { + "name": "BQSKitSynthesis", + "transpile_pass": lambda device: lambda bqskit_circuit: bqskit_compile( + bqskit_circuit, + model=MachineModel(bqskit_circuit.num_qudits, gate_set=get_BQSKit_native_gates(device)), + optimization_level=2, + ), + "origin": "bqskit", + }, ] @@ -476,13 +516,207 @@ def handle_downloading_model(download_url: str, model_name: str) -> None: class PreProcessTKETRoutingAfterQiskitLayout: """ - Pre-processing step to route a circuit with tket after a Qiskit Layout pass has been applied. - The reason why we can apply the trivial layout here is that the circuit is already mapped by qiskit to the - device qubits and its qubits are sorted by their ascending physical qubit indices. - The trivial layout indices that this layout of the physical qubits is the identity mapping. + Pre-processing step to route a circuit with TKET after a Qiskit Layout pass has been applied. + The reason why we can apply the trivial layout here is that the circuit already got assigned a layout by qiskit. + Implicitly, Qiskit is reordering its qubits in a sequential manner, i.e., the qubit with the lowest *physical* qubit + first. + + Assuming, the layouted circuit is given by + + ┌───┐ ░ ┌─┐ + q_2 -> 0 ┤ H ├──■────────░───────┤M├ + └───┘┌─┴─┐ ░ ┌─┐└╥┘ + q_1 -> 1 ─────┤ X ├──■───░────┤M├─╫─ + └───┘┌─┴─┐ ░ ┌─┐└╥┘ ║ + q_0 -> 2 ──────────┤ X ├─░─┤M├─╫──╫─ + └───┘ ░ └╥┘ ║ ║ + ancilla_0 -> 3 ───────────────────╫──╫──╫─ + ║ ║ ║ + ancilla_1 -> 4 ───────────────────╫──╫──╫─ + ║ ║ ║ + meas: 3/═══════════════════╩══╩══╩═ + 0 1 2 + + Applying the trivial layout, we get the same qubit order as in the original circuit and can be respectively + routed. This results int: + ┌───┐ ░ ┌─┐ + q_0: ┤ H ├──■────────░───────┤M├ + └───┘┌─┴─┐ ░ ┌─┐└╥┘ + q_1: ─────┤ X ├──■───░────┤M├─╫─ + └───┘┌─┴─┐ ░ ┌─┐└╥┘ ║ + q_2: ──────────┤ X ├─░─┤M├─╫──╫─ + └───┘ ░ └╥┘ ║ ║ + q_3: ───────────────────╫──╫──╫─ + ║ ║ ║ + q_4: ───────────────────╫──╫──╫─ + ║ ║ ║ + meas: 3/═══════════════════╩══╩══╩═ + 0 1 2 + + + If we would not apply the trivial layout, no layout would be considered resulting, e.g., in the followiong circuit: + ┌───┐ ░ ┌─┐ + q_0: ─────┤ X ├─────■───░────┤M├─── + ┌───┐└─┬─┘ ┌─┴─┐ ░ ┌─┐└╥┘ + q_1: ┤ H ├──■───X─┤ X ├─░─┤M├─╫──── + └───┘ │ └───┘ ░ └╥┘ ║ ┌─┐ + q_2: ───────────X───────░──╫──╫─┤M├ + ░ ║ ║ └╥┘ + q_3: ──────────────────────╫──╫──╫─ + ║ ║ ║ + q_4: ──────────────────────╫──╫──╫─ + ║ ║ ║ + meas: 3/══════════════════════╩══╩══╩═ + 0 1 2 + """ def apply(self, circuit: Circuit) -> None: """Applies the pre-processing step to route a circuit with tket after a Qiskit Layout pass has been applied.""" mapping = {Qubit(i): Node(i) for i in range(circuit.n_qubits)} place_with_map(circuit=circuit, qmap=mapping) + + +def get_BQSKit_native_gates(device: Device) -> list[gates.Gate] | None: + """Returns the native gates of the given device. + + Args: + device: The device for which the native gates are returned. + + Returns: + list[gates.Gate]: The native gates of the given provider. + """ + provider = device.name.split("_")[0] + + native_gatesets = { + "ibm": [gates.RZGate(), gates.SXGate(), gates.XGate(), gates.CNOTGate()], + "rigetti": [gates.RXGate(), gates.RZGate(), gates.CZGate()], + "ionq": [gates.RXXGate(), gates.RZGate(), gates.RYGate(), gates.RXGate()], + "quantinuum": [gates.RZZGate(), gates.RZGate(), gates.RYGate(), gates.RXGate()], + } + + if provider not in native_gatesets: + logger.warning("No native gateset for provider " + provider + " found. No native gateset is used.") + return None + + return native_gatesets[provider] + + +def final_layout_pytket_to_qiskit(pytket_circuit: Circuit, qiskit_ciruit: QuantumCircuit) -> Layout: + pytket_layout = pytket_circuit.qubit_readout + size_circuit = pytket_circuit.n_qubits + qiskit_layout = {} + qiskit_qreg = qiskit_ciruit.qregs[0] + + pytket_layout = dict(sorted(pytket_layout.items(), key=lambda item: item[1])) + + for node, qubit_index in pytket_layout.items(): + qiskit_layout[node.index[0]] = qiskit_qreg[qubit_index] + + for i in range(size_circuit): + if i not in set(pytket_layout.values()): + qiskit_layout[i] = qiskit_qreg[i] + + return Layout(input_dict=qiskit_layout) + + +def final_layout_bqskit_to_qiskit( + bqskit_initial_layout: list[int], + bqskit_final_layout: list[int], + compiled_qc: QuantumCircuit, + initial_qc: QuantumCircuit, +) -> TranspileLayout: + # BQSKit provides an initial layout as a list[int] where each virtual qubit is mapped to a physical qubit + # similarly, it provides a final layout as a list[int] representing where each virtual qubit is mapped to at the end + # of the circuit + + ancilla = QuantumRegister(compiled_qc.num_qubits - initial_qc.num_qubits, "ancilla") + qiskit_initial_layout = {} + for i in range(compiled_qc.num_qubits): + if i in bqskit_initial_layout: + qiskit_initial_layout[i] = initial_qc.qubits[bqskit_initial_layout.index(i)] + else: + qiskit_initial_layout[i] = ancilla[i - initial_qc.num_qubits] + + initial_qubit_mapping = {bit: index for index, bit in enumerate(compiled_qc.qubits)} + + if bqskit_initial_layout == bqskit_final_layout: + qiskit_final_layout = None + else: + qiskit_final_layout = {} + for i in range(compiled_qc.num_qubits): + if i in bqskit_final_layout: + qiskit_final_layout[i] = compiled_qc.qubits[bqskit_initial_layout[bqskit_final_layout.index(i)]] + else: + qiskit_final_layout[i] = compiled_qc.qubits[i] + + return TranspileLayout( + initial_layout=Layout(input_dict=qiskit_initial_layout), + input_qubit_mapping=initial_qubit_mapping, + final_layout=Layout(input_dict=qiskit_final_layout) if qiskit_final_layout else None, + _output_qubit_list=compiled_qc.qubits, + _input_qubit_count=initial_qc.num_qubits, + ) + + +def get_ibm_backend_properties_by_device_name(device_name: str) -> Any: + """Returns the IBM backend name for the given device name. + + Args: + device_name (str): The name of the device for which the IBM backend name is returned. + + Returns: + str: The IBM backend name for the given device name. + """ + if "ibm" not in device_name: + return None + if device_name == "ibm_washington": + return FakeWashington().properties() + if device_name == "ibm_montreal": + return FakeMontreal().properties() + if device_name == "ibm_guadalupe": + return FakeGuadalupe().properties() + if device_name == "ibm_quito": + return FakeQuito().properties() + return None + + +def get_layout_postprocessing_qiskit_pass() -> ( + Callable[[Device], list[FullAncillaAllocation | EnlargeWithAncilla | ApplyLayout]] +): + return lambda device: [ + FullAncillaAllocation(coupling_map=CouplingMap(device.coupling_map)), + EnlargeWithAncilla(), + ApplyLayout(), + ] + + +def postprocess_VF2Layout( + qc: QuantumCircuit, + initial_layout: Layout, + original_qubit_indices: dict[QuantumRegister, int], + final_layout: Layout, + device: Device, +) -> tuple[QuantumCircuit, PassManager]: + """Postprocesses the given quantum circuit with the given layout and returns the altered quantum circuit and the respective PassManager.""" + postprocessing_action = rl.helper.get_layout_postprocessing_qiskit_pass()(device) + pm = PassManager(postprocessing_action) + pm.property_set["layout"] = initial_layout + pm.property_set["original_qubit_indices"] = original_qubit_indices + pm.property_set["final_layout"] = final_layout + altered_qc = pm.run(qc) + return altered_qc, pm + + +def postprocess_VF2PostLayout( + qc: QuantumCircuit, post_layout: Layout, layout_before: TranspileLayout +) -> tuple[QuantumCircuit, PassManager]: + """Postprocesses the given quantum circuit with the post_layout and returns the altered quantum circuit and the respective PassManager.""" + pm = PassManager(ApplyLayout()) + assert layout_before is not None + pm.property_set["layout"] = layout_before.initial_layout + pm.property_set["original_qubit_indices"] = layout_before.input_qubit_mapping + pm.property_set["final_layout"] = layout_before.final_layout + pm.property_set["post_layout"] = post_layout + altered_qc = pm.run(qc) + return altered_qc, pm diff --git a/tests/compilation/test_helper_rl.py b/tests/compilation/test_helper_rl.py index 0ac160789..03bbabcd3 100644 --- a/tests/compilation/test_helper_rl.py +++ b/tests/compilation/test_helper_rl.py @@ -2,39 +2,21 @@ from pathlib import Path +import numpy as np +from qiskit import transpile +from qiskit.transpiler import PassManager +from qiskit.transpiler.passes.layout.vf2_post_layout import VF2PostLayoutStopReason + from mqt.bench import get_benchmark +from mqt.bench.devices import get_device_by_name from mqt.predictor import rl -def test_get_actions_opt() -> None: - assert len(rl.helper.get_actions_opt()) == rl.helper.NUM_ACTIONS_OPT - - -def test_get_actions_layout() -> None: - assert len(rl.helper.get_actions_layout()) == rl.helper.NUM_ACTIONS_LAYOUT - - -def test_et_actions_routing() -> None: - assert len(rl.helper.get_actions_routing()) == rl.helper.NUM_ACTIONS_ROUTING - - -def test_get_actions_synthesis() -> None: - assert len(rl.helper.get_actions_synthesis()) == rl.helper.NUM_ACTIONS_SYNTHESIS - - -def test_get_action_terminate() -> None: - assert len(rl.helper.get_action_terminate()) == rl.helper.NUM_ACTIONS_TERMINATE - - -def test_get_actions_mapping() -> None: - assert len(rl.helper.get_actions_mapping()) == rl.helper.NUM_ACTIONS_MAPPING - - def test_create_feature_dict() -> None: qc = get_benchmark("dj", 1, 5) features = rl.helper.create_feature_dict(qc) - assert features - assert len(features) == rl.helper.NUM_FEATURE_VECTOR_ELEMENTS + for feature in features.values(): + assert isinstance(feature, np.ndarray | int) def test_get_path_trained_model() -> None: @@ -47,3 +29,54 @@ def test_get_path_training_circuits() -> None: path = rl.helper.get_path_training_circuits() assert path.exists() assert isinstance(path, Path) + + +def test_VF2_layout_and_postlayout() -> None: + qc = get_benchmark("ghz", 1, 3) + + for dev in [get_device_by_name("ibm_montreal"), get_device_by_name("ionq_harmony")]: + layout_pass = None + for layout_action in rl.helper.get_actions_layout(): + if layout_action["name"] == "VF2Layout": + layout_pass = layout_action["transpile_pass"](dev) + break + pm = PassManager(layout_pass) + altered_qc = pm.run(qc) + assert len(altered_qc.layout.initial_layout) == 3 + assert pm.property_set["VF2Layout_stop_reason"] is not None + layouted_qc, pm = rl.helper.postprocess_VF2Layout( + altered_qc, + pm.property_set["layout"], + pm.property_set["original_qubit_indices"], + pm.property_set["final_layout"], + dev, + ) + assert len(layouted_qc.layout.initial_layout) == dev.num_qubits + + dev_success = get_device_by_name("ibm_montreal") + qc_transpiled = transpile( + qc, basis_gates=dev_success.basis_gates, coupling_map=dev_success.coupling_map, optimization_level=0 + ) + assert qc_transpiled.layout is not None + + initial_layout_before = qc_transpiled.layout.initial_layout + + post_layout_pass = None + for layout_action in rl.helper.get_actions_final_optimization(): + if layout_action["name"] == "VF2PostLayout": + post_layout_pass = layout_action["transpile_pass"](dev_success) + break + + pm = PassManager(post_layout_pass) + altered_qc = pm.run(qc_transpiled) + + assert pm.property_set["VF2PostLayout_stop_reason"] is not None + assert pm.property_set["VF2PostLayout_stop_reason"] == VF2PostLayoutStopReason.SOLUTION_FOUND + + assert len(layouted_qc.layout.initial_layout) == dev.num_qubits + layouted_qc, pm = rl.helper.postprocess_VF2PostLayout( + altered_qc, pm.property_set["post_layout"], qc_transpiled.layout + ) + initial_layout_after = layouted_qc.layout.initial_layout + + assert initial_layout_before != initial_layout_after diff --git a/tests/compilation/test_integration_further_SDKs.py b/tests/compilation/test_integration_further_SDKs.py new file mode 100644 index 000000000..1b8cf7b37 --- /dev/null +++ b/tests/compilation/test_integration_further_SDKs.py @@ -0,0 +1,201 @@ +from __future__ import annotations + +from typing import cast + +import pytest +from bqskit.ext import bqskit_to_qiskit, qiskit_to_bqskit +from pytket.circuit import Qubit +from pytket.extensions.qiskit import qiskit_to_tk, tk_to_qiskit +from qiskit import QuantumCircuit +from qiskit.transpiler import CouplingMap, PassManager +from qiskit.transpiler.passes import CheckMap, GatesInBasis +from qiskit.transpiler.runningpassmanager import TranspileLayout + +from mqt.bench.devices import Device, get_available_devices, get_device_by_name +from mqt.predictor.rl import helper + + +def test_BQSKitO2_action() -> None: + """Test the BQSKitO2 action.""" + action_BQSKitO2 = None + for action in helper.get_actions_opt(): + if action["name"] == "BQSKitO2": + action_BQSKitO2 = action + + assert action_BQSKitO2 is not None + + qc = QuantumCircuit(2) + qc.h(0) + qc.cx(0, 1) + + bqskit_qc = qiskit_to_bqskit(qc) + optimized_qc = bqskit_to_qiskit(action_BQSKitO2["transpile_pass"](bqskit_qc)) + + assert optimized_qc != qc + + +@pytest.mark.parametrize("device", get_available_devices(), ids=lambda device: cast(str, device.name)) +def test_BQSKitSynthesis_action(device: Device) -> None: + """Test the BQSKitSynthesis action for all devices.""" + action_BQSKitSynthesis = None + for action in helper.get_actions_synthesis(): + if action["name"] == "BQSKitSynthesis": + action_BQSKitSynthesis = action + + assert action_BQSKitSynthesis is not None + + qc = QuantumCircuit(2) + qc.h(0) + qc.cx(0, 1) + + check_nat_gates = GatesInBasis(basis_gates=device.basis_gates) + check_nat_gates(qc) + assert not check_nat_gates.property_set["all_gates_in_basis"] + + transpile_pass = action_BQSKitSynthesis["transpile_pass"](device) + bqskit_qc = qiskit_to_bqskit(qc) + native_gates_qc = bqskit_to_qiskit(transpile_pass(bqskit_qc)) + + check_nat_gates = GatesInBasis(basis_gates=device.basis_gates) + check_nat_gates(native_gates_qc) + only_nat_gates = check_nat_gates.property_set["all_gates_in_basis"] + if "oqc" not in device.name: + assert only_nat_gates + + +def test_BQSKitMapping_action_swaps_necessary() -> None: + """Test the BQSKitMapping action for quantum circuit that requires SWAP gates.""" + action_BQSKitMapping = None + for action in helper.get_actions_mapping(): + if action["name"] == "BQSKitMapping": + action_BQSKitMapping = action + + assert action_BQSKitMapping is not None + + qc = QuantumCircuit(5) + qc.h(0) + qc.cx(0, 1) + qc.cx(0, 2) + qc.cx(0, 3) + qc.cx(0, 4) + + device = get_device_by_name("ibm_montreal") + bqskit_qc = qiskit_to_bqskit(qc) + bqskit_qc_mapped, input_mapping, output_mapping = action_BQSKitMapping["transpile_pass"](device)(bqskit_qc) + mapped_qc = bqskit_to_qiskit(bqskit_qc_mapped) + layout = helper.final_layout_bqskit_to_qiskit(input_mapping, output_mapping, mapped_qc, qc) + + assert input_mapping != output_mapping + assert layout.final_layout is not None + check_mapped_circuit(initial_qc=qc, mapped_qc=mapped_qc, device=device, layout=layout) + + +def check_mapped_circuit( + initial_qc: QuantumCircuit, mapped_qc: QuantumCircuit, device: Device, layout: TranspileLayout +) -> None: + # check if the altered circuit is correctly mapped to the device + check_mapping = CheckMap(coupling_map=CouplingMap(device.coupling_map)) + check_mapping(mapped_qc) + mapped = check_mapping.property_set["is_swap_mapped"] + assert mapped + assert mapped_qc != initial_qc + assert layout is not None + assert len(layout.initial_layout) == device.num_qubits + if layout.final_layout is not None: + assert len(layout.final_layout) == device.num_qubits + + # each qubit of the initial layout is part of the initial quantum circuit and the register name is correctly set + for assigned_physical_qubit in layout.initial_layout._p2v.values(): # noqa: SLF001 + qreg = assigned_physical_qubit.register + assert qreg.name in {"q", "ancilla"} + + # assigned_physical_qubit is part of the original quantum circuit + if qreg.name == "q": + assert qreg.size == initial_qc.num_qubits + # each qubit is also part of the initial uncompiled quantum circuit + assert initial_qc.find_bit(assigned_physical_qubit).registers[0][0].name == "q" + # assigned_physical_qubit is an ancilla qubit + else: + assert qreg.size == device.num_qubits - initial_qc.num_qubits + # each qubit of the final layout is part of the mapped quantum circuit and the register name is correctly set + if layout.final_layout is not None: + for assigned_physical_qubit in layout.final_layout._p2v.values(): # noqa: SLF001 + assert mapped_qc.find_bit(assigned_physical_qubit).registers[0][0].name == "q" + # each virtual qubit of the original quantum circuit is part of the initial layout + for virtual_qubit in initial_qc.qubits: + assert virtual_qubit in layout.initial_layout._p2v.values() # noqa: SLF001 + + +def test_BQSKitMapping_action_no_swaps_necessary() -> None: + """Test the BQSKitMapping action for a simple quantum circuit that does not require SWAP gates.""" + + action_BQSKitMapping = None + for action in helper.get_actions_mapping(): + if action["name"] == "BQSKitMapping": + action_BQSKitMapping = action + + assert action_BQSKitMapping is not None + + qc_no_swap_needed = QuantumCircuit(2) + qc_no_swap_needed.h(0) + qc_no_swap_needed.cx(0, 1) + + device = get_device_by_name("ibm_montreal") + + bqskit_qc = qiskit_to_bqskit(qc_no_swap_needed) + bqskit_qc_mapped, input_mapping, output_mapping = action_BQSKitMapping["transpile_pass"](device)(bqskit_qc) + mapped_qc = bqskit_to_qiskit(bqskit_qc_mapped) + layout = helper.final_layout_bqskit_to_qiskit(input_mapping, output_mapping, mapped_qc, qc_no_swap_needed) + assert layout is not None + assert input_mapping == output_mapping + assert layout.final_layout is None + + check_mapped_circuit(qc_no_swap_needed, mapped_qc, device, layout) + + +def test_TKET_routing() -> None: + """Test the TKETRouting action.""" + + qc = QuantumCircuit(5) + qc.h(0) + qc.cx(0, 1) + qc.cx(0, 2) + qc.cx(0, 3) + qc.cx(0, 4) + + device = get_device_by_name("ibm_montreal") + + layout_action = helper.get_actions_layout()[0] + transpile_pass = layout_action["transpile_pass"](device) + pm = PassManager(transpile_pass) + layouted_qc = pm.run(qc) + initial_layout = pm.property_set["layout"] + input_qubit_mapping = pm.property_set["original_qubit_indices"] + + routing_action = None + for action in helper.get_actions_routing(): + if action["origin"] == "tket": + routing_action = action + assert routing_action is not None + + tket_qc = qiskit_to_tk(layouted_qc, preserve_param_uuid=True) + for elem in routing_action["transpile_pass"](device): + elem.apply(tket_qc) + + qbs = tket_qc.qubits + qubit_map = {qbs[i]: Qubit("q", i) for i in range(len(qbs))} + tket_qc.rename_units(qubit_map) # type: ignore[arg-type] + + mapped_qc = tk_to_qiskit(tket_qc) + + final_layout = helper.final_layout_pytket_to_qiskit(tket_qc, mapped_qc) + + layout = TranspileLayout( + initial_layout=initial_layout, + input_qubit_mapping=input_qubit_mapping, + final_layout=final_layout, + _output_qubit_list=mapped_qc.qubits, + _input_qubit_count=qc.num_qubits, + ) + + check_mapped_circuit(qc, mapped_qc, device, layout) diff --git a/tests/compilation/test_predictor_rl.py b/tests/compilation/test_predictor_rl.py index c7c006f6c..c5c5cff69 100644 --- a/tests/compilation/test_predictor_rl.py +++ b/tests/compilation/test_predictor_rl.py @@ -38,6 +38,7 @@ def test_qcompile_with_newly_trained_models(figure_of_merit: reward.figure_of_me qc_compiled, compilation_information = res assert isinstance(qc_compiled, QuantumCircuit) + assert qc_compiled.layout is not None assert compilation_information is not None diff --git a/tests/device_selection/test_helper_ml.py b/tests/device_selection/test_helper_ml.py index fab46869e..87fb2e7bf 100644 --- a/tests/device_selection/test_helper_ml.py +++ b/tests/device_selection/test_helper_ml.py @@ -3,7 +3,6 @@ import pytest from mqt.bench import benchmark_generator -from mqt.bench.devices import get_available_device_names from mqt.predictor import ml @@ -53,14 +52,6 @@ def test_get_path_trained_model() -> None: assert path.exists() -def test_predict_device_for_figure_of_merit() -> None: - qc = benchmark_generator.get_benchmark("ghz", 1, 5) - assert ml.helper.predict_device_for_figure_of_merit(qc, "expected_fidelity").name in get_available_device_names() - - with pytest.raises(FileNotFoundError, match="Classifier is neither trained nor saved."): - ml.helper.predict_device_for_figure_of_merit(qc, "false_input") # type: ignore[arg-type] - - def test_get_path_results() -> None: for get_ghz_path_results in (True, False): path = ml.helper.get_path_results(ghz_results=get_ghz_path_results) diff --git a/tests/device_selection/test_predictor_ml.py b/tests/device_selection/test_predictor_ml.py index e455a0218..043fa8914 100644 --- a/tests/device_selection/test_predictor_ml.py +++ b/tests/device_selection/test_predictor_ml.py @@ -4,12 +4,31 @@ from typing import Literal import numpy as np +import pytest from mqt.bench import benchmark_generator -from mqt.bench.devices import get_available_devices +from mqt.bench.devices import get_available_device_names, get_available_devices from mqt.predictor import ml, reward +def test_train_random_forest_classifier() -> None: + """Test the training of a random forest classifier. This test must be executed prior to any prediction to make sure + the model is trained using the latest scikit-learn version.""" + predictor = ml.Predictor() + assert predictor.clf is None + predictor.train_random_forest_classifier(visualize_results=False) + + assert predictor.clf is not None + + +def test_predict_device_for_figure_of_merit() -> None: + qc = benchmark_generator.get_benchmark("ghz", 1, 5) + assert ml.helper.predict_device_for_figure_of_merit(qc, "expected_fidelity").name in get_available_device_names() + + with pytest.raises(FileNotFoundError, match="Classifier is neither trained nor saved."): + ml.helper.predict_device_for_figure_of_merit(qc, "false_input") # type: ignore[arg-type] + + def test_predict() -> None: path = ml.helper.get_path_trained_model(figure_of_merit="expected_fidelity") assert path.is_file() @@ -67,14 +86,6 @@ def test_performance_measures() -> None: result_path.unlink() -def test_train_random_forest_classifier() -> None: - predictor = ml.Predictor() - assert predictor.clf is None - predictor.train_random_forest_classifier(visualize_results=False) - - assert predictor.clf is not None - - def test_compile_all_circuits_for_dev_and_fom() -> None: predictor = ml.Predictor() source_path = Path() diff --git a/tests/compilation/test_pretrained_models.py b/tests/test_pretrained_models.py similarity index 91% rename from tests/compilation/test_pretrained_models.py rename to tests/test_pretrained_models.py index d84eaff19..f8981418d 100644 --- a/tests/compilation/test_pretrained_models.py +++ b/tests/test_pretrained_models.py @@ -27,6 +27,7 @@ def test_qcompile_with_pretrained_models(figure_of_merit: reward.figure_of_merit assert compilation_information is not None +@pytest.mark.skipif(not IN_GITHUB_ACTIONS, reason="Only run this test on GitHub runner") def test_qcompile() -> None: qc = get_benchmark("ghz", 1, 5) qc_compiled, compilation_information, quantum_device = qcompile(qc) @@ -35,6 +36,7 @@ def test_qcompile() -> None: assert len(qc_compiled) > 0 +@pytest.mark.skipif(not IN_GITHUB_ACTIONS, reason="Only run this test on GitHub runner") def test_evaluate_sample_circuit() -> None: qc = get_benchmark("ghz", 1, 3) filename = "test_3.qasm"