diff --git a/pennylane_qiskit/aer.py b/pennylane_qiskit/aer.py index 24e1a353..34d6c3bc 100644 --- a/pennylane_qiskit/aer.py +++ b/pennylane_qiskit/aer.py @@ -18,10 +18,10 @@ """ import qiskit_aer -from .qiskit_device import QiskitDevice +from .qiskit_device_legacy import QiskitDeviceLegacy -class AerDevice(QiskitDevice): +class AerDevice(QiskitDeviceLegacy): """A PennyLane device for the C++ Qiskit Aer simulator. Please refer to the `Qiskit documentation `_ for diff --git a/pennylane_qiskit/basic_sim.py b/pennylane_qiskit/basic_sim.py index 0b51be91..99f0a10e 100644 --- a/pennylane_qiskit/basic_sim.py +++ b/pennylane_qiskit/basic_sim.py @@ -17,10 +17,10 @@ using PennyLane. """ from qiskit.providers.basic_provider import BasicProvider -from .qiskit_device import QiskitDevice +from .qiskit_device_legacy import QiskitDeviceLegacy -class BasicSimulatorDevice(QiskitDevice): +class BasicSimulatorDevice(QiskitDeviceLegacy): """A PennyLane device for the native Python Qiskit simulator. For more information on the ``BasicSimulator`` backend options and transpile options, please visit the diff --git a/pennylane_qiskit/converter.py b/pennylane_qiskit/converter.py index 064ab4c8..0df4e492 100644 --- a/pennylane_qiskit/converter.py +++ b/pennylane_qiskit/converter.py @@ -25,6 +25,7 @@ import qiskit.qasm2 from qiskit.circuit import Parameter, ParameterExpression, ParameterVector from qiskit.circuit import Measure, Barrier, ControlFlowOp, Clbit +from qiskit.circuit import library as lib from qiskit.circuit.classical import expr from qiskit.circuit.controlflow.switch_case import _DefaultCaseType from qiskit.circuit.library import GlobalPhaseGate @@ -36,7 +37,50 @@ import pennylane as qml import pennylane.ops as pennylane_ops from pennylane.tape.tape import rotations_and_diagonal_measurements -from pennylane_qiskit.qiskit_device import QISKIT_OPERATION_MAP + +QISKIT_OPERATION_MAP = { + # native PennyLane operations also native to qiskit + "PauliX": lib.XGate, + "PauliY": lib.YGate, + "PauliZ": lib.ZGate, + "Hadamard": lib.HGate, + "CNOT": lib.CXGate, + "CZ": lib.CZGate, + "SWAP": lib.SwapGate, + "ISWAP": lib.iSwapGate, + "RX": lib.RXGate, + "RY": lib.RYGate, + "RZ": lib.RZGate, + "Identity": lib.IGate, + "CSWAP": lib.CSwapGate, + "CRX": lib.CRXGate, + "CRY": lib.CRYGate, + "CRZ": lib.CRZGate, + "PhaseShift": lib.PhaseGate, + "QubitStateVector": lib.Initialize, + "StatePrep": lib.Initialize, + "Toffoli": lib.CCXGate, + "QubitUnitary": lib.UnitaryGate, + "U1": lib.U1Gate, + "U2": lib.U2Gate, + "U3": lib.U3Gate, + "IsingZZ": lib.RZZGate, + "IsingYY": lib.RYYGate, + "IsingXX": lib.RXXGate, + "S": lib.SGate, + "T": lib.TGate, + "SX": lib.SXGate, + "Adjoint(S)": lib.SdgGate, + "Adjoint(T)": lib.TdgGate, + "Adjoint(SX)": lib.SXdgGate, + "CY": lib.CYGate, + "CH": lib.CHGate, + "CPhase": lib.CPhaseGate, + "CCZ": lib.CCZGate, + "ECR": lib.ECRGate, + "Barrier": lib.Barrier, + "Adjoint(GlobalPhase)": lib.GlobalPhaseGate, +} inv_map = {v.__name__: k for k, v in QISKIT_OPERATION_MAP.items()} diff --git a/pennylane_qiskit/qiskit_device.py b/pennylane_qiskit/qiskit_device.py index b03a9692..8df10cad 100644 --- a/pennylane_qiskit/qiskit_device.py +++ b/pennylane_qiskit/qiskit_device.py @@ -1,4 +1,4 @@ -# Copyright 2019-2021 Xanadu Quantum Technologies Inc. +# Copyright 2019-2024 Xanadu Quantum Technologies Inc. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,124 +12,258 @@ # See the License for the specific language governing permissions and # limitations under the License. r""" -This module contains a base class for constructing Qiskit devices for PennyLane. +This module contains a prototype base class for constructing Qiskit devices +for PennyLane with the new device API. """ # pylint: disable=too-many-instance-attributes,attribute-defined-outside-init -import abc -import inspect import warnings +import inspect +from typing import Union, Callable, Tuple, Sequence +from contextlib import contextmanager import numpy as np -from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister -from qiskit.circuit import library as lib +import pennylane as qml from qiskit.compiler import transpile -from qiskit.converters import circuit_to_dag, dag_to_circuit -from qiskit.providers import Backend, BackendV2, QiskitBackendNotFoundError - -from pennylane import QubitDevice, DeviceError -from pennylane.measurements import SampleMP, CountsMP, ClassicalShadowMP, ShadowExpvalMP +from qiskit.providers import BackendV2 + +from qiskit_ibm_runtime import Session, SamplerV2 as Sampler, EstimatorV2 as Estimator + +from pennylane import transform +from pennylane.transforms.core import TransformProgram +from pennylane.transforms import broadcast_expand, split_non_commuting +from pennylane.tape import QuantumTape, QuantumScript +from pennylane.typing import Result, ResultBatch +from pennylane.devices import Device +from pennylane.devices.execution_config import ExecutionConfig, DefaultExecutionConfig +from pennylane.devices.preprocess import ( + decompose, + validate_observables, + validate_measurements, + validate_device_wires, +) +from pennylane.measurements import ExpectationMP, VarianceMP from ._version import __version__ +from .converter import QISKIT_OPERATION_MAP, circuit_to_qiskit, mp_to_pauli + +QuantumTapeBatch = Sequence[QuantumTape] +QuantumTape_or_Batch = Union[QuantumTape, QuantumTapeBatch] +Result_or_ResultBatch = Union[Result, ResultBatch] + + +# pylint: disable=protected-access +@contextmanager +def qiskit_session(device, **kwargs): + """A context manager that creates a Qiskit Session and sets it as a session + on the device while the context manager is active. Using the context manager + will ensure the Session closes properly and is removed from the device after + completing the tasks. Any Session that was initialized and passed into the + device will be overwritten by the Qiskit Session created by this context + manager. + + Args: + device (QiskitDevice2): the device that will create remote tasks using the session + **kwargs: session keyword arguments to be used for settings for the Session. At the + time of writing, the only relevant keyword argument is "max_time", which lets you + set the maximum amount of time the sessin is open. For the most up to date information, + please refer to the Qiskit Session + `documentation `_. + + **Example:** + + .. code-block:: python + + import pennylane as qml + from pennylane_qiskit import qiskit_session + from qiskit_ibm_runtime import QiskitRuntimeService + + # get backend + service = QiskitRuntimeService(channel="ibm_quantum") + backend = service.least_busy(simulator=False, operational=True) + + # initialize device + dev = qml.device('qiskit.remote', wires=2, backend=backend) + + @qml.qnode(dev) + def circuit(x): + qml.RX(x, 0) + qml.CNOT([0, 1]) + return qml.expval(qml.PauliZ(1)) + + angle = 0.1 + + with qiskit_session(dev, max_time=60) as session: + # queue for the first execution + res = circuit(angle)[0] + + # then this loop executes immediately after without queueing again + while res > 0: + angle += 0.3 + res = circuit(angle)[0] + + Note that if you passed in a session to your device, that session will be overwritten + by `qiskit_session`. + + .. code-block:: python + + import pennylane as qml + from pennylane_qiskit import qiskit_session + from qiskit_ibm_runtime import QiskitRuntimeService, Session -SAMPLE_TYPES = (SampleMP, CountsMP, ClassicalShadowMP, ShadowExpvalMP) - - -QISKIT_OPERATION_MAP = { - # native PennyLane operations also native to qiskit - "PauliX": lib.XGate, - "PauliY": lib.YGate, - "PauliZ": lib.ZGate, - "Hadamard": lib.HGate, - "CNOT": lib.CXGate, - "CZ": lib.CZGate, - "SWAP": lib.SwapGate, - "ISWAP": lib.iSwapGate, - "RX": lib.RXGate, - "RY": lib.RYGate, - "RZ": lib.RZGate, - "Identity": lib.IGate, - "CSWAP": lib.CSwapGate, - "CRX": lib.CRXGate, - "CRY": lib.CRYGate, - "CRZ": lib.CRZGate, - "PhaseShift": lib.PhaseGate, - "QubitStateVector": lib.Initialize, - "StatePrep": lib.Initialize, - "Toffoli": lib.CCXGate, - "QubitUnitary": lib.UnitaryGate, - "U1": lib.U1Gate, - "U2": lib.U2Gate, - "U3": lib.U3Gate, - "IsingZZ": lib.RZZGate, - "IsingYY": lib.RYYGate, - "IsingXX": lib.RXXGate, - "S": lib.SGate, - "T": lib.TGate, - "SX": lib.SXGate, - "Adjoint(S)": lib.SdgGate, - "Adjoint(T)": lib.TdgGate, - "Adjoint(SX)": lib.SXdgGate, - "CY": lib.CYGate, - "CH": lib.CHGate, - "CPhase": lib.CPhaseGate, - "CCZ": lib.CCZGate, - "ECR": lib.ECRGate, - "Barrier": lib.Barrier, - "Adjoint(GlobalPhase)": lib.GlobalPhaseGate, -} - - -def _get_backend_name(backend): + # get backend + service = QiskitRuntimeService(channel="ibm_quantum") + backend = service.least_busy(simulator=False, operational=True) + + # initialize device + dev = qml.device('qiskit.remote', wires=2, backend=backend, session=Session(backend=backend, max_time=30)) + + @qml.qnode(dev) + def circuit(x): + qml.RX(x, 0) + qml.CNOT([0, 1]) + return qml.expval(qml.PauliZ(1)) + + angle = 0.1 + + # This session will have the Qiskit default settings max_time=900 + with qiskit_session(dev) as session: + res = circuit(angle)[0] + + while res > 0: + angle += 0.3 + res = circuit(angle)[0] + """ + # Code to acquire session: + existing_session = device._session + + session_options = {"backend": device.backend, "service": device.service} + + for k, v in kwargs.items(): + # Options like service and backend should be tied to the settings set on device + if k in session_options: + warnings.warn(f"Using '{k}' set in device, {getattr(device, k)}", UserWarning) + else: + session_options[k] = v + + session = Session(**session_options) + device._session = session try: - return backend.name() # BackendV1 - except TypeError: # pragma: no cover - return backend.name # BackendV2 + yield session + finally: + # Code to release session: + session.close() + device._session = existing_session + + +def accepted_sample_measurement(m: qml.measurements.MeasurementProcess) -> bool: + """Specifies whether or not a measurement is accepted when sampling.""" + + return isinstance( + m, + ( + qml.measurements.SampleMeasurement, + qml.measurements.ClassicalShadowMP, + qml.measurements.ShadowExpvalMP, + ), + ) + + +@transform +def split_execution_types( + tape: qml.tape.QuantumTape, +) -> (Sequence[qml.tape.QuantumTape], Callable): + """Split into separate tapes based on measurement type. Counts and sample-based measurements + will use the Qiskit Sampler. ExpectationValue and Variance will use the Estimator, except + when the measured observable does not have a `pauli_rep`. In that case, the Sampler will be + used, and the raw samples will be processed to give an expectation value.""" + estimator = [] + sampler = [] + + for i, mp in enumerate(tape.measurements): + if isinstance(mp, (ExpectationMP, VarianceMP)): + if mp.obs.pauli_rep: + estimator.append((mp, i)) + else: + warnings.warn( + f"The observable measured {mp.obs} does not have a `pauli_rep` " + "and will be run without using the Estimator primitive. Instead, " + "raw samples from the Sampler will be used." + ) + sampler.append((mp, i)) + else: + sampler.append((mp, i)) + + order_indices = [[i for mp, i in group] for group in [estimator, sampler]] + + tapes = [] + if estimator: + tapes.extend( + [ + qml.tape.QuantumScript( + tape.operations, + measurements=[mp for mp, i in estimator], + shots=tape.shots, + ) + ] + ) + if sampler: + tapes.extend( + [ + qml.tape.QuantumScript( + tape.operations, + measurements=[mp for mp, i in sampler], + shots=tape.shots, + ) + ] + ) + + def reorder_fn(res): + """re-order the output to the original shape and order""" + flattened_indices = [i for group in order_indices for i in group] + flattened_results = [r for group in res for r in group] -class QiskitDevice(QubitDevice, abc.ABC): - r"""Abstract Qiskit device for PennyLane. + if len(flattened_indices) != len(flattened_results): + raise ValueError( + "The lengths of flattened_indices and flattened_results do not match." + ) # pragma: no cover + + result = dict(zip(flattened_indices, flattened_results)) + + result = tuple(result[i] for i in sorted(result.keys())) + + return result[0] if len(result) == 1 else result + + return tapes, reorder_fn + + +class QiskitDevice(Device): + r"""Hardware/simulator Qiskit device for PennyLane. Args: wires (int or Iterable[Number, str]]): Number of subsystems represented by the device, or iterable that contains unique labels for the subsystems as numbers (i.e., ``[-1, 0, 2]``) - or strings (``['ancilla', 'q1', 'q2']``). - provider (Provider | None): The Qiskit backend provider. - backend (str | Backend): the desired backend. If a string, a provider must be given. - shots (int or None): number of circuit evaluations/random samples used - to estimate expectation values and variances of observables. For state vector backends, - setting to ``None`` results in computing statistics like expectation values and variances analytically. + or strings (``['aux_wire', 'q1', 'q2']``). + backend (Backend): the initialized Qiskit backend Keyword Args: - name (str): The name of the circuit. Default ``'circuit'``. - compile_backend (BaseBackend): The backend used for compilation. If you wish - to simulate a device compliant circuit, you can specify a backend here. + shots (int or None): number of circuit evaluations/random samples used + to estimate expectation values and variances of observables. + session (Session): a Qiskit Session to use for device execution. If none is provided, a session will + be created at each device execution. + compile_backend (Union[Backend, None]): the backend to be used for compiling the circuit that will be + sent to the backend device, to be set if the backend desired for compliation differs from the + backend used for execution. Defaults to ``None``, which means the primary backend will be used. + **kwargs: transpilation and runtime keyword arguments to be used for measurements with Primitives. + If an `options` dictionary is defined amongst the kwargs, and there are settings that overlap + with those in kwargs, the settings in `options` will take precedence over kwargs. Keyword + arguments accepted by both the transpiler and at runtime (e.g. ``optimization_level``) + will be passed to the transpiler rather than to the Primitive. """ - name = "Qiskit PennyLane plugin" - pennylane_requires = ">=0.37.0" - version = __version__ - plugin_version = __version__ - author = "Xanadu" - - _capabilities = { - "model": "qubit", - "tensor_observables": True, - "inverse_operations": True, - } - _operation_map = QISKIT_OPERATION_MAP - _state_backends = { - "statevector_simulator", - "simulator_statevector", - "unitary_simulator", - "aer_simulator_statevector", - "aer_simulator_unitary", - } - """set[str]: Set of backend names that define the backends - that support returning the underlying quantum statevector""" - - operations = set(_operation_map.keys()) + operations = set(QISKIT_OPERATION_MAP.keys()) observables = { "PauliX", "PauliY", @@ -138,399 +272,415 @@ class QiskitDevice(QubitDevice, abc.ABC): "Hadamard", "Hermitian", "Projector", + "Prod", + "Sum", + "LinearCombination", + "SProd", + # TODO Could support SparseHamiltonian } - analytic_warning_message = ( - "The analytic calculation of expectations, variances and " - "probabilities is only supported on statevector backends, not on the {}. " - "Such statistics obtained from this device are estimates based on samples." - ) - - _eigs = {} - - def __init__(self, wires, provider, backend, shots=1024, **kwargs): + # pylint:disable = too-many-arguments + def __init__( + self, + wires, + backend, + shots=1024, + session=None, + compile_backend=None, + **kwargs, + ): + + if shots is None: + warnings.warn( + "Expected an integer number of shots, but received shots=None. Defaulting " + "to 1024 shots. The analytic calculation of results is not supported on " + "this device. All statistics obtained from this device are estimates based " + "on samples.", + UserWarning, + ) + + shots = 1024 super().__init__(wires=wires, shots=shots) - self.provider = provider - - if isinstance(backend, Backend): - self._backend = backend - self.backend_name = _get_backend_name(backend) - elif provider is None: - raise ValueError("Must pass a provider if the backend is not a Backend instance.") - else: - try: - self._backend = provider.get_backend(backend) - except QiskitBackendNotFoundError as e: - available_backends = list(map(_get_backend_name, provider.backends())) - raise ValueError( - f"Backend '{backend}' does not exist. Available backends " - f"are:\n {available_backends}" - ) from e - - self.backend_name = _get_backend_name(self._backend) - - # Keep track if the user specified analytic to be True - if shots is None and not self._is_state_backend: - # Raise a warning if no shots were specified for a hardware device - warnings.warn(self.analytic_warning_message.format(backend), UserWarning) + self._backend = backend + self._compile_backend = compile_backend if compile_backend else backend - self.shots = 1024 + self._service = getattr(backend, "_service", None) + self._session = session - self._capabilities["returns_state"] = self._is_state_backend + kwargs["shots"] = shots # Perform validation against backend - backend_qubits = ( + available_qubits = ( backend.num_qubits if isinstance(backend, BackendV2) - else self.backend.configuration().n_qubits + else backend.configuration().n_qubits ) - if backend_qubits and len(self.wires) > int(backend_qubits): - raise ValueError(f"Backend '{backend}' supports maximum {backend_qubits} wires") + if len(self.wires) > int(available_qubits): + raise ValueError(f"Backend '{backend}' supports maximum {available_qubits} wires") - # Initialize inner state self.reset() + self._kwargs, self._transpile_args = self._process_kwargs( + kwargs + ) # processes kwargs and separates transpilation arguments to dev._transpile_args - self.process_kwargs(kwargs) - - def process_kwargs(self, kwargs): - """Processing the keyword arguments that were provided upon device initialization. + @property + def backend(self): + """The Qiskit backend object. - Args: - kwargs (dict): keyword arguments to be set for the device + Returns: + qiskit.providers.Backend: Qiskit backend object. """ - self.compile_backend = None - if "compile_backend" in kwargs: - self.compile_backend = kwargs.pop("compile_backend") - - if "noise_model" in kwargs: - noise_model = kwargs.pop("noise_model") - self.backend.set_options(noise_model=noise_model) - - # set transpile_args - self.set_transpile_args(**kwargs) - - # Get further arguments for run - self.run_args = {} - - # Specify to have a memory for hw/hw simulators - compile_backend = self.compile_backend or self.backend - memory = str(compile_backend) not in self._state_backends - - if memory: - kwargs["memory"] = True - - # Consider the remaining kwargs as keyword arguments to run - self.run_args.update(kwargs) - - @property - def _is_state_backend(self): - """Returns whether this device has a state backend.""" - return self.backend_name in self._state_backends or self.backend.options.get("method") in { - "unitary", - "statevector", - } + return self._backend @property - def _is_statevector_backend(self): - """Returns whether this device has a statevector backend.""" - method = "statevector" - return method in self.backend_name or self.backend.options.get("method") == method + def compile_backend(self): + """The ``compile_backend`` is a Qiskit backend object to be used for transpilation. + Returns: + qiskit.providers.backend: Qiskit backend object. + """ + return self._compile_backend @property - def _is_unitary_backend(self): - """Returns whether this device has a unitary backend.""" - method = "unitary" - return method in self.backend_name or self.backend.options.get("method") == method - - def set_transpile_args(self, **kwargs): - """The transpile argument setter. + def service(self): + """The QiskitRuntimeService service. - Keyword Args: - kwargs (dict): keyword arguments to be set for the Qiskit transpiler. For more details, see the - `Qiskit transpiler documentation `_ + Returns: + qiskit.qiskit_ibm_runtime.QiskitRuntimeService """ - transpile_sig = inspect.signature(transpile).parameters - self.transpile_args = {arg: kwargs[arg] for arg in transpile_sig if arg in kwargs} - self.transpile_args.pop("circuits", None) - self.transpile_args.pop("backend", None) + return self._service @property - def backend(self): - """The Qiskit backend object. + def session(self): + """The QiskitRuntimeService session. Returns: - qiskit.providers.backend: Qiskit backend object. + qiskit.qiskit_ibm_runtime.Session """ - return self._backend + return self._session - def reset(self): - """Reset the Qiskit backend device""" - # Reset only internal data, not the options that are determined on - # device creation - self._reg = QuantumRegister(self.num_wires, "q") - self._creg = ClassicalRegister(self.num_wires, "c") - self._circuit = QuantumCircuit(self._reg, self._creg, name="temp") + @property + def num_wires(self): + """Get the number of wires. - self._current_job = None - self._state = None # statevector of a simulator backend + Returns: + int: The number of wires. + """ + return len(self.wires) - def create_circuit_object(self, operations, **kwargs): - """Builds the circuit objects based on the operations and measurements - specified to apply. + def update_session(self, session): + """Update the session attribute. Args: - operations (list[~.Operation]): operations to apply to the device - - Keyword args: - rotations (list[~.Operation]): Operations that rotate the circuit - pre-measurement into the eigenbasis of the observables. + session: The new session to be set. """ - rotations = kwargs.get("rotations", []) - - applied_operations = self.apply_operations(operations) + self._session = session - # Rotating the state for measurement in the computational basis - rotation_circuits = self.apply_operations(rotations) - applied_operations.extend(rotation_circuits) + def reset(self): + """Reset the current job to None.""" + self._current_job = None - for circuit in applied_operations: - self._circuit &= circuit + def stopping_condition(self, op: qml.operation.Operator) -> bool: + """Specifies whether or not an Operator is accepted by QiskitDevice2.""" + return op.name in self.operations - if not self._is_state_backend: - # Add measurements if they are needed - for qr, cr in zip(self._reg, self._creg): - self._circuit.measure(qr, cr) - elif "aer" in self.backend_name: - self._circuit.save_state() + def observable_stopping_condition(self, obs: qml.operation.Operator) -> bool: + """Specifies whether or not an observable is accepted by QiskitDevice2.""" + return obs.name in self.observables - def apply(self, operations, **kwargs): - """Build the circuit object and apply the operations""" - self.create_circuit_object(operations, **kwargs) + def preprocess( + self, + execution_config: ExecutionConfig = DefaultExecutionConfig, + ) -> Tuple[TransformProgram, ExecutionConfig]: + """This function defines the device transform program to be applied and an updated device configuration. - # These operations need to run for all devices - compiled_circuit = self.compile() - self.run(compiled_circuit) + Args: + execution_config (Union[ExecutionConfig, Sequence[ExecutionConfig]]): A data structure describing the + parameters needed to fully describe the execution. - def apply_operations(self, operations): - """Apply the circuit operations. + Returns: + TransformProgram, ExecutionConfig: A transform program that when called returns QuantumTapes that the device + can natively execute as well as a postprocessing function to be called after execution, and a configuration with + unset specifications filled in. - This method serves as an auxiliary method to :meth:`~.QiskitDevice.apply`. + This device: - Args: - operations (List[pennylane.Operation]): operations to be applied + * Supports any operations with explicit PennyLane to Qiskit gate conversions defined in the plugin + * Does not intrinsically support parameter broadcasting - Returns: - list[QuantumCircuit]: a list of quantum circuit objects that - specify the corresponding operations """ - circuits = [] + config = execution_config + config.use_device_gradient = False - for operation in operations: - # Apply the circuit operations - device_wires = self.map_wires(operation.wires) - par = operation.parameters + transform_program = TransformProgram() - for idx, p in enumerate(par): - if isinstance(p, np.ndarray): - # Convert arrays so that Qiskit accepts the parameter - par[idx] = p.tolist() + transform_program.add_transform(validate_device_wires, self.wires, name=self.name) + transform_program.add_transform( + decompose, + stopping_condition=self.stopping_condition, + name=self.name, + skip_initial_state_prep=False, + ) + transform_program.add_transform( + validate_measurements, + sample_measurements=accepted_sample_measurement, + name=self.name, + ) + transform_program.add_transform( + validate_observables, + stopping_condition=self.observable_stopping_condition, + name=self.name, + ) - operation = operation.name + transform_program.add_transform(broadcast_expand) + transform_program.add_transform(split_non_commuting) - mapped_operation = self._operation_map[operation] + transform_program.add_transform(split_execution_types) - self.qubit_state_vector_check(operation) + return transform_program, config - qregs = [self._reg[i] for i in device_wires.labels] + def _process_kwargs(self, kwargs): + """Processes kwargs given and separates them into kwargs and transpile_args. If given + a keyword argument 'options' that is a dictionary, a common practice in + Qiskit, the options in said dictionary take precedence over any overlapping keyword + arguments defined in the kwargs. - if operation in ("QubitUnitary", "QubitStateVector", "StatePrep"): - # Need to revert the order of the quantum registers used in - # Qiskit such that it matches the PennyLane ordering - qregs = list(reversed(qregs)) + Keyword Args: + kwargs (dict): keyword arguments that set either runtime options or transpilation + options. - if operation in ("Barrier",): - # Need to add the num_qubits for instantiating Barrier in Qiskit - par = [len(self._reg)] + Returns: + kwargs, transpile_args: keyword arguments for the runtime options and keyword + arguments for the transpiler + """ - dag = circuit_to_dag(QuantumCircuit(self._reg, self._creg, name="")) + if "noise_model" in kwargs: + noise_model = kwargs.pop("noise_model") + self.backend.set_options(noise_model=noise_model) - gate = mapped_operation(*par) + if "options" in kwargs: + for key, val in kwargs.pop("options").items(): + if key in kwargs: + warnings.warn( + "An overlap between what was passed in via options and what was passed in via kwargs was found." + f"The value set in options {key}={val} will be used." + ) + kwargs[key] = val - dag.apply_operation_back(gate, qargs=qregs) - circuit = dag_to_circuit(dag) - circuits.append(circuit) + shots = kwargs.pop("shots") - return circuits + if "default_shots" in kwargs: + warnings.warn( + f"default_shots was found in the keyword arguments, but it is not supported by {self.name}" + "Please use the `shots` keyword argument instead. The number of shots " + f"{shots} will be used instead." + ) + kwargs["default_shots"] = shots - def qubit_state_vector_check(self, operation): - """Input check for the StatePrepBase operations. + kwargs, transpile_args = self.get_transpile_args(kwargs) - Args: - operation (pennylane.Operation): operation to be checked + return kwargs, transpile_args - Raises: - DeviceError: If the operation is QubitStateVector or StatePrep - """ - if operation in ("QubitStateVector", "StatePrep"): - if self._is_unitary_backend: - raise DeviceError( - f"The {operation} operation " - "is not supported on the unitary simulator backend." - ) + @staticmethod + def get_transpile_args(kwargs): + """The transpile argument setter. This separates keyword arguments related to transpilation + from the rest of the keyword arguments and removes those keyword arguments from kwargs. - def compile(self): - """Compile the quantum circuit to target the provided compile_backend. + Keyword Args: + kwargs (dict): combined keyword arguments to be parsed for the Qiskit transpiler. For more details, see the + `Qiskit transpiler documentation `_ - If compile_backend is None, then the target is simply the - backend. + Returns: + kwargs (dict), transpile_args (dict): keyword arguments for the runtime options and keyword + arguments for the transpiler """ - compile_backend = self.compile_backend or self.backend - compiled_circuits = transpile(self._circuit, backend=compile_backend, **self.transpile_args) - return compiled_circuits - def run(self, qcirc): - """Run the compiled circuit and query the result. + transpile_sig = inspect.signature(transpile).parameters - Args: - qcirc (qiskit.QuantumCircuit): the quantum circuit to be run on the backend - """ - self._current_job = self.backend.run(qcirc, shots=self.shots, **self.run_args) - result = self._current_job.result() + transpile_args = {arg: kwargs.pop(arg) for arg in transpile_sig if arg in kwargs} + transpile_args.pop("circuits", None) + transpile_args.pop("backend", None) - if self._is_state_backend: - self._state = self._get_state(result) + return kwargs, transpile_args - def _get_state(self, result, experiment=None): - """Returns the statevector for state simulator backends. + def compile_circuits(self, circuits): + """Compiles multiple circuits one after the other. Args: - result (qiskit.Result): result object - experiment (str or None): the name of the experiment to get the state for. + circuits (list[QuantumCircuit]): the circuits to be compiled Returns: - array[float]: size ``(2**num_wires,)`` statevector + list[QuantumCircuit]: the list of compiled circuits """ - if self._is_statevector_backend: - state = np.asarray(result.get_statevector(experiment)) + # Compile each circuit object + compiled_circuits = [] + transpile_args = self._transpile_args - elif self._is_unitary_backend: - unitary = np.asarray(result.get_unitary(experiment)) - initial_state = np.zeros([2**self.num_wires]) - initial_state[0] = 1 + for i, circuit in enumerate(circuits): + compiled_circ = transpile(circuit, backend=self.compile_backend, **transpile_args) + compiled_circ.name = f"circ{i}" + compiled_circuits.append(compiled_circ) - state = unitary @ initial_state + return compiled_circuits - # reverse qubit order to match PennyLane convention - return state.reshape([2] * self.num_wires).T.flatten() + # pylint: disable=unused-argument, no-member + def execute( + self, + circuits: QuantumTape_or_Batch, + execution_config: ExecutionConfig = DefaultExecutionConfig, + ) -> Result_or_ResultBatch: + """Execute a circuit or a batch of circuits and turn it into results.""" + session = self._session or Session(backend=self.backend) - def generate_samples(self, circuit=None): - r"""Returns the computational basis samples generated for all wires. + results = [] - Note that PennyLane uses the convention :math:`|q_0,q_1,\dots,q_{N-1}\rangle` where - :math:`q_0` is the most significant bit. + if isinstance(circuits, QuantumScript): + circuits = [circuits] + + @contextmanager + def execute_circuits(session): + try: + for circ in circuits: + if circ.shots and len(circ.shots.shot_vector) > 1: + raise ValueError( + f"Setting shot vector {circ.shots.shot_vector} is not supported for {self.name}." + "Please use a single integer instead when specifying the number of shots." + ) + if isinstance(circ.measurements[0], (ExpectationMP, VarianceMP)) and getattr( + circ.measurements[0].obs, "pauli_rep", None + ): + execute_fn = self._execute_estimator + else: + execute_fn = self._execute_sampler + results.append(execute_fn(circ, session)) + yield results + finally: + session.close() + + with execute_circuits(session) as results: + return results + + def _execute_sampler(self, circuit, session): + """Returns the result of the execution of the circuit using the SamplerV2 Primitive. + Note that this result has been processed respective to the MeasurementProcess given. + E.g. `qml.expval` returns an expectation value whereas `qml.sample()` will return the raw samples. Args: - circuit (str or None): the name of the circuit to get the state for + circuits (list[QuantumCircuit]): the circuits to be executed via SamplerV2 + session (Session): the session that the execution will be performed with Returns: - array[complex]: array of samples in the shape ``(dev.shots, dev.num_wires)`` + result (tuple): the processed result from SamplerV2 """ + qcirc = [circuit_to_qiskit(circuit, self.num_wires, diagonalize=True, measure=True)] + sampler = Sampler(session=session) + compiled_circuits = self.compile_circuits(qcirc) + sampler.options.update(**self._kwargs) - # branch out depending on the type of backend - if self._is_state_backend: - # software simulator: need to sample from probabilities - return super().generate_samples() + # len(compiled_circuits) is always 1 so the indexing does not matter. + result = sampler.run( + compiled_circuits, + shots=circuit.shots.total_shots if circuit.shots.total_shots else None, + ).result()[0] + classical_register_name = compiled_circuits[0].cregs[0].name + self._current_job = getattr(result.data, classical_register_name) - # hardware or hardware simulator - samples = self._current_job.result().get_memory(circuit) - # reverse qubit order to match PennyLane convention - return np.vstack([np.array([int(i) for i in s[::-1]]) for s in samples]) + # needs processing function to convert to the correct format for states, and + # also handle instances where wires were specified in probs, and for multiple probs measurements - @property - def state(self): - """Get state of the device""" - return self._state + self._samples = self.generate_samples(0) + res = [ + mp.process_samples(self._samples, wire_order=self.wires) for mp in circuit.measurements + ] - def analytic_probability(self, wires=None): - """Get the analytic probability of the device""" - if self._state is None: - return None + single_measurement = len(circuit.measurements) == 1 + res = (res[0],) if single_measurement else tuple(res) - prob = self.marginal_prob(np.abs(self._state) ** 2, wires) - return prob + return res - def compile_circuits(self, circuits): - r"""Compiles multiple circuits one after the other. + def _execute_estimator(self, circuit, session): + """Returns the result of the execution of the circuit using the EstimatorV2 Primitive. + Note that this result has been processed respective to the MeasurementProcess given. + E.g. `qml.expval` returns an expectation value whereas `qml.var` will return the variance. Args: - circuits (list[.tapes.QuantumTape]): the circuits to be compiled + circuits (list[QuantumCircuit]): the circuits to be executed via EstimatorV2 + session (Session): the session that the execution will be performed with Returns: - list[QuantumCircuit]: the list of compiled circuits + result (tuple): the processed result from EstimatorV2 """ - # Compile each circuit object - compiled_circuits = [] - - for circuit in circuits: - # We need to reset the device here, else it will - # not start the next computation in the zero state - self.reset() - self.create_circuit_object(circuit.operations, rotations=circuit.diagonalizing_gates) - - compiled_circ = self.compile() - compiled_circ.name = f"circ{len(compiled_circuits)}" - compiled_circuits.append(compiled_circ) + # the Estimator primitive takes care of diagonalization and measurements itself, + # so diagonalizing gates and measurements are not included in the circuit + qcirc = [circuit_to_qiskit(circuit, self.num_wires, diagonalize=False, measure=False)] + estimator = Estimator(session=session) + + pauli_observables = [mp_to_pauli(mp, self.num_wires) for mp in circuit.measurements] + compiled_circuits = self.compile_circuits(qcirc) + estimator.options.update(**self._kwargs) + # split into one call per measurement + # could technically be more efficient if there are some observables where we ask + # for expectation value and variance on the same observable, but spending time on + # that right now feels excessive + circ_and_obs = [(compiled_circuits[0], pauli_observables)] + result = estimator.run( + circ_and_obs, + precision=np.sqrt(1 / circuit.shots.total_shots) if circuit.shots else None, + ).result() + self._current_job = result + result = self._process_estimator_job(circuit.measurements, result) + + return result + + @staticmethod + def _process_estimator_job(measurements, job_result): + """Estimator returns the expectation value and standard error for each observable measured, + along with some metadata that contains the precision. Extracts the relevant number for each + measurement process and return the requested results from the Estimator executions. + + Note that for variance, we calculate the variance by using the standard error and the + precision value. - return compiled_circuits + Args: + measurements (list[MeasurementProcess]): the measurements in the circuit + job_result (Any): the result from EstimatorV2 - def batch_execute(self, circuits, timeout: int = None): - """Batch execute the circuits on the device""" + Returns: + result (tuple): the processed result from EstimatorV2 + """ + expvals = job_result[0].data.evs + variances = (job_result[0].data.stds / job_result[0].metadata["target_precision"]) ** 2 + result = [] + for i, mp in enumerate(measurements): + if isinstance(mp, ExpectationMP): + result.append(expvals[i]) + elif isinstance(mp, VarianceMP): + result.append(variances[i]) - compiled_circuits = self.compile_circuits(circuits) + single_measurement = len(measurements) == 1 + result = (result[0],) if single_measurement else tuple(result) - if not compiled_circuits: - # At least one circuit must always be provided to the backend. - return [] + return result - # Send the batch of circuit objects using backend.run - self._current_job = self.backend.run(compiled_circuits, shots=self.shots, **self.run_args) + def generate_samples(self, circuit=None): + r"""Returns the computational basis samples generated for all wires. - try: - result = self._current_job.result(timeout=timeout) - except TypeError: # pragma: no cover - # timeout not supported - result = self._current_job.result() + Note that PennyLane uses the convention :math:`|q_0,q_1,\dots,q_{N-1}\rangle` where + :math:`q_0` is the most significant bit. - # increment counter for number of executions of qubit device - # pylint: disable=no-member - self._num_executions += 1 + Args: + circuit (int): position of the circuit in the batch. - # Compute statistics using the state and/or samples - results = [] - for circuit, circuit_obj in zip(circuits, compiled_circuits): - # Update the tracker - if self.tracker.active: - self.tracker.update(executions=1, shots=self.shots) - self.tracker.record() - - if self._is_state_backend: - self._state = self._get_state(result, experiment=circuit_obj) - - # generate computational basis samples - if self.shots is not None or any( - isinstance(m, SAMPLE_TYPES) for m in circuit.measurements - ): - self._samples = self.generate_samples(circuit_obj) - - res = self.statistics(circuit) - single_measurement = len(circuit.measurements) == 1 - res = res[0] if single_measurement else tuple(res) - results.append(res) - - if self.tracker.active: - self.tracker.update(batches=1, batch_len=len(circuits)) - self.tracker.record() - - return results + Returns: + array[complex]: array of samples in the shape ``(dev.shots, dev.num_wires)`` + """ + counts = self._current_job.get_counts() + # Batch of circuits + if not isinstance(counts, dict): + counts = self._current_job.get_counts()[circuit] + + samples = [] + for key, value in counts.items(): + samples.extend([key] * value) + return np.vstack([np.array([int(i) for i in s[::-1]]) for s in samples]) diff --git a/pennylane_qiskit/qiskit_device2.py b/pennylane_qiskit/qiskit_device2.py deleted file mode 100644 index ed4cf6a1..00000000 --- a/pennylane_qiskit/qiskit_device2.py +++ /dev/null @@ -1,669 +0,0 @@ -# Copyright 2019-2024 Xanadu Quantum Technologies Inc. - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -r""" -This module contains a prototype base class for constructing Qiskit devices -for PennyLane with the new device API. -""" -# pylint: disable=too-many-instance-attributes,attribute-defined-outside-init, missing-function-docstring - - -import warnings -import inspect -from typing import Union, Callable, Tuple, Sequence -from contextlib import contextmanager - -import numpy as np -import pennylane as qml -from qiskit.compiler import transpile -from qiskit.providers import BackendV2 - -from qiskit_ibm_runtime import Session, SamplerV2 as Sampler, EstimatorV2 as Estimator - -from pennylane import transform -from pennylane.transforms.core import TransformProgram -from pennylane.transforms import broadcast_expand, split_non_commuting -from pennylane.tape import QuantumTape, QuantumScript -from pennylane.typing import Result, ResultBatch -from pennylane.devices import Device -from pennylane.devices.execution_config import ExecutionConfig, DefaultExecutionConfig -from pennylane.devices.preprocess import ( - decompose, - validate_observables, - validate_measurements, - validate_device_wires, -) -from pennylane.measurements import ExpectationMP, VarianceMP - -from ._version import __version__ -from .converter import QISKIT_OPERATION_MAP, circuit_to_qiskit, mp_to_pauli - -QuantumTapeBatch = Sequence[QuantumTape] -QuantumTape_or_Batch = Union[QuantumTape, QuantumTapeBatch] -Result_or_ResultBatch = Union[Result, ResultBatch] - - -# pylint: disable=protected-access -@contextmanager -def qiskit_session(device, **kwargs): - """A context manager that creates a Qiskit Session and sets it as a session - on the device while the context manager is active. Using the context manager - will ensure the Session closes properly and is removed from the device after - completing the tasks. Any Session that was initialized and passed into the - device will be overwritten by the Qiskit Session created by this context - manager. - - Args: - device (QiskitDevice2): the device that will create remote tasks using the session - **kwargs: session keyword arguments to be used for settings for the Session. At the - time of writing, the only relevant keyword argument is "max_time", which lets you - set the maximum amount of time the sessin is open. For the most up to date information, - please refer to the Qiskit Session - `documentation `_. - - **Example:** - - .. code-block:: python - - import pennylane as qml - from pennylane_qiskit import qiskit_session - from qiskit_ibm_runtime import QiskitRuntimeService - - # get backend - service = QiskitRuntimeService(channel="ibm_quantum") - backend = service.least_busy(simulator=False, operational=True) - - # initialize device - dev = qml.device('qiskit.remote', wires=2, backend=backend) - - @qml.qnode(dev) - def circuit(x): - qml.RX(x, 0) - qml.CNOT([0, 1]) - return qml.expval(qml.PauliZ(1)) - - angle = 0.1 - - with qiskit_session(dev, max_time=60) as session: - # queue for the first execution - res = circuit(angle)[0] - - # then this loop executes immediately after without queueing again - while res > 0: - angle += 0.3 - res = circuit(angle)[0] - - Note that if you passed in a session to your device, that session will be overwritten - by `qiskit_session`. - - .. code-block:: python - - import pennylane as qml - from pennylane_qiskit import qiskit_session - from qiskit_ibm_runtime import QiskitRuntimeService, Session - - # get backend - service = QiskitRuntimeService(channel="ibm_quantum") - backend = service.least_busy(simulator=False, operational=True) - - # initialize device - dev = qml.device('qiskit.remote', wires=2, backend=backend, session=Session(backend=backend, max_time=30)) - - @qml.qnode(dev) - def circuit(x): - qml.RX(x, 0) - qml.CNOT([0, 1]) - return qml.expval(qml.PauliZ(1)) - - angle = 0.1 - - # This session will have the Qiskit default settings max_time=900 - with qiskit_session(dev) as session: - res = circuit(angle)[0] - - while res > 0: - angle += 0.3 - res = circuit(angle)[0] - """ - # Code to acquire session: - existing_session = device._session - - session_options = {"backend": device.backend, "service": device.service} - - for k, v in kwargs.items(): - # Options like service and backend should be tied to the settings set on device - if k in session_options: - warnings.warn(f"Using '{k}' set in device, {getattr(device, k)}", UserWarning) - else: - session_options[k] = v - - session = Session(**session_options) - device._session = session - try: - yield session - finally: - # Code to release session: - session.close() - device._session = existing_session - - -def accepted_sample_measurement(m: qml.measurements.MeasurementProcess) -> bool: - """Specifies whether or not a measurement is accepted when sampling.""" - - return isinstance( - m, - ( - qml.measurements.SampleMeasurement, - qml.measurements.ClassicalShadowMP, - qml.measurements.ShadowExpvalMP, - ), - ) - - -@transform -def split_execution_types( - tape: qml.tape.QuantumTape, -) -> (Sequence[qml.tape.QuantumTape], Callable): - """Split into separate tapes based on measurement type. Counts and sample-based measurements - will use the Qiskit Sampler. ExpectationValue and Variance will use the Estimator, except - when the measured observable does not have a `pauli_rep`. In that case, the Sampler will be - used, and the raw samples will be processed to give an expectation value.""" - estimator = [] - sampler = [] - - for i, mp in enumerate(tape.measurements): - if isinstance(mp, (ExpectationMP, VarianceMP)): - if mp.obs.pauli_rep: - estimator.append((mp, i)) - else: - warnings.warn( - f"The observable measured {mp.obs} does not have a `pauli_rep` " - "and will be run without using the Estimator primitive. Instead, " - "raw samples from the Sampler will be used." - ) - sampler.append((mp, i)) - else: - sampler.append((mp, i)) - - order_indices = [[i for mp, i in group] for group in [estimator, sampler]] - - tapes = [] - if estimator: - tapes.extend( - [ - qml.tape.QuantumScript( - tape.operations, - measurements=[mp for mp, i in estimator], - shots=tape.shots, - ) - ] - ) - if sampler: - tapes.extend( - [ - qml.tape.QuantumScript( - tape.operations, - measurements=[mp for mp, i in sampler], - shots=tape.shots, - ) - ] - ) - - def reorder_fn(res): - """re-order the output to the original shape and order""" - - flattened_indices = [i for group in order_indices for i in group] - flattened_results = [r for group in res for r in group] - - result = dict(zip(flattened_indices, flattened_results)) - - result = tuple(result[i] for i in sorted(result.keys())) - - return result[0] if len(result) == 1 else result - - return tapes, reorder_fn - - -class QiskitDevice2(Device): - r"""Hardware/simulator Qiskit device for PennyLane. - - Args: - wires (int or Iterable[Number, str]]): Number of subsystems represented by the device, - or iterable that contains unique labels for the subsystems as numbers (i.e., ``[-1, 0, 2]``) - or strings (``['aux_wire', 'q1', 'q2']``). - backend (Backend): the initialized Qiskit backend - - Keyword Args: - shots (int or None): number of circuit evaluations/random samples used - to estimate expectation values and variances of observables. - session (Session): a Qiskit Session to use for device execution. If none is provided, a session will - be created at each device execution. - compile_backend (Union[Backend, None]): the backend to be used for compiling the circuit that will be - sent to the backend device, to be set if the backend desired for compliation differs from the - backend used for execution. Defaults to ``None``, which means the primary backend will be used. - **kwargs: transpilation and runtime keyword arguments to be used for measurements with Primitives. - If an `options` dictionary is defined amongst the kwargs, and there are settings that overlap - with those in kwargs, the settings in `options` will take precedence over kwargs. Keyword - arguments accepted by both the transpiler and at runtime (e.g. ``optimization_level``) - will be passed to the transpiler rather than to the Primitive. - """ - - operations = set(QISKIT_OPERATION_MAP.keys()) - observables = { - "PauliX", - "PauliY", - "PauliZ", - "Identity", - "Hadamard", - "Hermitian", - "Projector", - "Prod", - "Sum", - "LinearCombination", - "SProd", - # TODO Could support SparseHamiltonian - } - - # pylint:disable = too-many-arguments - def __init__( - self, - wires, - backend, - shots=1024, - session=None, - compile_backend=None, - **kwargs, - ): - - if shots is None: - warnings.warn( - "Expected an integer number of shots, but received shots=None. Defaulting " - "to 1024 shots. The analytic calculation of results is not supported on " - "this device. All statistics obtained from this device are estimates based " - "on samples.", - UserWarning, - ) - - shots = 1024 - - super().__init__(wires=wires, shots=shots) - - self._backend = backend - self._compile_backend = compile_backend if compile_backend else backend - - self._service = getattr(backend, "_service", None) - self._session = session - - kwargs["shots"] = shots - - # Perform validation against backend - available_qubits = ( - backend.num_qubits - if isinstance(backend, BackendV2) - else backend.configuration().n_qubits - ) - if len(self.wires) > int(available_qubits): - raise ValueError(f"Backend '{backend}' supports maximum {available_qubits} wires") - - self.reset() - self._kwargs, self._transpile_args = self._process_kwargs( - kwargs - ) # processes kwargs and separates transpilation arguments to dev._transpile_args - - @property - def backend(self): - """The Qiskit backend object. - - Returns: - qiskit.providers.Backend: Qiskit backend object. - """ - return self._backend - - @property - def compile_backend(self): - """The ``compile_backend`` is a Qiskit backend object to be used for transpilation. - Returns: - qiskit.providers.backend: Qiskit backend object. - """ - return self._compile_backend - - @property - def service(self): - """The QiskitRuntimeService service. - - Returns: - qiskit.qiskit_ibm_runtime.QiskitRuntimeService - """ - return self._service - - @property - def session(self): - """The QiskitRuntimeService session. - - Returns: - qiskit.qiskit_ibm_runtime.Session - """ - return self._session - - @property - def num_wires(self): - return len(self.wires) - - def update_session(self, session): - self._session = session - - def reset(self): - self._current_job = None - - def stopping_condition(self, op: qml.operation.Operator) -> bool: - """Specifies whether or not an Operator is accepted by QiskitDevice2.""" - return op.name in self.operations - - def observable_stopping_condition(self, obs: qml.operation.Operator) -> bool: - """Specifies whether or not an observable is accepted by QiskitDevice2.""" - return obs.name in self.observables - - def preprocess( - self, - execution_config: ExecutionConfig = DefaultExecutionConfig, - ) -> Tuple[TransformProgram, ExecutionConfig]: - """This function defines the device transform program to be applied and an updated device configuration. - - Args: - execution_config (Union[ExecutionConfig, Sequence[ExecutionConfig]]): A data structure describing the - parameters needed to fully describe the execution. - - Returns: - TransformProgram, ExecutionConfig: A transform program that when called returns QuantumTapes that the device - can natively execute as well as a postprocessing function to be called after execution, and a configuration with - unset specifications filled in. - - This device: - - * Supports any operations with explicit PennyLane to Qiskit gate conversions defined in the plugin - * Does not intrinsically support parameter broadcasting - - """ - config = execution_config - config.use_device_gradient = False - - transform_program = TransformProgram() - - transform_program.add_transform(validate_device_wires, self.wires, name=self.name) - transform_program.add_transform( - decompose, - stopping_condition=self.stopping_condition, - name=self.name, - skip_initial_state_prep=False, - ) - transform_program.add_transform( - validate_measurements, - sample_measurements=accepted_sample_measurement, - name=self.name, - ) - transform_program.add_transform( - validate_observables, - stopping_condition=self.observable_stopping_condition, - name=self.name, - ) - - transform_program.add_transform(broadcast_expand) - transform_program.add_transform(split_non_commuting) - - transform_program.add_transform(split_execution_types) - - return transform_program, config - - def _process_kwargs(self, kwargs): - """Processes kwargs given and separates them into kwargs and transpile_args. If given - a keyword argument 'options' that is a dictionary, a common practice in - Qiskit, the options in said dictionary take precedence over any overlapping keyword - arguments defined in the kwargs. - - Keyword Args: - kwargs (dict): keyword arguments that set either runtime options or transpilation - options. - - Returns: - kwargs, transpile_args: keyword arguments for the runtime options and keyword - arguments for the transpiler - """ - - if "noise_model" in kwargs: - noise_model = kwargs.pop("noise_model") - self.backend.set_options(noise_model=noise_model) - - if "options" in kwargs: - for key, val in kwargs.pop("options").items(): - if key in kwargs: - warnings.warn( - "An overlap between what was passed in via options and what was passed in via kwargs was found." - f"The value set in options {key}={val} will be used." - ) - kwargs[key] = val - - shots = kwargs.pop("shots") - - if "default_shots" in kwargs: - warnings.warn( - f"default_shots was found in the keyword arguments, but it is not supported by {self.name}" - "Please use the `shots` keyword argument instead. The number of shots " - f"{shots} will be used instead." - ) - kwargs["default_shots"] = shots - - kwargs, transpile_args = self.get_transpile_args(kwargs) - - return kwargs, transpile_args - - @staticmethod - def get_transpile_args(kwargs): - """The transpile argument setter. This separates keyword arguments related to transpilation - from the rest of the keyword arguments and removes those keyword arguments from kwargs. - - Keyword Args: - kwargs (dict): combined keyword arguments to be parsed for the Qiskit transpiler. For more details, see the - `Qiskit transpiler documentation `_ - - Returns: - kwargs (dict), transpile_args (dict): keyword arguments for the runtime options and keyword - arguments for the transpiler - """ - - transpile_sig = inspect.signature(transpile).parameters - - transpile_args = {arg: kwargs.pop(arg) for arg in transpile_sig if arg in kwargs} - transpile_args.pop("circuits", None) - transpile_args.pop("backend", None) - - return kwargs, transpile_args - - def compile_circuits(self, circuits): - """Compiles multiple circuits one after the other. - - Args: - circuits (list[QuantumCircuit]): the circuits to be compiled - - Returns: - list[QuantumCircuit]: the list of compiled circuits - """ - # Compile each circuit object - compiled_circuits = [] - transpile_args = self._transpile_args - - for i, circuit in enumerate(circuits): - compiled_circ = transpile(circuit, backend=self.compile_backend, **transpile_args) - compiled_circ.name = f"circ{i}" - compiled_circuits.append(compiled_circ) - - return compiled_circuits - - # pylint: disable=unused-argument, no-member - def execute( - self, - circuits: QuantumTape_or_Batch, - execution_config: ExecutionConfig = DefaultExecutionConfig, - ) -> Result_or_ResultBatch: - session = self._session or Session(backend=self.backend) - - results = [] - - if isinstance(circuits, QuantumScript): - circuits = [circuits] - - @contextmanager - def execute_circuits(session): - try: - for circ in circuits: - if circ.shots and len(circ.shots.shot_vector) > 1: - raise ValueError( - f"Setting shot vector {circ.shots.shot_vector} is not supported for {self.name}." - "Please use a single integer instead when specifying the number of shots." - ) - if isinstance(circ.measurements[0], (ExpectationMP, VarianceMP)) and getattr( - circ.measurements[0].obs, "pauli_rep", None - ): - execute_fn = self._execute_estimator - else: - execute_fn = self._execute_sampler - results.append(execute_fn(circ, session)) - yield results - finally: - session.close() - - with execute_circuits(session) as results: - return results - - def _execute_sampler(self, circuit, session): - """Returns the result of the execution of the circuit using the SamplerV2 Primitive. - Note that this result has been processed respective to the MeasurementProcess given. - E.g. `qml.expval` returns an expectation value whereas `qml.sample()` will return the raw samples. - - Args: - circuits (list[QuantumCircuit]): the circuits to be executed via SamplerV2 - session (Session): the session that the execution will be performed with - - Returns: - result (tuple): the processed result from SamplerV2 - """ - qcirc = [circuit_to_qiskit(circuit, self.num_wires, diagonalize=True, measure=True)] - sampler = Sampler(session=session) - compiled_circuits = self.compile_circuits(qcirc) - sampler.options.update(**self._kwargs) - - # len(compiled_circuits) is always 1 so the indexing does not matter. - result = sampler.run( - compiled_circuits, - shots=circuit.shots.total_shots if circuit.shots.total_shots else None, - ).result()[0] - classical_register_name = compiled_circuits[0].cregs[0].name - self._current_job = getattr(result.data, classical_register_name) - - # needs processing function to convert to the correct format for states, and - # also handle instances where wires were specified in probs, and for multiple probs measurements - - self._samples = self.generate_samples(0) - res = [ - mp.process_samples(self._samples, wire_order=self.wires) for mp in circuit.measurements - ] - - single_measurement = len(circuit.measurements) == 1 - res = (res[0],) if single_measurement else tuple(res) - - return res - - def _execute_estimator(self, circuit, session): - """Returns the result of the execution of the circuit using the EstimatorV2 Primitive. - Note that this result has been processed respective to the MeasurementProcess given. - E.g. `qml.expval` returns an expectation value whereas `qml.var` will return the variance. - - Args: - circuits (list[QuantumCircuit]): the circuits to be executed via EstimatorV2 - session (Session): the session that the execution will be performed with - - Returns: - result (tuple): the processed result from EstimatorV2 - """ - # the Estimator primitive takes care of diagonalization and measurements itself, - # so diagonalizing gates and measurements are not included in the circuit - qcirc = [circuit_to_qiskit(circuit, self.num_wires, diagonalize=False, measure=False)] - estimator = Estimator(session=session) - - pauli_observables = [mp_to_pauli(mp, self.num_wires) for mp in circuit.measurements] - compiled_circuits = self.compile_circuits(qcirc) - estimator.options.update(**self._kwargs) - # split into one call per measurement - # could technically be more efficient if there are some observables where we ask - # for expectation value and variance on the same observable, but spending time on - # that right now feels excessive - circ_and_obs = [(compiled_circuits[0], pauli_observables)] - result = estimator.run( - circ_and_obs, - precision=np.sqrt(1 / circuit.shots.total_shots) if circuit.shots else None, - ).result() - self._current_job = result - result = self._process_estimator_job(circuit.measurements, result) - - return result - - @staticmethod - def _process_estimator_job(measurements, job_result): - """Estimator returns the expectation value and standard error for each observable measured, - along with some metadata that contains the precision. Extracts the relevant number for each - measurement process and return the requested results from the Estimator executions. - - Note that for variance, we calculate the variance by using the standard error and the - precision value. - - Args: - measurements (list[MeasurementProcess]): the measurements in the circuit - job_result (Any): the result from EstimatorV2 - - Returns: - result (tuple): the processed result from EstimatorV2 - """ - expvals = job_result[0].data.evs - variances = (job_result[0].data.stds / job_result[0].metadata["target_precision"]) ** 2 - result = [] - for i, mp in enumerate(measurements): - if isinstance(mp, ExpectationMP): - result.append(expvals[i]) - elif isinstance(mp, VarianceMP): - result.append(variances[i]) - - single_measurement = len(measurements) == 1 - result = (result[0],) if single_measurement else tuple(result) - - return result - - def generate_samples(self, circuit=None): - r"""Returns the computational basis samples generated for all wires. - - Note that PennyLane uses the convention :math:`|q_0,q_1,\dots,q_{N-1}\rangle` where - :math:`q_0` is the most significant bit. - - Args: - circuit (int): position of the circuit in the batch. - - Returns: - array[complex]: array of samples in the shape ``(dev.shots, dev.num_wires)`` - """ - counts = self._current_job.get_counts() - # Batch of circuits - if not isinstance(counts, dict): - counts = self._current_job.get_counts()[circuit] - - samples = [] - for key, value in counts.items(): - samples.extend([key] * value) - return np.vstack([np.array([int(i) for i in s[::-1]]) for s in samples]) diff --git a/pennylane_qiskit/qiskit_device_legacy.py b/pennylane_qiskit/qiskit_device_legacy.py new file mode 100644 index 00000000..d44a17d7 --- /dev/null +++ b/pennylane_qiskit/qiskit_device_legacy.py @@ -0,0 +1,491 @@ +# Copyright 2019-2021 Xanadu Quantum Technologies Inc. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +r""" +This module contains a base class for constructing Qiskit devices for PennyLane. +""" +# pylint: disable=too-many-instance-attributes,attribute-defined-outside-init + + +import abc +import inspect +import warnings + +import numpy as np +from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister +from qiskit.compiler import transpile +from qiskit.converters import circuit_to_dag, dag_to_circuit +from qiskit.providers import Backend, BackendV2, QiskitBackendNotFoundError + +from pennylane import QubitDevice, DeviceError +from pennylane.measurements import SampleMP, CountsMP, ClassicalShadowMP, ShadowExpvalMP + +from .converter import QISKIT_OPERATION_MAP +from ._version import __version__ + +SAMPLE_TYPES = (SampleMP, CountsMP, ClassicalShadowMP, ShadowExpvalMP) + + +def _get_backend_name(backend): + try: + return backend.name() # BackendV1 + except TypeError: # pragma: no cover + return backend.name # BackendV2 + + +class QiskitDeviceLegacy(QubitDevice, abc.ABC): + r"""Abstract Qiskit device for PennyLane. + + Args: + wires (int or Iterable[Number, str]]): Number of subsystems represented by the device, + or iterable that contains unique labels for the subsystems as numbers (i.e., ``[-1, 0, 2]``) + or strings (``['ancilla', 'q1', 'q2']``). + provider (Provider | None): The Qiskit backend provider. + backend (str | Backend): the desired backend. If a string, a provider must be given. + shots (int or None): number of circuit evaluations/random samples used + to estimate expectation values and variances of observables. For state vector backends, + setting to ``None`` results in computing statistics like expectation values and variances analytically. + + Keyword Args: + name (str): The name of the circuit. Default ``'circuit'``. + compile_backend (BaseBackend): The backend used for compilation. If you wish + to simulate a device compliant circuit, you can specify a backend here. + """ + + name = "Qiskit PennyLane plugin" + pennylane_requires = ">=0.30.0" + version = __version__ + plugin_version = __version__ + author = "Xanadu" + + _capabilities = { + "model": "qubit", + "tensor_observables": True, + "inverse_operations": True, + } + _operation_map = QISKIT_OPERATION_MAP + _state_backends = { + "statevector_simulator", + "simulator_statevector", + "unitary_simulator", + "aer_simulator_statevector", + "aer_simulator_unitary", + } + """set[str]: Set of backend names that define the backends + that support returning the underlying quantum statevector""" + + operations = set(_operation_map.keys()) + observables = { + "PauliX", + "PauliY", + "PauliZ", + "Identity", + "Hadamard", + "Hermitian", + "Projector", + } + + analytic_warning_message = ( + "The analytic calculation of expectations, variances and " + "probabilities is only supported on statevector backends, not on the {}. " + "Such statistics obtained from this device are estimates based on samples." + ) + + _eigs = {} + + def __init__(self, wires, provider, backend, shots=1024, **kwargs): + + super().__init__(wires=wires, shots=shots) + + self.provider = provider + + if isinstance(backend, Backend): + self._backend = backend + self.backend_name = _get_backend_name(backend) + elif provider is None: + raise ValueError("Must pass a provider if the backend is not a Backend instance.") + else: + try: + self._backend = provider.get_backend(backend) + except QiskitBackendNotFoundError as e: + available_backends = list(map(_get_backend_name, provider.backends())) + raise ValueError( + f"Backend '{backend}' does not exist. Available backends " + f"are:\n {available_backends}" + ) from e + + self.backend_name = _get_backend_name(self._backend) + + # Keep track if the user specified analytic to be True + if shots is None and not self._is_state_backend: + # Raise a warning if no shots were specified for a hardware device + warnings.warn(self.analytic_warning_message.format(backend), UserWarning) + + self.shots = 1024 + + self._capabilities["returns_state"] = self._is_state_backend + + # Perform validation against backend + backend_qubits = ( + backend.num_qubits + if isinstance(backend, BackendV2) + else self.backend.configuration().n_qubits + ) + if backend_qubits and len(self.wires) > int(backend_qubits): + raise ValueError(f"Backend '{backend}' supports maximum {backend_qubits} wires") + + # Initialize inner state + self.reset() + + self.process_kwargs(kwargs) + + def process_kwargs(self, kwargs): + """Processing the keyword arguments that were provided upon device initialization. + + Args: + kwargs (dict): keyword arguments to be set for the device + """ + self.compile_backend = None + if "compile_backend" in kwargs: + self.compile_backend = kwargs.pop("compile_backend") + + if "noise_model" in kwargs: + noise_model = kwargs.pop("noise_model") + self.backend.set_options(noise_model=noise_model) + + # set transpile_args + self.set_transpile_args(**kwargs) + + # Get further arguments for run + self.run_args = {} + + # Specify to have a memory for hw/hw simulators + compile_backend = self.compile_backend or self.backend + memory = str(compile_backend) not in self._state_backends + + if memory: + kwargs["memory"] = True + + # Consider the remaining kwargs as keyword arguments to run + self.run_args.update(kwargs) + + @property + def _is_state_backend(self): + """Returns whether this device has a state backend.""" + return self.backend_name in self._state_backends or self.backend.options.get("method") in { + "unitary", + "statevector", + } + + @property + def _is_statevector_backend(self): + """Returns whether this device has a statevector backend.""" + method = "statevector" + return method in self.backend_name or self.backend.options.get("method") == method + + @property + def _is_unitary_backend(self): + """Returns whether this device has a unitary backend.""" + method = "unitary" + return method in self.backend_name or self.backend.options.get("method") == method + + def set_transpile_args(self, **kwargs): + """The transpile argument setter. + + Keyword Args: + kwargs (dict): keyword arguments to be set for the Qiskit transpiler. For more details, see the + `Qiskit transpiler documentation `_ + """ + transpile_sig = inspect.signature(transpile).parameters + self.transpile_args = {arg: kwargs[arg] for arg in transpile_sig if arg in kwargs} + self.transpile_args.pop("circuits", None) + self.transpile_args.pop("backend", None) + + @property + def backend(self): + """The Qiskit backend object. + + Returns: + qiskit.providers.backend: Qiskit backend object. + """ + return self._backend + + def reset(self): + """Reset the Qiskit backend device""" + # Reset only internal data, not the options that are determined on + # device creation + self._reg = QuantumRegister(self.num_wires, "q") + self._creg = ClassicalRegister(self.num_wires, "c") + self._circuit = QuantumCircuit(self._reg, self._creg, name="temp") + + self._current_job = None + self._state = None # statevector of a simulator backend + + def create_circuit_object(self, operations, **kwargs): + """Builds the circuit objects based on the operations and measurements + specified to apply. + + Args: + operations (list[~.Operation]): operations to apply to the device + + Keyword args: + rotations (list[~.Operation]): Operations that rotate the circuit + pre-measurement into the eigenbasis of the observables. + """ + rotations = kwargs.get("rotations", []) + + applied_operations = self.apply_operations(operations) + + # Rotating the state for measurement in the computational basis + rotation_circuits = self.apply_operations(rotations) + applied_operations.extend(rotation_circuits) + + for circuit in applied_operations: + self._circuit &= circuit + + if not self._is_state_backend: + # Add measurements if they are needed + for qr, cr in zip(self._reg, self._creg): + self._circuit.measure(qr, cr) + elif "aer" in self.backend_name: + self._circuit.save_state() + + def apply(self, operations, **kwargs): + """Build the circuit object and apply the operations""" + self.create_circuit_object(operations, **kwargs) + + # These operations need to run for all devices + compiled_circuit = self.compile() + self.run(compiled_circuit) + + def apply_operations(self, operations): + """Apply the circuit operations. + + This method serves as an auxiliary method to :meth:`~.QiskitDevice.apply`. + + Args: + operations (List[pennylane.Operation]): operations to be applied + + Returns: + list[QuantumCircuit]: a list of quantum circuit objects that + specify the corresponding operations + """ + circuits = [] + + for operation in operations: + # Apply the circuit operations + device_wires = self.map_wires(operation.wires) + par = operation.parameters + + for idx, p in enumerate(par): + if isinstance(p, np.ndarray): + # Convert arrays so that Qiskit accepts the parameter + par[idx] = p.tolist() + + operation = operation.name + + mapped_operation = self._operation_map[operation] + + self.qubit_state_vector_check(operation) + + qregs = [self._reg[i] for i in device_wires.labels] + + if operation in ("QubitUnitary", "QubitStateVector", "StatePrep"): + # Need to revert the order of the quantum registers used in + # Qiskit such that it matches the PennyLane ordering + qregs = list(reversed(qregs)) + + if operation in ("Barrier",): + # Need to add the num_qubits for instantiating Barrier in Qiskit + par = [len(self._reg)] + + dag = circuit_to_dag(QuantumCircuit(self._reg, self._creg, name="")) + + gate = mapped_operation(*par) + + dag.apply_operation_back(gate, qargs=qregs) + circuit = dag_to_circuit(dag) + circuits.append(circuit) + + return circuits + + def qubit_state_vector_check(self, operation): + """Input check for the StatePrepBase operations. + + Args: + operation (pennylane.Operation): operation to be checked + + Raises: + DeviceError: If the operation is QubitStateVector or StatePrep + """ + if operation in ("QubitStateVector", "StatePrep"): + if self._is_unitary_backend: + raise DeviceError( + f"The {operation} operation " + "is not supported on the unitary simulator backend." + ) + + def compile(self): + """Compile the quantum circuit to target the provided compile_backend. + + If compile_backend is None, then the target is simply the + backend. + """ + compile_backend = self.compile_backend or self.backend + compiled_circuits = transpile(self._circuit, backend=compile_backend, **self.transpile_args) + return compiled_circuits + + def run(self, qcirc): + """Run the compiled circuit and query the result. + + Args: + qcirc (qiskit.QuantumCircuit): the quantum circuit to be run on the backend + """ + self._current_job = self.backend.run(qcirc, shots=self.shots, **self.run_args) + result = self._current_job.result() + + if self._is_state_backend: + self._state = self._get_state(result) + + def _get_state(self, result, experiment=None): + """Returns the statevector for state simulator backends. + + Args: + result (qiskit.Result): result object + experiment (str or None): the name of the experiment to get the state for. + + Returns: + array[float]: size ``(2**num_wires,)`` statevector + """ + if self._is_statevector_backend: + state = np.asarray(result.get_statevector(experiment)) + + elif self._is_unitary_backend: + unitary = np.asarray(result.get_unitary(experiment)) + initial_state = np.zeros([2**self.num_wires]) + initial_state[0] = 1 + + state = unitary @ initial_state + + # reverse qubit order to match PennyLane convention + return state.reshape([2] * self.num_wires).T.flatten() + + def generate_samples(self, circuit=None): + r"""Returns the computational basis samples generated for all wires. + + Note that PennyLane uses the convention :math:`|q_0,q_1,\dots,q_{N-1}\rangle` where + :math:`q_0` is the most significant bit. + + Args: + circuit (str or None): the name of the circuit to get the state for + + Returns: + array[complex]: array of samples in the shape ``(dev.shots, dev.num_wires)`` + """ + + # branch out depending on the type of backend + if self._is_state_backend: + # software simulator: need to sample from probabilities + return super().generate_samples() + + # hardware or hardware simulator + samples = self._current_job.result().get_memory(circuit) + # reverse qubit order to match PennyLane convention + return np.vstack([np.array([int(i) for i in s[::-1]]) for s in samples]) + + @property + def state(self): + """Get state of the device""" + return self._state + + def analytic_probability(self, wires=None): + """Get the analytic probability of the device""" + if self._state is None: + return None + + prob = self.marginal_prob(np.abs(self._state) ** 2, wires) + return prob + + def compile_circuits(self, circuits): + r"""Compiles multiple circuits one after the other. + + Args: + circuits (list[.tapes.QuantumTape]): the circuits to be compiled + + Returns: + list[QuantumCircuit]: the list of compiled circuits + """ + # Compile each circuit object + compiled_circuits = [] + + for circuit in circuits: + # We need to reset the device here, else it will + # not start the next computation in the zero state + self.reset() + self.create_circuit_object(circuit.operations, rotations=circuit.diagonalizing_gates) + + compiled_circ = self.compile() + compiled_circ.name = f"circ{len(compiled_circuits)}" + compiled_circuits.append(compiled_circ) + + return compiled_circuits + + def batch_execute(self, circuits, timeout: int = None): + """Batch execute the circuits on the device""" + + compiled_circuits = self.compile_circuits(circuits) + + if not compiled_circuits: + # At least one circuit must always be provided to the backend. + return [] + + # Send the batch of circuit objects using backend.run + self._current_job = self.backend.run(compiled_circuits, shots=self.shots, **self.run_args) + + try: + result = self._current_job.result(timeout=timeout) + except TypeError: # pragma: no cover + # timeout not supported + result = self._current_job.result() + + # increment counter for number of executions of qubit device + # pylint: disable=no-member + self._num_executions += 1 + + # Compute statistics using the state and/or samples + results = [] + for circuit, circuit_obj in zip(circuits, compiled_circuits): + # Update the tracker + if self.tracker.active: + self.tracker.update(executions=1, shots=self.shots) + self.tracker.record() + + if self._is_state_backend: + self._state = self._get_state(result, experiment=circuit_obj) + + # generate computational basis samples + if self.shots is not None or any( + isinstance(m, SAMPLE_TYPES) for m in circuit.measurements + ): + self._samples = self.generate_samples(circuit_obj) + + res = self.statistics(circuit) + single_measurement = len(circuit.measurements) == 1 + res = res[0] if single_measurement else tuple(res) + results.append(res) + + if self.tracker.active: + self.tracker.update(batches=1, batch_len=len(circuits)) + self.tracker.record() + + return results diff --git a/tests/test_base_device.py b/tests/test_base_device.py index 2fd3c2fe..d958fda8 100644 --- a/tests/test_base_device.py +++ b/tests/test_base_device.py @@ -34,8 +34,8 @@ from qiskit.providers import BackendV1, BackendV2 from qiskit import QuantumCircuit, transpile -from pennylane_qiskit.qiskit_device2 import ( - QiskitDevice2, +from pennylane_qiskit.qiskit_device import ( + QiskitDevice, qiskit_session, split_execution_types, ) @@ -117,7 +117,7 @@ def close(self): # This is just to appease a test mocked_backend = MockedBackend() legacy_backend = MockedBackendLegacy() aer_backend = AerSimulator() -test_dev = QiskitDevice2(wires=5, backend=aer_backend) +test_dev = QiskitDevice(wires=5, backend=aer_backend) class TestSupportForV1andV2: @@ -129,7 +129,7 @@ class TestSupportForV1andV2: ) def test_v1_and_v2_mocked(self, backend): """Test that device initializes with no error mocked""" - dev = QiskitDevice2(wires=10, backend=backend) + dev = QiskitDevice(wires=10, backend=backend) assert dev._backend == backend @pytest.mark.parametrize( @@ -141,7 +141,7 @@ def test_v1_and_v2_mocked(self, backend): ) def test_v1_and_v2_manila(self, backend, shape): """Test that device initializes and runs without error with V1 and V2 backends by Qiskit""" - dev = QiskitDevice2(wires=5, backend=backend) + dev = QiskitDevice(wires=5, backend=backend) @qml.qnode(dev) def circuit(x): @@ -163,8 +163,8 @@ def test_compile_backend_kwarg(self): compile_backend = MockedBackend(name="compile_backend") main_backend = MockedBackend(name="main_backend") - dev1 = QiskitDevice2(wires=5, backend=main_backend) - dev2 = QiskitDevice2(wires=5, backend=main_backend, compile_backend=compile_backend) + dev1 = QiskitDevice(wires=5, backend=main_backend) + dev2 = QiskitDevice(wires=5, backend=main_backend, compile_backend=compile_backend) assert dev1._compile_backend == dev1._backend == main_backend @@ -179,7 +179,7 @@ def test_no_shots_warns_and_defaults(self): UserWarning, match="Expected an integer number of shots, but received shots=None", ): - dev = QiskitDevice2(wires=2, backend=aer_backend, shots=None) + dev = QiskitDevice(wires=2, backend=aer_backend, shots=None) assert dev.shots.total_shots == 1024 @@ -189,15 +189,15 @@ def test_backend_wire_validation(self, backend): the number of wires available on the backend, for both backend versions""" with pytest.raises(ValueError, match="supports maximum"): - QiskitDevice2(wires=500, backend=backend) + QiskitDevice(wires=500, backend=backend) def test_setting_simulator_noise_model(self): """Test that the simulator noise model saved on a passed Options object is used to set the backend noise model""" new_backend = MockedBackend() - dev1 = QiskitDevice2(wires=3, backend=aer_backend) - dev2 = QiskitDevice2(wires=3, backend=new_backend, noise_model={"placeholder": 1}) + dev1 = QiskitDevice(wires=3, backend=aer_backend) + dev2 = QiskitDevice(wires=3, backend=new_backend, noise_model={"placeholder": 1}) assert dev1.backend.options.noise_model is None assert dev2.backend.options.noise_model == {"placeholder": 1} @@ -210,7 +210,7 @@ class TestQiskitSessionManagement: def test_default_no_session_on_initialization(self, backend): """Test that the default behaviour is no session at initialization""" - dev = QiskitDevice2(wires=2, backend=backend) + dev = QiskitDevice(wires=2, backend=backend) assert dev._session is None @pytest.mark.parametrize("backend", [aer_backend, FakeManila(), FakeManilaV2()]) @@ -218,15 +218,15 @@ def test_initializing_with_session(self, backend): """Test that you can initialize a device with an existing Qiskit session""" session = MockSession(backend=backend, max_time="1m") - dev = QiskitDevice2(wires=2, backend=backend, session=session) + dev = QiskitDevice(wires=2, backend=backend, session=session) assert dev._session == session - @patch("pennylane_qiskit.qiskit_device2.Session") + @patch("pennylane_qiskit.qiskit_device.Session") @pytest.mark.parametrize("initial_session", [None, MockSession(aer_backend)]) def test_using_session_context(self, mock_session, initial_session): """Test that you can add a session within a context manager""" - dev = QiskitDevice2(wires=2, backend=aer_backend, session=initial_session) + dev = QiskitDevice(wires=2, backend=aer_backend, session=initial_session) assert dev._session == initial_session @@ -238,7 +238,7 @@ def test_using_session_context(self, mock_session, initial_session): def test_using_session_context_options(self): """Test that you can set session options using qiskit_session""" - dev = QiskitDevice2(wires=2, backend=aer_backend) + dev = QiskitDevice(wires=2, backend=aer_backend) assert dev._session is None @@ -254,7 +254,7 @@ def test_error_when_passing_unexpected_kwarg(self): Qiskit allows for more customization we can automatically accomodate those needs. Right now there are no such keyword arguments, so an error on Qiskit's side is raised.""" - dev = QiskitDevice2(wires=2, backend=aer_backend) + dev = QiskitDevice(wires=2, backend=aer_backend) assert dev._session is None @@ -269,7 +269,7 @@ def test_error_when_passing_unexpected_kwarg(self): def test_no_warning_when_using_initial_session_options(self): initial_session = Session(backend=aer_backend, max_time=30) - dev = QiskitDevice2(wires=2, backend=aer_backend, session=initial_session) + dev = QiskitDevice(wires=2, backend=aer_backend, session=initial_session) assert dev._session == initial_session @@ -288,7 +288,7 @@ def test_warnings_when_overriding_session_context_options(self, recorder): default options, passed in from the `qiskit_session` take precedence, barring `backend` or `service`""" initial_session = Session(backend=aer_backend) - dev = QiskitDevice2(wires=2, backend=aer_backend, session=initial_session) + dev = QiskitDevice(wires=2, backend=aer_backend, session=initial_session) assert dev._session == initial_session @@ -314,7 +314,7 @@ def test_warnings_when_overriding_session_context_options(self, recorder): assert dev._session == initial_session max_time_session = Session(backend=aer_backend, max_time=60) - dev = QiskitDevice2(wires=2, backend=aer_backend, session=max_time_session) + dev = QiskitDevice(wires=2, backend=aer_backend, session=max_time_session) with qiskit_session(dev, max_time=30) as session: assert dev._session == session assert dev._session != initial_session @@ -328,7 +328,7 @@ def test_warnings_when_overriding_session_context_options(self, recorder): def test_update_session(self, initial_session): """Test that you can update the session stored on the device""" - dev = QiskitDevice2(wires=2, backend=aer_backend, session=initial_session) + dev = QiskitDevice(wires=2, backend=aer_backend, session=initial_session) assert dev._session == initial_session new_session = MockSession(backend=aer_backend, max_time="1m") @@ -519,7 +519,7 @@ def test_observable_stopping_condition(self, obs, expected): def test_preprocess_split_non_commuting(self, measurements, num_tapes): """Test that `split_non_commuting` works as expected in the preprocess function.""" - dev = QiskitDevice2(wires=5, backend=aer_backend) + dev = QiskitDevice(wires=5, backend=aer_backend) qs = QuantumScript([], measurements=measurements, shots=qml.measurements.Shots(1000)) program, _ = dev.preprocess() @@ -542,7 +542,7 @@ def test_preprocess_splits_incompatible_primitive_measurements(self, measurement on measurement type. Expval and Variance are one type (Estimator), Probs and raw-sample based measurements are another type (Sampler).""" - dev = QiskitDevice2(wires=5, backend=aer_backend) + dev = QiskitDevice(wires=5, backend=aer_backend) qs = QuantumScript([], measurements=measurements, shots=qml.measurements.Shots(1000)) program, _ = dev.preprocess() @@ -591,20 +591,20 @@ def test_warning_if_shots(self): UserWarning, match="default_shots was found in the keyword arguments", ): - dev = QiskitDevice2(wires=2, backend=aer_backend, default_shots=333) + dev = QiskitDevice(wires=2, backend=aer_backend, default_shots=333) # Qiskit takes in `default_shots` to define the # of shots, therefore we use # the kwarg "default_shots" rather than shots to pass it to Qiskit. assert dev._kwargs["default_shots"] == 1024 - dev = QiskitDevice2(wires=2, backend=aer_backend, shots=200) + dev = QiskitDevice(wires=2, backend=aer_backend, shots=200) assert dev._kwargs["default_shots"] == 200 with pytest.warns( UserWarning, match="default_shots was found in the keyword arguments", ): - dev = QiskitDevice2(wires=2, backend=aer_backend, options={"default_shots": 30}) + dev = QiskitDevice(wires=2, backend=aer_backend, options={"default_shots": 30}) # resets to default since we reinitialize the device assert dev._kwargs["default_shots"] == 1024 @@ -615,7 +615,7 @@ def test_warning_if_options_and_kwargs_overlap(self): UserWarning, match="An overlap between", ): - dev = QiskitDevice2( + dev = QiskitDevice( wires=2, backend=aer_backend, options={"resilience_level": 1, "optimization_level": 1}, @@ -642,7 +642,7 @@ def test_options_and_kwargs_combine_into_unified_kwargs(self, backend): """Test that options set via the keyword argument options and options set via kwargs will combine into a single unified kwargs that is passed to the device""" - dev = QiskitDevice2( + dev = QiskitDevice( wires=5, backend=backend, options={"resilience_level": 1}, @@ -666,7 +666,7 @@ def test_no_error_is_raised_if_transpilation_options_are_passed(self, backend): """Tests that when transpilation options are passed in, they are properly handled without error""" - dev = QiskitDevice2( + dev = QiskitDevice( wires=5, backend=backend, options={"resilience_level": 1, "optimization_level": 1}, @@ -692,7 +692,7 @@ def circuit(): class TestDeviceProperties: def test_name_property(self): """Test the backend property""" - assert test_dev.name == "QiskitDevice2" + assert test_dev.name == "QiskitDevice" def test_backend_property(self): """Test the backend property""" @@ -704,7 +704,7 @@ def test_compile_backend_property(self, backend): """Test the compile_backend property""" compile_backend = MockedBackend(name="compile_backend") - dev = QiskitDevice2(wires=5, backend=backend, compile_backend=compile_backend) + dev = QiskitDevice(wires=5, backend=backend, compile_backend=compile_backend) assert dev.compile_backend == dev._compile_backend assert dev.compile_backend == compile_backend @@ -717,7 +717,7 @@ def test_session_property(self): """Test the session property""" session = MockSession(backend=aer_backend) - dev = QiskitDevice2(wires=2, backend=aer_backend, session=session) + dev = QiskitDevice(wires=2, backend=aer_backend, session=session) assert dev.session == dev._session assert dev.session == session @@ -725,7 +725,7 @@ def test_num_wires_property(self): """Test the num_wires property""" wires = [1, 2, 3] - dev = QiskitDevice2(wires=wires, backend=aer_backend) + dev = QiskitDevice(wires=wires, backend=aer_backend) assert dev.num_wires == len(wires) @@ -742,7 +742,7 @@ def test_get_transpile_args(self): "circuits": [], } compile_backend = MockedBackend(name="compile_backend") - dev = QiskitDevice2( + dev = QiskitDevice( wires=5, backend=aer_backend, compile_backend=compile_backend, **transpile_args ) assert dev._transpile_args == { @@ -750,14 +750,14 @@ def test_get_transpile_args(self): "seed_transpiler": 42, } - @patch("pennylane_qiskit.qiskit_device2.transpile") + @patch("pennylane_qiskit.qiskit_device.transpile") @pytest.mark.parametrize("compile_backend", [None, MockedBackend(name="compile_backend")]) def test_compile_circuits(self, transpile_mock, compile_backend): """Tests compile_circuits with a mocked transpile function to avoid calling a remote backend. Confirm compile_backend and transpile_args are used.""" transpile_args = {"seed_transpiler": 42, "optimization_level": 2} - dev = QiskitDevice2( + dev = QiskitDevice( wires=5, backend=aer_backend, compile_backend=compile_backend, **transpile_args ) @@ -815,18 +815,18 @@ def get_counts(): assert len(np.argwhere([np.allclose(s, [0, 1]) for s in samples])) == results_dict["10"] assert len(np.argwhere([np.allclose(s, [1, 0]) for s in samples])) == results_dict["01"] - @patch("pennylane_qiskit.qiskit_device2.QiskitDevice2._execute_estimator") + @patch("pennylane_qiskit.qiskit_device.QiskitDevice._execute_estimator") def test_execute_pipeline_primitives_no_session(self, mocker): """Test that a Primitives-based device initialized with no Session creates one for the execution, and then returns the device session to None.""" - dev = QiskitDevice2(wires=5, backend=aer_backend, session=None) + dev = QiskitDevice(wires=5, backend=aer_backend, session=None) assert dev._session is None qs = QuantumScript([qml.PauliX(0), qml.PauliY(1)], measurements=[qml.expval(qml.PauliZ(0))]) - with patch("pennylane_qiskit.qiskit_device2.Session") as mock_session: + with patch("pennylane_qiskit.qiskit_device.Session") as mock_session: dev.execute(qs) mock_session.assert_called_once() # a session was created @@ -837,7 +837,7 @@ def test_execute_pipeline_with_all_execute_types_mocked(self, mocker, backend): """Test that a device executes measurements that require raw samples via the sampler, and the relevant primitive measurements via the estimator""" - dev = QiskitDevice2(wires=5, backend=backend, session=MockSession(backend)) + dev = QiskitDevice(wires=5, backend=backend, session=MockSession(backend)) qs = QuantumScript( [qml.PauliX(0), qml.PauliY(1)], @@ -865,8 +865,8 @@ def test_execute_pipeline_with_all_execute_types_mocked(self, mocker, backend): "sampler_execute_res", ] - @patch("pennylane_qiskit.qiskit_device2.Estimator") - @patch("pennylane_qiskit.qiskit_device2.QiskitDevice2._process_estimator_job") + @patch("pennylane_qiskit.qiskit_device.Estimator") + @patch("pennylane_qiskit.qiskit_device.QiskitDevice._process_estimator_job") @pytest.mark.parametrize("session", [None, MockSession(aer_backend)]) def test_execute_estimator_mocked(self, mocked_estimator, mocked_process_fn, session): """Test the _execute_estimator function using a mocked version of Estimator @@ -885,7 +885,7 @@ def test_execute_estimator_mocked(self, mocked_estimator, mocked_process_fn, ses def test_shot_vector_error_mocked(self): """Test that a device that executes a circuit with an array of shots raises the appropriate ValueError""" - dev = QiskitDevice2(wires=5, backend=aer_backend, session=MockSession(aer_backend)) + dev = QiskitDevice(wires=5, backend=aer_backend, session=MockSession(aer_backend)) qs = QuantumScript( measurements=[ qml.expval(qml.PauliX(0)), @@ -919,7 +919,7 @@ def test_estimator_with_different_pauli_obs(self, mocker, wire, angle, op, expec correspond correctly (wire ordering convention in Qiskit and PennyLane don't match.) """ - dev = QiskitDevice2(wires=5, backend=aer_backend) + dev = QiskitDevice(wires=5, backend=aer_backend) sampler_execute = mocker.spy(dev, "_execute_sampler") estimator_execute = mocker.spy(dev, "_execute_estimator") @@ -978,7 +978,7 @@ def test_estimator_with_various_multi_qubit_pauli_obs( """ pl_dev = qml.device("default.qubit", wires=[0, 1, 2, 3]) - dev = QiskitDevice2(wires=[0, 1, 2, 3], backend=aer_backend) + dev = QiskitDevice(wires=[0, 1, 2, 3], backend=aer_backend) sampler_execute = mocker.spy(dev, "_execute_sampler") estimator_execute = mocker.spy(dev, "_execute_estimator") @@ -1002,7 +1002,7 @@ def test_estimator_with_various_multi_qubit_pauli_obs( def test_tape_shots_used_for_estimator(self, mocker): """Tests that device uses tape shots rather than device shots for estimator""" - dev = QiskitDevice2(wires=5, backend=aer_backend, shots=2) + dev = QiskitDevice(wires=5, backend=aer_backend, shots=2) estimator_execute = mocker.spy(dev, "_execute_estimator") @@ -1068,7 +1068,7 @@ def test_process_estimator_job(self, measurements, expectation): assert isinstance(result[0].metadata, dict) - processed_result = QiskitDevice2._process_estimator_job(qs.measurements, result) + processed_result = QiskitDevice._process_estimator_job(qs.measurements, result) assert isinstance(processed_result, tuple) assert np.allclose(processed_result, expectation, atol=0.1) @@ -1076,7 +1076,7 @@ def test_process_estimator_job(self, measurements, expectation): @pytest.mark.parametrize("num_shots", [50, 100]) def test_generate_samples(self, num_wires, num_shots): qs = QuantumScript([], measurements=[qml.expval(qml.PauliX(0))]) - dev = QiskitDevice2(wires=num_wires, backend=aer_backend, shots=num_shots) + dev = QiskitDevice(wires=num_wires, backend=aer_backend, shots=num_shots) dev._execute_sampler(circuit=qs, session=Session(backend=aer_backend)) samples = dev.generate_samples(0) @@ -1099,7 +1099,7 @@ def test_generate_samples(self, num_wires, num_shots): def test_tape_shots_used_for_sampler(self, mocker): """Tests that device uses tape shots rather than device shots for sampler""" - dev = QiskitDevice2(wires=5, backend=aer_backend, shots=2) + dev = QiskitDevice(wires=5, backend=aer_backend, shots=2) sampler_execute = mocker.spy(dev, "_execute_sampler") @@ -1119,7 +1119,7 @@ def circuit(): def test_error_for_shot_vector(self): """Tests that a ValueError is raised if a shot vector is passed.""" - dev = QiskitDevice2(wires=5, backend=aer_backend, shots=2) + dev = QiskitDevice(wires=5, backend=aer_backend, shots=2) @qml.qnode(dev) def circuit(): @@ -1147,7 +1147,7 @@ def test_no_pauli_observable_gives_accurate_answer(self, mocker, observable): provides an accurate answer for measurements with observables that don't have a pauli_rep. """ - dev = QiskitDevice2(wires=5, backend=aer_backend) + dev = QiskitDevice(wires=5, backend=aer_backend) pl_dev = qml.device("default.qubit", wires=5) @@ -1178,7 +1178,7 @@ def test_warning_for_split_execution_types_when_observable_no_pauli(self): """Test that a warning is raised when device is passed a measurement on an observable that does not have a pauli_rep.""" - dev = QiskitDevice2(wires=5, backend=aer_backend) + dev = QiskitDevice(wires=5, backend=aer_backend) @qml.qnode(dev) def circuit(): @@ -1198,7 +1198,7 @@ def test_qiskit_probability_output_format(self, backend): the same as pennylane's.""" dev = qml.device("default.qubit", wires=[0, 1, 2, 3, 4]) - qiskit_dev = QiskitDevice2(wires=[0, 1, 2, 3, 4], backend=backend) + qiskit_dev = QiskitDevice(wires=[0, 1, 2, 3, 4], backend=backend) @qml.qnode(dev) def circuit(): @@ -1220,7 +1220,7 @@ def test_sampler_output_shape(self, backend): """Test that the shape of the results produced from the sampler for the Qiskit device is consistent with Pennylane""" dev = qml.device("default.qubit", wires=5, shots=1024) - qiskit_dev = QiskitDevice2(wires=5, backend=backend) + qiskit_dev = QiskitDevice(wires=5, backend=backend) @qml.qnode(dev) def circuit(x): @@ -1244,7 +1244,7 @@ def test_sampler_output_shape_multi_measurements(self, backend): """Test that the shape of the results produced from the sampler for the Qiskit device is consistent with Pennylane for circuits with multiple measurements""" dev = qml.device("default.qubit", wires=5, shots=10) - qiskit_dev = QiskitDevice2(wires=5, backend=backend, shots=10) + qiskit_dev = QiskitDevice(wires=5, backend=backend, shots=10) @qml.qnode(dev) def circuit(x): @@ -1327,7 +1327,7 @@ def test_observables_that_need_split_non_commuting(self, observable): """Tests that observables that have non-commuting measurements are processed correctly when executed by the Estimator or, in the case of qml.Hadamard, executed by the Sampler via expval() or var""" - qiskit_dev = QiskitDevice2(wires=3, backend=aer_backend, shots=30000) + qiskit_dev = QiskitDevice(wires=3, backend=aer_backend, shots=30000) @qml.qnode(qiskit_dev) def qiskit_circuit(): @@ -1362,7 +1362,7 @@ def circuit(): def test_observables_that_need_split_non_commuting_counts(self, observable): """Tests that observables that have non-commuting measurents are processed correctly when executed by the Sampler via counts()""" - qiskit_dev = QiskitDevice2(wires=3, backend=aer_backend, shots=4000) + qiskit_dev = QiskitDevice(wires=3, backend=aer_backend, shots=4000) @qml.qnode(qiskit_dev) def qiskit_circuit(): @@ -1424,7 +1424,7 @@ def circuit(): def test_observables_that_need_split_non_commuting_samples(self, observable): """Tests that observables that have non-commuting measurents are processed correctly when executed by the Sampler via sample()""" - qiskit_dev = QiskitDevice2(wires=3, backend=aer_backend, shots=20000) + qiskit_dev = QiskitDevice(wires=3, backend=aer_backend, shots=20000) @qml.qnode(qiskit_dev) def qiskit_circuit(): diff --git a/tests/test_integration.py b/tests/test_integration.py index 39d7f93f..1e1d3db5 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -23,42 +23,29 @@ import pennylane as qml from pennylane.numpy import tensor -from semantic_version import Version import pytest import qiskit import qiskit_aer from qiskit.providers import QiskitBackendNotFoundError -from pennylane_qiskit.qiskit_device import QiskitDevice +from qiskit.providers.basic_provider import BasicProvider +from pennylane_qiskit.qiskit_device_legacy import QiskitDeviceLegacy # pylint: disable=protected-access, unused-argument, ungrouped-imports, too-many-arguments, too-few-public-methods -if Version(qiskit.__version__) < Version("1.0.0"): - pldevices = [("qiskit.aer", qiskit_aer.Aer), ("qiskit.basicaer", qiskit.BasicAer)] - def check_provider_backend_compatibility(pldevice, backend_name): - """check compatibility of provided backend""" - dev_name, _ = pldevice - if (dev_name == "qiskit.aer" and "aer" not in backend_name) or ( - dev_name == "qiskit.basicaer" and "aer" in backend_name - ): - return (False, "Only the AerSimulator is supported on AerDevice") - return True, None - -else: - from qiskit.providers.basic_provider import BasicProvider +pldevices = [("qiskit.aer", qiskit_aer.Aer), ("qiskit.basicsim", BasicProvider())] - pldevices = [("qiskit.aer", qiskit_aer.Aer), ("qiskit.basicsim", BasicProvider())] - def check_provider_backend_compatibility(pldevice, backend_name): - """check compatibility of provided backend""" - dev_name, _ = pldevice - if dev_name == "qiskit.aer" and backend_name == "basic_simulator": - return (False, "basic_simulator is not supported on the AerDevice") +def check_provider_backend_compatibility(pldevice, backend_name): + """Check the compatibility of provided backend""" + dev_name, _ = pldevice + if dev_name == "qiskit.aer" and backend_name == "basic_simulator": + return (False, "basic_simulator is not supported on the AerDevice") - if dev_name == "qiskit.basicsim" and backend_name != "basic_simulator": - return (False, "Only the basic_simulator backend works with the BasicSimulatorDevice") - return True, None + if dev_name == "qiskit.basicsim" and backend_name != "basic_simulator": + return (False, "Only the basic_simulator backend works with the BasicSimulatorDevice") + return True, None class TestDeviceIntegration: @@ -78,7 +65,6 @@ def test_load_device(self, d, backend): assert dev.shots == 1024 assert dev.short_name == d[0] assert dev.provider == d[1] - assert dev.capabilities()["returns_state"] == (backend in state_backends) @pytest.mark.parametrize("d", pldevices) def test_load_remote_device_with_backend_instance(self, d, backend): @@ -90,32 +76,18 @@ def test_load_remote_device_with_backend_instance(self, d, backend): except QiskitBackendNotFoundError: pytest.skip("Backend is not compatible with specified device") - dev = qml.device("qiskit.remote", wires=2, backend=backend_instance, shots=1024) - assert dev.num_wires == 2 - assert dev.shots == 1024 - assert dev.short_name == "qiskit.remote" - assert dev.provider is None - assert dev.capabilities()["returns_state"] == (backend in state_backends) - - @pytest.mark.parametrize("d", pldevices) - def test_load_remote_device_by_name(self, d, backend): - """Test that the qiskit.remote device loads correctly when passed a provider and a backend - name. This test is equivalent to `test_load_device` but on the qiskit.remote device instead - of specialized devices that expose more configuration options.""" - - # check compatibility between provider and backend, and skip if incompatible - is_compatible, failure_msg = check_provider_backend_compatibility(d, backend) - if not is_compatible: - pytest.skip(failure_msg) - - _, provider = d + if backend_instance.configuration().n_qubits is None: + pytest.skip("No qubits?") - dev = qml.device("qiskit.remote", wires=2, provider=provider, backend=backend, shots=1024) - assert dev.num_wires == 2 - assert dev.shots == 1024 + dev = qml.device( + "qiskit.remote", + wires=backend_instance.configuration().n_qubits, + backend=backend_instance, + shots=1024, + ) + assert dev.num_wires == backend_instance.configuration().n_qubits + assert dev.shots.total_shots == 1024 assert dev.short_name == "qiskit.remote" - assert dev.provider == provider - assert dev.capabilities()["returns_state"] == (backend in state_backends) def test_incorrect_backend(self): """Test that exception is raised if name is incorrect""" @@ -129,12 +101,6 @@ def test_incorrect_backend_wires(self): ): qml.device("qiskit.aer", wires=100, method="statevector") - def test_remote_device_no_provider(self): - """Test that the qiskit.remote device raises a ValueError if passed a backend - by name but no provider to look up the name on.""" - with pytest.raises(ValueError, match=r"Must pass a provider"): - qml.device("qiskit.remote", wires=2, backend="aer_simulator_statevector") - def test_args(self): """Test that the device requires correct arguments""" with pytest.raises(TypeError, match="missing 1 required positional argument"): @@ -591,7 +557,7 @@ def test_one_qubit_circuit_batch_params(self, shots, d, backend, tol, mocker): b = np.linspace(0, 0.123, batch_dim) c = np.linspace(0, 0.987, batch_dim) - spy1 = mocker.spy(QiskitDevice, "batch_execute") + spy1 = mocker.spy(QiskitDeviceLegacy, "batch_execute") spy2 = mocker.spy(dev.backend, "run") @partial(qml.batch_params, all_operations=True) @@ -605,7 +571,7 @@ def circuit(x, y, z): assert np.allclose(circuit(a, b, c), np.cos(a) * np.sin(b), **tol) - # Check that QiskitDevice.batch_execute was called + # Check that QiskitDeviceLegacy.batch_execute was called assert spy1.call_count == 1 assert spy2.call_count == 1 @@ -625,7 +591,7 @@ def test_batch_execute_parameter_shift(self, shots, d, backend, tol, mocker): dev = qml.device(d[0], wires=3, backend=backend, shots=shots) - spy1 = mocker.spy(QiskitDevice, "batch_execute") + spy1 = mocker.spy(QiskitDeviceLegacy, "batch_execute") spy2 = mocker.spy(dev.backend, "run") @qml.qnode(dev, diff_method="parameter-shift") @@ -642,7 +608,7 @@ def circuit(x, y): expected = np.array([[-np.sin(y) * np.sin(x), np.cos(y) * np.cos(x)]]) assert np.allclose(res, expected, **tol) - # Check that QiskitDevice.batch_execute was called twice + # Check that QiskitDeviceLegacy.batch_execute was called twice assert spy1.call_count == 2 # Check that run was called twice: for the partial derivatives and for diff --git a/tests/test_qiskit_device.py b/tests/test_qiskit_device.py index 7eabcf9c..8674301d 100644 --- a/tests/test_qiskit_device.py +++ b/tests/test_qiskit_device.py @@ -25,7 +25,7 @@ import pennylane as qml from pennylane_qiskit import AerDevice -from pennylane_qiskit.qiskit_device import QiskitDevice +from pennylane_qiskit.qiskit_device_legacy import QiskitDeviceLegacy # pylint: disable=protected-access, unused-argument, too-few-public-methods @@ -109,7 +109,7 @@ class TestSupportForV1andV2: ) def test_v1_and_v2_mocked(self, dev_backend): """Test that device initializes with no error mocked""" - dev = qml.device("qiskit.remote", wires=10, backend=dev_backend, use_primitives=True) + dev = qml.device("qiskit.aer", wires=10, backend=dev_backend) assert dev._backend == dev_backend @pytest.mark.parametrize( @@ -121,7 +121,7 @@ def test_v1_and_v2_mocked(self, dev_backend): ) def test_v1_and_v2_manila(self, dev_backend): """Test that device initializes with no error with V1 and V2 backends by Qiskit""" - dev = qml.device("qiskit.remote", wires=5, backend=dev_backend, use_primitives=True) + dev = qml.device("qiskit.aer", wires=5, backend=dev_backend) @qml.qnode(dev) def circuit(x): @@ -237,12 +237,12 @@ def test_calls_to_execute(self, device, n_tapes, mocker): called and not the general execute method.""" dev = device(2) - spy = mocker.spy(QiskitDevice, "execute") + spy = mocker.spy(QiskitDeviceLegacy, "execute") tapes = [self.tape1] * n_tapes dev.batch_execute(tapes) - # Check that QiskitDevice.execute was not called + # Check that QiskitDeviceLegacyLegacy.execute was not called assert spy.call_count == 0 @pytest.mark.parametrize("n_tapes", [1, 2, 3]) @@ -251,7 +251,7 @@ def test_calls_to_reset(self, n_tapes, mocker, device): times.""" dev = device(2) - spy = mocker.spy(QiskitDevice, "reset") + spy = mocker.spy(QiskitDeviceLegacy, "reset") tapes = [self.tape1] * n_tapes dev.batch_execute(tapes)