diff --git a/pennylane_qiskit/aer.py b/pennylane_qiskit/aer.py
index 24e1a353..34d6c3bc 100644
--- a/pennylane_qiskit/aer.py
+++ b/pennylane_qiskit/aer.py
@@ -18,10 +18,10 @@
import qiskit_aer
-from .qiskit_device import QiskitDevice
+from .qiskit_device_legacy import QiskitDeviceLegacy
-class AerDevice(QiskitDevice):
+class AerDevice(QiskitDeviceLegacy):
"""A PennyLane device for the C++ Qiskit Aer simulator.
Please refer to the `Qiskit documentation `_ for
diff --git a/pennylane_qiskit/basic_sim.py b/pennylane_qiskit/basic_sim.py
index 0b51be91..99f0a10e 100644
--- a/pennylane_qiskit/basic_sim.py
+++ b/pennylane_qiskit/basic_sim.py
@@ -17,10 +17,10 @@
using PennyLane.
from qiskit.providers.basic_provider import BasicProvider
-from .qiskit_device import QiskitDevice
+from .qiskit_device_legacy import QiskitDeviceLegacy
-class BasicSimulatorDevice(QiskitDevice):
+class BasicSimulatorDevice(QiskitDeviceLegacy):
"""A PennyLane device for the native Python Qiskit simulator.
For more information on the ``BasicSimulator`` backend options and transpile options, please visit the
diff --git a/pennylane_qiskit/converter.py b/pennylane_qiskit/converter.py
index 064ab4c8..0df4e492 100644
--- a/pennylane_qiskit/converter.py
+++ b/pennylane_qiskit/converter.py
@@ -25,6 +25,7 @@
import qiskit.qasm2
from qiskit.circuit import Parameter, ParameterExpression, ParameterVector
from qiskit.circuit import Measure, Barrier, ControlFlowOp, Clbit
+from qiskit.circuit import library as lib
from qiskit.circuit.classical import expr
from qiskit.circuit.controlflow.switch_case import _DefaultCaseType
from qiskit.circuit.library import GlobalPhaseGate
@@ -36,7 +37,50 @@
import pennylane as qml
import pennylane.ops as pennylane_ops
from pennylane.tape.tape import rotations_and_diagonal_measurements
-from pennylane_qiskit.qiskit_device import QISKIT_OPERATION_MAP
+ # native PennyLane operations also native to qiskit
+ "PauliX": lib.XGate,
+ "PauliY": lib.YGate,
+ "PauliZ": lib.ZGate,
+ "Hadamard": lib.HGate,
+ "CNOT": lib.CXGate,
+ "CZ": lib.CZGate,
+ "SWAP": lib.SwapGate,
+ "ISWAP": lib.iSwapGate,
+ "RX": lib.RXGate,
+ "RY": lib.RYGate,
+ "RZ": lib.RZGate,
+ "Identity": lib.IGate,
+ "CSWAP": lib.CSwapGate,
+ "CRX": lib.CRXGate,
+ "CRY": lib.CRYGate,
+ "CRZ": lib.CRZGate,
+ "PhaseShift": lib.PhaseGate,
+ "QubitStateVector": lib.Initialize,
+ "StatePrep": lib.Initialize,
+ "Toffoli": lib.CCXGate,
+ "QubitUnitary": lib.UnitaryGate,
+ "U1": lib.U1Gate,
+ "U2": lib.U2Gate,
+ "U3": lib.U3Gate,
+ "IsingZZ": lib.RZZGate,
+ "IsingYY": lib.RYYGate,
+ "IsingXX": lib.RXXGate,
+ "S": lib.SGate,
+ "T": lib.TGate,
+ "SX": lib.SXGate,
+ "Adjoint(S)": lib.SdgGate,
+ "Adjoint(T)": lib.TdgGate,
+ "Adjoint(SX)": lib.SXdgGate,
+ "CY": lib.CYGate,
+ "CH": lib.CHGate,
+ "CPhase": lib.CPhaseGate,
+ "CCZ": lib.CCZGate,
+ "ECR": lib.ECRGate,
+ "Barrier": lib.Barrier,
+ "Adjoint(GlobalPhase)": lib.GlobalPhaseGate,
inv_map = {v.__name__: k for k, v in QISKIT_OPERATION_MAP.items()}
diff --git a/pennylane_qiskit/qiskit_device.py b/pennylane_qiskit/qiskit_device.py
index b03a9692..8df10cad 100644
--- a/pennylane_qiskit/qiskit_device.py
+++ b/pennylane_qiskit/qiskit_device.py
@@ -1,4 +1,4 @@
-# Copyright 2019-2021 Xanadu Quantum Technologies Inc.
+# Copyright 2019-2024 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,124 +12,258 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-This module contains a base class for constructing Qiskit devices for PennyLane.
+This module contains a prototype base class for constructing Qiskit devices
+for PennyLane with the new device API.
# pylint: disable=too-many-instance-attributes,attribute-defined-outside-init
-import abc
-import inspect
import warnings
+import inspect
+from typing import Union, Callable, Tuple, Sequence
+from contextlib import contextmanager
import numpy as np
-from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister
-from qiskit.circuit import library as lib
+import pennylane as qml
from qiskit.compiler import transpile
-from qiskit.converters import circuit_to_dag, dag_to_circuit
-from qiskit.providers import Backend, BackendV2, QiskitBackendNotFoundError
-from pennylane import QubitDevice, DeviceError
-from pennylane.measurements import SampleMP, CountsMP, ClassicalShadowMP, ShadowExpvalMP
+from qiskit.providers import BackendV2
+from qiskit_ibm_runtime import Session, SamplerV2 as Sampler, EstimatorV2 as Estimator
+from pennylane import transform
+from pennylane.transforms.core import TransformProgram
+from pennylane.transforms import broadcast_expand, split_non_commuting
+from pennylane.tape import QuantumTape, QuantumScript
+from pennylane.typing import Result, ResultBatch
+from pennylane.devices import Device
+from pennylane.devices.execution_config import ExecutionConfig, DefaultExecutionConfig
+from pennylane.devices.preprocess import (
+ decompose,
+ validate_observables,
+ validate_measurements,
+ validate_device_wires,
+from pennylane.measurements import ExpectationMP, VarianceMP
from ._version import __version__
+from .converter import QISKIT_OPERATION_MAP, circuit_to_qiskit, mp_to_pauli
+QuantumTapeBatch = Sequence[QuantumTape]
+QuantumTape_or_Batch = Union[QuantumTape, QuantumTapeBatch]
+Result_or_ResultBatch = Union[Result, ResultBatch]
+# pylint: disable=protected-access
+def qiskit_session(device, **kwargs):
+ """A context manager that creates a Qiskit Session and sets it as a session
+ on the device while the context manager is active. Using the context manager
+ will ensure the Session closes properly and is removed from the device after
+ completing the tasks. Any Session that was initialized and passed into the
+ device will be overwritten by the Qiskit Session created by this context
+ manager.
+ Args:
+ device (QiskitDevice2): the device that will create remote tasks using the session
+ **kwargs: session keyword arguments to be used for settings for the Session. At the
+ time of writing, the only relevant keyword argument is "max_time", which lets you
+ set the maximum amount of time the sessin is open. For the most up to date information,
+ please refer to the Qiskit Session
+ `documentation `_.
+ **Example:**
+ .. code-block:: python
+ import pennylane as qml
+ from pennylane_qiskit import qiskit_session
+ from qiskit_ibm_runtime import QiskitRuntimeService
+ # get backend
+ service = QiskitRuntimeService(channel="ibm_quantum")
+ backend = service.least_busy(simulator=False, operational=True)
+ # initialize device
+ dev = qml.device('qiskit.remote', wires=2, backend=backend)
+ @qml.qnode(dev)
+ def circuit(x):
+ qml.RX(x, 0)
+ qml.CNOT([0, 1])
+ return qml.expval(qml.PauliZ(1))
+ angle = 0.1
+ with qiskit_session(dev, max_time=60) as session:
+ # queue for the first execution
+ res = circuit(angle)[0]
+ # then this loop executes immediately after without queueing again
+ while res > 0:
+ angle += 0.3
+ res = circuit(angle)[0]
+ Note that if you passed in a session to your device, that session will be overwritten
+ by `qiskit_session`.
+ .. code-block:: python
+ import pennylane as qml
+ from pennylane_qiskit import qiskit_session
+ from qiskit_ibm_runtime import QiskitRuntimeService, Session
-SAMPLE_TYPES = (SampleMP, CountsMP, ClassicalShadowMP, ShadowExpvalMP)
- # native PennyLane operations also native to qiskit
- "PauliX": lib.XGate,
- "PauliY": lib.YGate,
- "PauliZ": lib.ZGate,
- "Hadamard": lib.HGate,
- "CNOT": lib.CXGate,
- "CZ": lib.CZGate,
- "SWAP": lib.SwapGate,
- "ISWAP": lib.iSwapGate,
- "RX": lib.RXGate,
- "RY": lib.RYGate,
- "RZ": lib.RZGate,
- "Identity": lib.IGate,
- "CSWAP": lib.CSwapGate,
- "CRX": lib.CRXGate,
- "CRY": lib.CRYGate,
- "CRZ": lib.CRZGate,
- "PhaseShift": lib.PhaseGate,
- "QubitStateVector": lib.Initialize,
- "StatePrep": lib.Initialize,
- "Toffoli": lib.CCXGate,
- "QubitUnitary": lib.UnitaryGate,
- "U1": lib.U1Gate,
- "U2": lib.U2Gate,
- "U3": lib.U3Gate,
- "IsingZZ": lib.RZZGate,
- "IsingYY": lib.RYYGate,
- "IsingXX": lib.RXXGate,
- "S": lib.SGate,
- "T": lib.TGate,
- "SX": lib.SXGate,
- "Adjoint(S)": lib.SdgGate,
- "Adjoint(T)": lib.TdgGate,
- "Adjoint(SX)": lib.SXdgGate,
- "CY": lib.CYGate,
- "CH": lib.CHGate,
- "CPhase": lib.CPhaseGate,
- "CCZ": lib.CCZGate,
- "ECR": lib.ECRGate,
- "Barrier": lib.Barrier,
- "Adjoint(GlobalPhase)": lib.GlobalPhaseGate,
-def _get_backend_name(backend):
+ # get backend
+ service = QiskitRuntimeService(channel="ibm_quantum")
+ backend = service.least_busy(simulator=False, operational=True)
+ # initialize device
+ dev = qml.device('qiskit.remote', wires=2, backend=backend, session=Session(backend=backend, max_time=30))
+ @qml.qnode(dev)
+ def circuit(x):
+ qml.RX(x, 0)
+ qml.CNOT([0, 1])
+ return qml.expval(qml.PauliZ(1))
+ angle = 0.1
+ # This session will have the Qiskit default settings max_time=900
+ with qiskit_session(dev) as session:
+ res = circuit(angle)[0]
+ while res > 0:
+ angle += 0.3
+ res = circuit(angle)[0]
+ """
+ # Code to acquire session:
+ existing_session = device._session
+ session_options = {"backend": device.backend, "service": device.service}
+ for k, v in kwargs.items():
+ # Options like service and backend should be tied to the settings set on device
+ if k in session_options:
+ warnings.warn(f"Using '{k}' set in device, {getattr(device, k)}", UserWarning)
+ else:
+ session_options[k] = v
+ session = Session(**session_options)
+ device._session = session
- return backend.name() # BackendV1
- except TypeError: # pragma: no cover
- return backend.name # BackendV2
+ yield session
+ finally:
+ # Code to release session:
+ session.close()
+ device._session = existing_session
+def accepted_sample_measurement(m: qml.measurements.MeasurementProcess) -> bool:
+ """Specifies whether or not a measurement is accepted when sampling."""
+ return isinstance(
+ m,
+ (
+ qml.measurements.SampleMeasurement,
+ qml.measurements.ClassicalShadowMP,
+ qml.measurements.ShadowExpvalMP,
+ ),
+ )
+def split_execution_types(
+ tape: qml.tape.QuantumTape,
+) -> (Sequence[qml.tape.QuantumTape], Callable):
+ """Split into separate tapes based on measurement type. Counts and sample-based measurements
+ will use the Qiskit Sampler. ExpectationValue and Variance will use the Estimator, except
+ when the measured observable does not have a `pauli_rep`. In that case, the Sampler will be
+ used, and the raw samples will be processed to give an expectation value."""
+ estimator = []
+ sampler = []
+ for i, mp in enumerate(tape.measurements):
+ if isinstance(mp, (ExpectationMP, VarianceMP)):
+ if mp.obs.pauli_rep:
+ estimator.append((mp, i))
+ else:
+ warnings.warn(
+ f"The observable measured {mp.obs} does not have a `pauli_rep` "
+ "and will be run without using the Estimator primitive. Instead, "
+ "raw samples from the Sampler will be used."
+ )
+ sampler.append((mp, i))
+ else:
+ sampler.append((mp, i))
+ order_indices = [[i for mp, i in group] for group in [estimator, sampler]]
+ tapes = []
+ if estimator:
+ tapes.extend(
+ [
+ qml.tape.QuantumScript(
+ tape.operations,
+ measurements=[mp for mp, i in estimator],
+ shots=tape.shots,
+ )
+ ]
+ )
+ if sampler:
+ tapes.extend(
+ [
+ qml.tape.QuantumScript(
+ tape.operations,
+ measurements=[mp for mp, i in sampler],
+ shots=tape.shots,
+ )
+ ]
+ )
+ def reorder_fn(res):
+ """re-order the output to the original shape and order"""
+ flattened_indices = [i for group in order_indices for i in group]
+ flattened_results = [r for group in res for r in group]
-class QiskitDevice(QubitDevice, abc.ABC):
- r"""Abstract Qiskit device for PennyLane.
+ if len(flattened_indices) != len(flattened_results):
+ raise ValueError(
+ "The lengths of flattened_indices and flattened_results do not match."
+ ) # pragma: no cover
+ result = dict(zip(flattened_indices, flattened_results))
+ result = tuple(result[i] for i in sorted(result.keys()))
+ return result[0] if len(result) == 1 else result
+ return tapes, reorder_fn
+class QiskitDevice(Device):
+ r"""Hardware/simulator Qiskit device for PennyLane.
wires (int or Iterable[Number, str]]): Number of subsystems represented by the device,
or iterable that contains unique labels for the subsystems as numbers (i.e., ``[-1, 0, 2]``)
- or strings (``['ancilla', 'q1', 'q2']``).
- provider (Provider | None): The Qiskit backend provider.
- backend (str | Backend): the desired backend. If a string, a provider must be given.
- shots (int or None): number of circuit evaluations/random samples used
- to estimate expectation values and variances of observables. For state vector backends,
- setting to ``None`` results in computing statistics like expectation values and variances analytically.
+ or strings (``['aux_wire', 'q1', 'q2']``).
+ backend (Backend): the initialized Qiskit backend
Keyword Args:
- name (str): The name of the circuit. Default ``'circuit'``.
- compile_backend (BaseBackend): The backend used for compilation. If you wish
- to simulate a device compliant circuit, you can specify a backend here.
+ shots (int or None): number of circuit evaluations/random samples used
+ to estimate expectation values and variances of observables.
+ session (Session): a Qiskit Session to use for device execution. If none is provided, a session will
+ be created at each device execution.
+ compile_backend (Union[Backend, None]): the backend to be used for compiling the circuit that will be
+ sent to the backend device, to be set if the backend desired for compliation differs from the
+ backend used for execution. Defaults to ``None``, which means the primary backend will be used.
+ **kwargs: transpilation and runtime keyword arguments to be used for measurements with Primitives.
+ If an `options` dictionary is defined amongst the kwargs, and there are settings that overlap
+ with those in kwargs, the settings in `options` will take precedence over kwargs. Keyword
+ arguments accepted by both the transpiler and at runtime (e.g. ``optimization_level``)
+ will be passed to the transpiler rather than to the Primitive.
- name = "Qiskit PennyLane plugin"
- pennylane_requires = ">=0.37.0"
- version = __version__
- plugin_version = __version__
- author = "Xanadu"
- _capabilities = {
- "model": "qubit",
- "tensor_observables": True,
- "inverse_operations": True,
- }
- _operation_map = QISKIT_OPERATION_MAP
- _state_backends = {
- "statevector_simulator",
- "simulator_statevector",
- "unitary_simulator",
- "aer_simulator_statevector",
- "aer_simulator_unitary",
- }
- """set[str]: Set of backend names that define the backends
- that support returning the underlying quantum statevector"""
- operations = set(_operation_map.keys())
+ operations = set(QISKIT_OPERATION_MAP.keys())
observables = {
@@ -138,399 +272,415 @@ class QiskitDevice(QubitDevice, abc.ABC):
+ "Prod",
+ "Sum",
+ "LinearCombination",
+ "SProd",
+ # TODO Could support SparseHamiltonian
- analytic_warning_message = (
- "The analytic calculation of expectations, variances and "
- "probabilities is only supported on statevector backends, not on the {}. "
- "Such statistics obtained from this device are estimates based on samples."
- )
- _eigs = {}
- def __init__(self, wires, provider, backend, shots=1024, **kwargs):
+ # pylint:disable = too-many-arguments
+ def __init__(
+ self,
+ wires,
+ backend,
+ shots=1024,
+ session=None,
+ compile_backend=None,
+ **kwargs,
+ ):
+ if shots is None:
+ warnings.warn(
+ "Expected an integer number of shots, but received shots=None. Defaulting "
+ "to 1024 shots. The analytic calculation of results is not supported on "
+ "this device. All statistics obtained from this device are estimates based "
+ "on samples.",
+ UserWarning,
+ )
+ shots = 1024
super().__init__(wires=wires, shots=shots)
- self.provider = provider
- if isinstance(backend, Backend):
- self._backend = backend
- self.backend_name = _get_backend_name(backend)
- elif provider is None:
- raise ValueError("Must pass a provider if the backend is not a Backend instance.")
- else:
- try:
- self._backend = provider.get_backend(backend)
- except QiskitBackendNotFoundError as e:
- available_backends = list(map(_get_backend_name, provider.backends()))
- raise ValueError(
- f"Backend '{backend}' does not exist. Available backends "
- f"are:\n {available_backends}"
- ) from e
- self.backend_name = _get_backend_name(self._backend)
- # Keep track if the user specified analytic to be True
- if shots is None and not self._is_state_backend:
- # Raise a warning if no shots were specified for a hardware device
- warnings.warn(self.analytic_warning_message.format(backend), UserWarning)
+ self._backend = backend
+ self._compile_backend = compile_backend if compile_backend else backend
- self.shots = 1024
+ self._service = getattr(backend, "_service", None)
+ self._session = session
- self._capabilities["returns_state"] = self._is_state_backend
+ kwargs["shots"] = shots
# Perform validation against backend
- backend_qubits = (
+ available_qubits = (
if isinstance(backend, BackendV2)
- else self.backend.configuration().n_qubits
+ else backend.configuration().n_qubits
- if backend_qubits and len(self.wires) > int(backend_qubits):
- raise ValueError(f"Backend '{backend}' supports maximum {backend_qubits} wires")
+ if len(self.wires) > int(available_qubits):
+ raise ValueError(f"Backend '{backend}' supports maximum {available_qubits} wires")
- # Initialize inner state
+ self._kwargs, self._transpile_args = self._process_kwargs(
+ kwargs
+ ) # processes kwargs and separates transpilation arguments to dev._transpile_args
- self.process_kwargs(kwargs)
- def process_kwargs(self, kwargs):
- """Processing the keyword arguments that were provided upon device initialization.
+ @property
+ def backend(self):
+ """The Qiskit backend object.
- Args:
- kwargs (dict): keyword arguments to be set for the device
+ Returns:
+ qiskit.providers.Backend: Qiskit backend object.
- self.compile_backend = None
- if "compile_backend" in kwargs:
- self.compile_backend = kwargs.pop("compile_backend")
- if "noise_model" in kwargs:
- noise_model = kwargs.pop("noise_model")
- self.backend.set_options(noise_model=noise_model)
- # set transpile_args
- self.set_transpile_args(**kwargs)
- # Get further arguments for run
- self.run_args = {}
- # Specify to have a memory for hw/hw simulators
- compile_backend = self.compile_backend or self.backend
- memory = str(compile_backend) not in self._state_backends
- if memory:
- kwargs["memory"] = True
- # Consider the remaining kwargs as keyword arguments to run
- self.run_args.update(kwargs)
- @property
- def _is_state_backend(self):
- """Returns whether this device has a state backend."""
- return self.backend_name in self._state_backends or self.backend.options.get("method") in {
- "unitary",
- "statevector",
- }
+ return self._backend
- def _is_statevector_backend(self):
- """Returns whether this device has a statevector backend."""
- method = "statevector"
- return method in self.backend_name or self.backend.options.get("method") == method
+ def compile_backend(self):
+ """The ``compile_backend`` is a Qiskit backend object to be used for transpilation.
+ Returns:
+ qiskit.providers.backend: Qiskit backend object.
+ """
+ return self._compile_backend
- def _is_unitary_backend(self):
- """Returns whether this device has a unitary backend."""
- method = "unitary"
- return method in self.backend_name or self.backend.options.get("method") == method
- def set_transpile_args(self, **kwargs):
- """The transpile argument setter.
+ def service(self):
+ """The QiskitRuntimeService service.
- Keyword Args:
- kwargs (dict): keyword arguments to be set for the Qiskit transpiler. For more details, see the
- `Qiskit transpiler documentation `_
+ Returns:
+ qiskit.qiskit_ibm_runtime.QiskitRuntimeService
- transpile_sig = inspect.signature(transpile).parameters
- self.transpile_args = {arg: kwargs[arg] for arg in transpile_sig if arg in kwargs}
- self.transpile_args.pop("circuits", None)
- self.transpile_args.pop("backend", None)
+ return self._service
- def backend(self):
- """The Qiskit backend object.
+ def session(self):
+ """The QiskitRuntimeService session.
- qiskit.providers.backend: Qiskit backend object.
+ qiskit.qiskit_ibm_runtime.Session
- return self._backend
+ return self._session
- def reset(self):
- """Reset the Qiskit backend device"""
- # Reset only internal data, not the options that are determined on
- # device creation
- self._reg = QuantumRegister(self.num_wires, "q")
- self._creg = ClassicalRegister(self.num_wires, "c")
- self._circuit = QuantumCircuit(self._reg, self._creg, name="temp")
+ @property
+ def num_wires(self):
+ """Get the number of wires.
- self._current_job = None
- self._state = None # statevector of a simulator backend
+ Returns:
+ int: The number of wires.
+ """
+ return len(self.wires)
- def create_circuit_object(self, operations, **kwargs):
- """Builds the circuit objects based on the operations and measurements
- specified to apply.
+ def update_session(self, session):
+ """Update the session attribute.
- operations (list[~.Operation]): operations to apply to the device
- Keyword args:
- rotations (list[~.Operation]): Operations that rotate the circuit
- pre-measurement into the eigenbasis of the observables.
+ session: The new session to be set.
- rotations = kwargs.get("rotations", [])
- applied_operations = self.apply_operations(operations)
+ self._session = session
- # Rotating the state for measurement in the computational basis
- rotation_circuits = self.apply_operations(rotations)
- applied_operations.extend(rotation_circuits)
+ def reset(self):
+ """Reset the current job to None."""
+ self._current_job = None
- for circuit in applied_operations:
- self._circuit &= circuit
+ def stopping_condition(self, op: qml.operation.Operator) -> bool:
+ """Specifies whether or not an Operator is accepted by QiskitDevice2."""
+ return op.name in self.operations
- if not self._is_state_backend:
- # Add measurements if they are needed
- for qr, cr in zip(self._reg, self._creg):
- self._circuit.measure(qr, cr)
- elif "aer" in self.backend_name:
- self._circuit.save_state()
+ def observable_stopping_condition(self, obs: qml.operation.Operator) -> bool:
+ """Specifies whether or not an observable is accepted by QiskitDevice2."""
+ return obs.name in self.observables
- def apply(self, operations, **kwargs):
- """Build the circuit object and apply the operations"""
- self.create_circuit_object(operations, **kwargs)
+ def preprocess(
+ self,
+ execution_config: ExecutionConfig = DefaultExecutionConfig,
+ ) -> Tuple[TransformProgram, ExecutionConfig]:
+ """This function defines the device transform program to be applied and an updated device configuration.
- # These operations need to run for all devices
- compiled_circuit = self.compile()
- self.run(compiled_circuit)
+ Args:
+ execution_config (Union[ExecutionConfig, Sequence[ExecutionConfig]]): A data structure describing the
+ parameters needed to fully describe the execution.
- def apply_operations(self, operations):
- """Apply the circuit operations.
+ Returns:
+ TransformProgram, ExecutionConfig: A transform program that when called returns QuantumTapes that the device
+ can natively execute as well as a postprocessing function to be called after execution, and a configuration with
+ unset specifications filled in.
- This method serves as an auxiliary method to :meth:`~.QiskitDevice.apply`.
+ This device:
- Args:
- operations (List[pennylane.Operation]): operations to be applied
+ * Supports any operations with explicit PennyLane to Qiskit gate conversions defined in the plugin
+ * Does not intrinsically support parameter broadcasting
- Returns:
- list[QuantumCircuit]: a list of quantum circuit objects that
- specify the corresponding operations
- circuits = []
+ config = execution_config
+ config.use_device_gradient = False
- for operation in operations:
- # Apply the circuit operations
- device_wires = self.map_wires(operation.wires)
- par = operation.parameters
+ transform_program = TransformProgram()
- for idx, p in enumerate(par):
- if isinstance(p, np.ndarray):
- # Convert arrays so that Qiskit accepts the parameter
- par[idx] = p.tolist()
+ transform_program.add_transform(validate_device_wires, self.wires, name=self.name)
+ transform_program.add_transform(
+ decompose,
+ stopping_condition=self.stopping_condition,
+ name=self.name,
+ skip_initial_state_prep=False,
+ )
+ transform_program.add_transform(
+ validate_measurements,
+ sample_measurements=accepted_sample_measurement,
+ name=self.name,
+ )
+ transform_program.add_transform(
+ validate_observables,
+ stopping_condition=self.observable_stopping_condition,
+ name=self.name,
+ )
- operation = operation.name
+ transform_program.add_transform(broadcast_expand)
+ transform_program.add_transform(split_non_commuting)
- mapped_operation = self._operation_map[operation]
+ transform_program.add_transform(split_execution_types)
- self.qubit_state_vector_check(operation)
+ return transform_program, config
- qregs = [self._reg[i] for i in device_wires.labels]
+ def _process_kwargs(self, kwargs):
+ """Processes kwargs given and separates them into kwargs and transpile_args. If given
+ a keyword argument 'options' that is a dictionary, a common practice in
+ Qiskit, the options in said dictionary take precedence over any overlapping keyword
+ arguments defined in the kwargs.
- if operation in ("QubitUnitary", "QubitStateVector", "StatePrep"):
- # Need to revert the order of the quantum registers used in
- # Qiskit such that it matches the PennyLane ordering
- qregs = list(reversed(qregs))
+ Keyword Args:
+ kwargs (dict): keyword arguments that set either runtime options or transpilation
+ options.
- if operation in ("Barrier",):
- # Need to add the num_qubits for instantiating Barrier in Qiskit
- par = [len(self._reg)]
+ Returns:
+ kwargs, transpile_args: keyword arguments for the runtime options and keyword
+ arguments for the transpiler
+ """
- dag = circuit_to_dag(QuantumCircuit(self._reg, self._creg, name=""))
+ if "noise_model" in kwargs:
+ noise_model = kwargs.pop("noise_model")
+ self.backend.set_options(noise_model=noise_model)
- gate = mapped_operation(*par)
+ if "options" in kwargs:
+ for key, val in kwargs.pop("options").items():
+ if key in kwargs:
+ warnings.warn(
+ "An overlap between what was passed in via options and what was passed in via kwargs was found."
+ f"The value set in options {key}={val} will be used."
+ )
+ kwargs[key] = val
- dag.apply_operation_back(gate, qargs=qregs)
- circuit = dag_to_circuit(dag)
- circuits.append(circuit)
+ shots = kwargs.pop("shots")
- return circuits
+ if "default_shots" in kwargs:
+ warnings.warn(
+ f"default_shots was found in the keyword arguments, but it is not supported by {self.name}"
+ "Please use the `shots` keyword argument instead. The number of shots "
+ f"{shots} will be used instead."
+ )
+ kwargs["default_shots"] = shots
- def qubit_state_vector_check(self, operation):
- """Input check for the StatePrepBase operations.
+ kwargs, transpile_args = self.get_transpile_args(kwargs)
- Args:
- operation (pennylane.Operation): operation to be checked
+ return kwargs, transpile_args
- Raises:
- DeviceError: If the operation is QubitStateVector or StatePrep
- """
- if operation in ("QubitStateVector", "StatePrep"):
- if self._is_unitary_backend:
- raise DeviceError(
- f"The {operation} operation "
- "is not supported on the unitary simulator backend."
- )
+ @staticmethod
+ def get_transpile_args(kwargs):
+ """The transpile argument setter. This separates keyword arguments related to transpilation
+ from the rest of the keyword arguments and removes those keyword arguments from kwargs.
- def compile(self):
- """Compile the quantum circuit to target the provided compile_backend.
+ Keyword Args:
+ kwargs (dict): combined keyword arguments to be parsed for the Qiskit transpiler. For more details, see the
+ `Qiskit transpiler documentation `_
- If compile_backend is None, then the target is simply the
- backend.
+ Returns:
+ kwargs (dict), transpile_args (dict): keyword arguments for the runtime options and keyword
+ arguments for the transpiler
- compile_backend = self.compile_backend or self.backend
- compiled_circuits = transpile(self._circuit, backend=compile_backend, **self.transpile_args)
- return compiled_circuits
- def run(self, qcirc):
- """Run the compiled circuit and query the result.
+ transpile_sig = inspect.signature(transpile).parameters
- Args:
- qcirc (qiskit.QuantumCircuit): the quantum circuit to be run on the backend
- """
- self._current_job = self.backend.run(qcirc, shots=self.shots, **self.run_args)
- result = self._current_job.result()
+ transpile_args = {arg: kwargs.pop(arg) for arg in transpile_sig if arg in kwargs}
+ transpile_args.pop("circuits", None)
+ transpile_args.pop("backend", None)
- if self._is_state_backend:
- self._state = self._get_state(result)
+ return kwargs, transpile_args
- def _get_state(self, result, experiment=None):
- """Returns the statevector for state simulator backends.
+ def compile_circuits(self, circuits):
+ """Compiles multiple circuits one after the other.
- result (qiskit.Result): result object
- experiment (str or None): the name of the experiment to get the state for.
+ circuits (list[QuantumCircuit]): the circuits to be compiled
- array[float]: size ``(2**num_wires,)`` statevector
+ list[QuantumCircuit]: the list of compiled circuits
- if self._is_statevector_backend:
- state = np.asarray(result.get_statevector(experiment))
+ # Compile each circuit object
+ compiled_circuits = []
+ transpile_args = self._transpile_args
- elif self._is_unitary_backend:
- unitary = np.asarray(result.get_unitary(experiment))
- initial_state = np.zeros([2**self.num_wires])
- initial_state[0] = 1
+ for i, circuit in enumerate(circuits):
+ compiled_circ = transpile(circuit, backend=self.compile_backend, **transpile_args)
+ compiled_circ.name = f"circ{i}"
+ compiled_circuits.append(compiled_circ)
- state = unitary @ initial_state
+ return compiled_circuits
- # reverse qubit order to match PennyLane convention
- return state.reshape([2] * self.num_wires).T.flatten()
+ # pylint: disable=unused-argument, no-member
+ def execute(
+ self,
+ circuits: QuantumTape_or_Batch,
+ execution_config: ExecutionConfig = DefaultExecutionConfig,
+ ) -> Result_or_ResultBatch:
+ """Execute a circuit or a batch of circuits and turn it into results."""
+ session = self._session or Session(backend=self.backend)
- def generate_samples(self, circuit=None):
- r"""Returns the computational basis samples generated for all wires.
+ results = []
- Note that PennyLane uses the convention :math:`|q_0,q_1,\dots,q_{N-1}\rangle` where
- :math:`q_0` is the most significant bit.
+ if isinstance(circuits, QuantumScript):
+ circuits = [circuits]
+ @contextmanager
+ def execute_circuits(session):
+ try:
+ for circ in circuits:
+ if circ.shots and len(circ.shots.shot_vector) > 1:
+ raise ValueError(
+ f"Setting shot vector {circ.shots.shot_vector} is not supported for {self.name}."
+ "Please use a single integer instead when specifying the number of shots."
+ )
+ if isinstance(circ.measurements[0], (ExpectationMP, VarianceMP)) and getattr(
+ circ.measurements[0].obs, "pauli_rep", None
+ ):
+ execute_fn = self._execute_estimator
+ else:
+ execute_fn = self._execute_sampler
+ results.append(execute_fn(circ, session))
+ yield results
+ finally:
+ session.close()
+ with execute_circuits(session) as results:
+ return results
+ def _execute_sampler(self, circuit, session):
+ """Returns the result of the execution of the circuit using the SamplerV2 Primitive.
+ Note that this result has been processed respective to the MeasurementProcess given.
+ E.g. `qml.expval` returns an expectation value whereas `qml.sample()` will return the raw samples.
- circuit (str or None): the name of the circuit to get the state for
+ circuits (list[QuantumCircuit]): the circuits to be executed via SamplerV2
+ session (Session): the session that the execution will be performed with
- array[complex]: array of samples in the shape ``(dev.shots, dev.num_wires)``
+ result (tuple): the processed result from SamplerV2
+ qcirc = [circuit_to_qiskit(circuit, self.num_wires, diagonalize=True, measure=True)]
+ sampler = Sampler(session=session)
+ compiled_circuits = self.compile_circuits(qcirc)
+ sampler.options.update(**self._kwargs)
- # branch out depending on the type of backend
- if self._is_state_backend:
- # software simulator: need to sample from probabilities
- return super().generate_samples()
+ # len(compiled_circuits) is always 1 so the indexing does not matter.
+ result = sampler.run(
+ compiled_circuits,
+ shots=circuit.shots.total_shots if circuit.shots.total_shots else None,
+ ).result()[0]
+ classical_register_name = compiled_circuits[0].cregs[0].name
+ self._current_job = getattr(result.data, classical_register_name)
- # hardware or hardware simulator
- samples = self._current_job.result().get_memory(circuit)
- # reverse qubit order to match PennyLane convention
- return np.vstack([np.array([int(i) for i in s[::-1]]) for s in samples])
+ # needs processing function to convert to the correct format for states, and
+ # also handle instances where wires were specified in probs, and for multiple probs measurements
- @property
- def state(self):
- """Get state of the device"""
- return self._state
+ self._samples = self.generate_samples(0)
+ res = [
+ mp.process_samples(self._samples, wire_order=self.wires) for mp in circuit.measurements
+ ]
- def analytic_probability(self, wires=None):
- """Get the analytic probability of the device"""
- if self._state is None:
- return None
+ single_measurement = len(circuit.measurements) == 1
+ res = (res[0],) if single_measurement else tuple(res)
- prob = self.marginal_prob(np.abs(self._state) ** 2, wires)
- return prob
+ return res
- def compile_circuits(self, circuits):
- r"""Compiles multiple circuits one after the other.
+ def _execute_estimator(self, circuit, session):
+ """Returns the result of the execution of the circuit using the EstimatorV2 Primitive.
+ Note that this result has been processed respective to the MeasurementProcess given.
+ E.g. `qml.expval` returns an expectation value whereas `qml.var` will return the variance.
- circuits (list[.tapes.QuantumTape]): the circuits to be compiled
+ circuits (list[QuantumCircuit]): the circuits to be executed via EstimatorV2
+ session (Session): the session that the execution will be performed with
- list[QuantumCircuit]: the list of compiled circuits
+ result (tuple): the processed result from EstimatorV2
- # Compile each circuit object
- compiled_circuits = []
- for circuit in circuits:
- # We need to reset the device here, else it will
- # not start the next computation in the zero state
- self.reset()
- self.create_circuit_object(circuit.operations, rotations=circuit.diagonalizing_gates)
- compiled_circ = self.compile()
- compiled_circ.name = f"circ{len(compiled_circuits)}"
- compiled_circuits.append(compiled_circ)
+ # the Estimator primitive takes care of diagonalization and measurements itself,
+ # so diagonalizing gates and measurements are not included in the circuit
+ qcirc = [circuit_to_qiskit(circuit, self.num_wires, diagonalize=False, measure=False)]
+ estimator = Estimator(session=session)
+ pauli_observables = [mp_to_pauli(mp, self.num_wires) for mp in circuit.measurements]
+ compiled_circuits = self.compile_circuits(qcirc)
+ estimator.options.update(**self._kwargs)
+ # split into one call per measurement
+ # could technically be more efficient if there are some observables where we ask
+ # for expectation value and variance on the same observable, but spending time on
+ # that right now feels excessive
+ circ_and_obs = [(compiled_circuits[0], pauli_observables)]
+ result = estimator.run(
+ circ_and_obs,
+ precision=np.sqrt(1 / circuit.shots.total_shots) if circuit.shots else None,
+ ).result()
+ self._current_job = result
+ result = self._process_estimator_job(circuit.measurements, result)
+ return result
+ @staticmethod
+ def _process_estimator_job(measurements, job_result):
+ """Estimator returns the expectation value and standard error for each observable measured,
+ along with some metadata that contains the precision. Extracts the relevant number for each
+ measurement process and return the requested results from the Estimator executions.
+ Note that for variance, we calculate the variance by using the standard error and the
+ precision value.
- return compiled_circuits
+ Args:
+ measurements (list[MeasurementProcess]): the measurements in the circuit
+ job_result (Any): the result from EstimatorV2
- def batch_execute(self, circuits, timeout: int = None):
- """Batch execute the circuits on the device"""
+ Returns:
+ result (tuple): the processed result from EstimatorV2
+ """
+ expvals = job_result[0].data.evs
+ variances = (job_result[0].data.stds / job_result[0].metadata["target_precision"]) ** 2
+ result = []
+ for i, mp in enumerate(measurements):
+ if isinstance(mp, ExpectationMP):
+ result.append(expvals[i])
+ elif isinstance(mp, VarianceMP):
+ result.append(variances[i])
- compiled_circuits = self.compile_circuits(circuits)
+ single_measurement = len(measurements) == 1
+ result = (result[0],) if single_measurement else tuple(result)
- if not compiled_circuits:
- # At least one circuit must always be provided to the backend.
- return []
+ return result
- # Send the batch of circuit objects using backend.run
- self._current_job = self.backend.run(compiled_circuits, shots=self.shots, **self.run_args)
+ def generate_samples(self, circuit=None):
+ r"""Returns the computational basis samples generated for all wires.
- try:
- result = self._current_job.result(timeout=timeout)
- except TypeError: # pragma: no cover
- # timeout not supported
- result = self._current_job.result()
+ Note that PennyLane uses the convention :math:`|q_0,q_1,\dots,q_{N-1}\rangle` where
+ :math:`q_0` is the most significant bit.
- # increment counter for number of executions of qubit device
- # pylint: disable=no-member
- self._num_executions += 1
+ Args:
+ circuit (int): position of the circuit in the batch.
- # Compute statistics using the state and/or samples
- results = []
- for circuit, circuit_obj in zip(circuits, compiled_circuits):
- # Update the tracker
- if self.tracker.active:
- self.tracker.update(executions=1, shots=self.shots)
- self.tracker.record()
- if self._is_state_backend:
- self._state = self._get_state(result, experiment=circuit_obj)
- # generate computational basis samples
- if self.shots is not None or any(
- isinstance(m, SAMPLE_TYPES) for m in circuit.measurements
- ):
- self._samples = self.generate_samples(circuit_obj)
- res = self.statistics(circuit)
- single_measurement = len(circuit.measurements) == 1
- res = res[0] if single_measurement else tuple(res)
- results.append(res)
- if self.tracker.active:
- self.tracker.update(batches=1, batch_len=len(circuits))
- self.tracker.record()
- return results
+ Returns:
+ array[complex]: array of samples in the shape ``(dev.shots, dev.num_wires)``
+ """
+ counts = self._current_job.get_counts()
+ # Batch of circuits
+ if not isinstance(counts, dict):
+ counts = self._current_job.get_counts()[circuit]
+ samples = []
+ for key, value in counts.items():
+ samples.extend([key] * value)
+ return np.vstack([np.array([int(i) for i in s[::-1]]) for s in samples])
diff --git a/pennylane_qiskit/qiskit_device2.py b/pennylane_qiskit/qiskit_device2.py
deleted file mode 100644
index ed4cf6a1..00000000
--- a/pennylane_qiskit/qiskit_device2.py
+++ /dev/null
@@ -1,669 +0,0 @@
-# Copyright 2019-2024 Xanadu Quantum Technologies Inc.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# See the License for the specific language governing permissions and
-# limitations under the License.
-This module contains a prototype base class for constructing Qiskit devices
-for PennyLane with the new device API.
-# pylint: disable=too-many-instance-attributes,attribute-defined-outside-init, missing-function-docstring
-import warnings
-import inspect
-from typing import Union, Callable, Tuple, Sequence
-from contextlib import contextmanager
-import numpy as np
-import pennylane as qml
-from qiskit.compiler import transpile
-from qiskit.providers import BackendV2
-from qiskit_ibm_runtime import Session, SamplerV2 as Sampler, EstimatorV2 as Estimator
-from pennylane import transform
-from pennylane.transforms.core import TransformProgram
-from pennylane.transforms import broadcast_expand, split_non_commuting
-from pennylane.tape import QuantumTape, QuantumScript
-from pennylane.typing import Result, ResultBatch
-from pennylane.devices import Device
-from pennylane.devices.execution_config import ExecutionConfig, DefaultExecutionConfig
-from pennylane.devices.preprocess import (
- decompose,
- validate_observables,
- validate_measurements,
- validate_device_wires,
-from pennylane.measurements import ExpectationMP, VarianceMP
-from ._version import __version__
-from .converter import QISKIT_OPERATION_MAP, circuit_to_qiskit, mp_to_pauli
-QuantumTapeBatch = Sequence[QuantumTape]
-QuantumTape_or_Batch = Union[QuantumTape, QuantumTapeBatch]
-Result_or_ResultBatch = Union[Result, ResultBatch]
-# pylint: disable=protected-access
-def qiskit_session(device, **kwargs):
- """A context manager that creates a Qiskit Session and sets it as a session
- on the device while the context manager is active. Using the context manager
- will ensure the Session closes properly and is removed from the device after
- completing the tasks. Any Session that was initialized and passed into the
- device will be overwritten by the Qiskit Session created by this context
- manager.
- Args:
- device (QiskitDevice2): the device that will create remote tasks using the session
- **kwargs: session keyword arguments to be used for settings for the Session. At the
- time of writing, the only relevant keyword argument is "max_time", which lets you
- set the maximum amount of time the sessin is open. For the most up to date information,
- please refer to the Qiskit Session
- `documentation `_.
- **Example:**
- .. code-block:: python
- import pennylane as qml
- from pennylane_qiskit import qiskit_session
- from qiskit_ibm_runtime import QiskitRuntimeService
- # get backend
- service = QiskitRuntimeService(channel="ibm_quantum")
- backend = service.least_busy(simulator=False, operational=True)
- # initialize device
- dev = qml.device('qiskit.remote', wires=2, backend=backend)
- @qml.qnode(dev)
- def circuit(x):
- qml.RX(x, 0)
- qml.CNOT([0, 1])
- return qml.expval(qml.PauliZ(1))
- angle = 0.1
- with qiskit_session(dev, max_time=60) as session:
- # queue for the first execution
- res = circuit(angle)[0]
- # then this loop executes immediately after without queueing again
- while res > 0:
- angle += 0.3
- res = circuit(angle)[0]
- Note that if you passed in a session to your device, that session will be overwritten
- by `qiskit_session`.
- .. code-block:: python
- import pennylane as qml
- from pennylane_qiskit import qiskit_session
- from qiskit_ibm_runtime import QiskitRuntimeService, Session
- # get backend
- service = QiskitRuntimeService(channel="ibm_quantum")
- backend = service.least_busy(simulator=False, operational=True)
- # initialize device
- dev = qml.device('qiskit.remote', wires=2, backend=backend, session=Session(backend=backend, max_time=30))
- @qml.qnode(dev)
- def circuit(x):
- qml.RX(x, 0)
- qml.CNOT([0, 1])
- return qml.expval(qml.PauliZ(1))
- angle = 0.1
- # This session will have the Qiskit default settings max_time=900
- with qiskit_session(dev) as session:
- res = circuit(angle)[0]
- while res > 0:
- angle += 0.3
- res = circuit(angle)[0]
- """
- # Code to acquire session:
- existing_session = device._session
- session_options = {"backend": device.backend, "service": device.service}
- for k, v in kwargs.items():
- # Options like service and backend should be tied to the settings set on device
- if k in session_options:
- warnings.warn(f"Using '{k}' set in device, {getattr(device, k)}", UserWarning)
- else:
- session_options[k] = v
- session = Session(**session_options)
- device._session = session
- try:
- yield session
- finally:
- # Code to release session:
- session.close()
- device._session = existing_session
-def accepted_sample_measurement(m: qml.measurements.MeasurementProcess) -> bool:
- """Specifies whether or not a measurement is accepted when sampling."""
- return isinstance(
- m,
- (
- qml.measurements.SampleMeasurement,
- qml.measurements.ClassicalShadowMP,
- qml.measurements.ShadowExpvalMP,
- ),
- )
-def split_execution_types(
- tape: qml.tape.QuantumTape,
-) -> (Sequence[qml.tape.QuantumTape], Callable):
- """Split into separate tapes based on measurement type. Counts and sample-based measurements
- will use the Qiskit Sampler. ExpectationValue and Variance will use the Estimator, except
- when the measured observable does not have a `pauli_rep`. In that case, the Sampler will be
- used, and the raw samples will be processed to give an expectation value."""
- estimator = []
- sampler = []
- for i, mp in enumerate(tape.measurements):
- if isinstance(mp, (ExpectationMP, VarianceMP)):
- if mp.obs.pauli_rep:
- estimator.append((mp, i))
- else:
- warnings.warn(
- f"The observable measured {mp.obs} does not have a `pauli_rep` "
- "and will be run without using the Estimator primitive. Instead, "
- "raw samples from the Sampler will be used."
- )
- sampler.append((mp, i))
- else:
- sampler.append((mp, i))
- order_indices = [[i for mp, i in group] for group in [estimator, sampler]]
- tapes = []
- if estimator:
- tapes.extend(
- [
- qml.tape.QuantumScript(
- tape.operations,
- measurements=[mp for mp, i in estimator],
- shots=tape.shots,
- )
- ]
- )
- if sampler:
- tapes.extend(
- [
- qml.tape.QuantumScript(
- tape.operations,
- measurements=[mp for mp, i in sampler],
- shots=tape.shots,
- )
- ]
- )
- def reorder_fn(res):
- """re-order the output to the original shape and order"""
- flattened_indices = [i for group in order_indices for i in group]
- flattened_results = [r for group in res for r in group]
- result = dict(zip(flattened_indices, flattened_results))
- result = tuple(result[i] for i in sorted(result.keys()))
- return result[0] if len(result) == 1 else result
- return tapes, reorder_fn
-class QiskitDevice2(Device):
- r"""Hardware/simulator Qiskit device for PennyLane.
- Args:
- wires (int or Iterable[Number, str]]): Number of subsystems represented by the device,
- or iterable that contains unique labels for the subsystems as numbers (i.e., ``[-1, 0, 2]``)
- or strings (``['aux_wire', 'q1', 'q2']``).
- backend (Backend): the initialized Qiskit backend
- Keyword Args:
- shots (int or None): number of circuit evaluations/random samples used
- to estimate expectation values and variances of observables.
- session (Session): a Qiskit Session to use for device execution. If none is provided, a session will
- be created at each device execution.
- compile_backend (Union[Backend, None]): the backend to be used for compiling the circuit that will be
- sent to the backend device, to be set if the backend desired for compliation differs from the
- backend used for execution. Defaults to ``None``, which means the primary backend will be used.
- **kwargs: transpilation and runtime keyword arguments to be used for measurements with Primitives.
- If an `options` dictionary is defined amongst the kwargs, and there are settings that overlap
- with those in kwargs, the settings in `options` will take precedence over kwargs. Keyword
- arguments accepted by both the transpiler and at runtime (e.g. ``optimization_level``)
- will be passed to the transpiler rather than to the Primitive.
- """
- operations = set(QISKIT_OPERATION_MAP.keys())
- observables = {
- "PauliX",
- "PauliY",
- "PauliZ",
- "Identity",
- "Hadamard",
- "Hermitian",
- "Projector",
- "Prod",
- "Sum",
- "LinearCombination",
- "SProd",
- # TODO Could support SparseHamiltonian
- }
- # pylint:disable = too-many-arguments
- def __init__(
- self,
- wires,
- backend,
- shots=1024,
- session=None,
- compile_backend=None,
- **kwargs,
- ):
- if shots is None:
- warnings.warn(
- "Expected an integer number of shots, but received shots=None. Defaulting "
- "to 1024 shots. The analytic calculation of results is not supported on "
- "this device. All statistics obtained from this device are estimates based "
- "on samples.",
- UserWarning,
- )
- shots = 1024
- super().__init__(wires=wires, shots=shots)
- self._backend = backend
- self._compile_backend = compile_backend if compile_backend else backend
- self._service = getattr(backend, "_service", None)
- self._session = session
- kwargs["shots"] = shots
- # Perform validation against backend
- available_qubits = (
- backend.num_qubits
- if isinstance(backend, BackendV2)
- else backend.configuration().n_qubits
- )
- if len(self.wires) > int(available_qubits):
- raise ValueError(f"Backend '{backend}' supports maximum {available_qubits} wires")
- self.reset()
- self._kwargs, self._transpile_args = self._process_kwargs(
- kwargs
- ) # processes kwargs and separates transpilation arguments to dev._transpile_args
- @property
- def backend(self):
- """The Qiskit backend object.
- Returns:
- qiskit.providers.Backend: Qiskit backend object.
- """
- return self._backend
- @property
- def compile_backend(self):
- """The ``compile_backend`` is a Qiskit backend object to be used for transpilation.
- Returns:
- qiskit.providers.backend: Qiskit backend object.
- """
- return self._compile_backend
- @property
- def service(self):
- """The QiskitRuntimeService service.
- Returns:
- qiskit.qiskit_ibm_runtime.QiskitRuntimeService
- """
- return self._service
- @property
- def session(self):
- """The QiskitRuntimeService session.
- Returns:
- qiskit.qiskit_ibm_runtime.Session
- """
- return self._session
- @property
- def num_wires(self):
- return len(self.wires)
- def update_session(self, session):
- self._session = session
- def reset(self):
- self._current_job = None
- def stopping_condition(self, op: qml.operation.Operator) -> bool:
- """Specifies whether or not an Operator is accepted by QiskitDevice2."""
- return op.name in self.operations
- def observable_stopping_condition(self, obs: qml.operation.Operator) -> bool:
- """Specifies whether or not an observable is accepted by QiskitDevice2."""
- return obs.name in self.observables
- def preprocess(
- self,
- execution_config: ExecutionConfig = DefaultExecutionConfig,
- ) -> Tuple[TransformProgram, ExecutionConfig]:
- """This function defines the device transform program to be applied and an updated device configuration.
- Args:
- execution_config (Union[ExecutionConfig, Sequence[ExecutionConfig]]): A data structure describing the
- parameters needed to fully describe the execution.
- Returns:
- TransformProgram, ExecutionConfig: A transform program that when called returns QuantumTapes that the device
- can natively execute as well as a postprocessing function to be called after execution, and a configuration with
- unset specifications filled in.
- This device:
- * Supports any operations with explicit PennyLane to Qiskit gate conversions defined in the plugin
- * Does not intrinsically support parameter broadcasting
- """
- config = execution_config
- config.use_device_gradient = False
- transform_program = TransformProgram()
- transform_program.add_transform(validate_device_wires, self.wires, name=self.name)
- transform_program.add_transform(
- decompose,
- stopping_condition=self.stopping_condition,
- name=self.name,
- skip_initial_state_prep=False,
- )
- transform_program.add_transform(
- validate_measurements,
- sample_measurements=accepted_sample_measurement,
- name=self.name,
- )
- transform_program.add_transform(
- validate_observables,
- stopping_condition=self.observable_stopping_condition,
- name=self.name,
- )
- transform_program.add_transform(broadcast_expand)
- transform_program.add_transform(split_non_commuting)
- transform_program.add_transform(split_execution_types)
- return transform_program, config
- def _process_kwargs(self, kwargs):
- """Processes kwargs given and separates them into kwargs and transpile_args. If given
- a keyword argument 'options' that is a dictionary, a common practice in
- Qiskit, the options in said dictionary take precedence over any overlapping keyword
- arguments defined in the kwargs.
- Keyword Args:
- kwargs (dict): keyword arguments that set either runtime options or transpilation
- options.
- Returns:
- kwargs, transpile_args: keyword arguments for the runtime options and keyword
- arguments for the transpiler
- """
- if "noise_model" in kwargs:
- noise_model = kwargs.pop("noise_model")
- self.backend.set_options(noise_model=noise_model)
- if "options" in kwargs:
- for key, val in kwargs.pop("options").items():
- if key in kwargs:
- warnings.warn(
- "An overlap between what was passed in via options and what was passed in via kwargs was found."
- f"The value set in options {key}={val} will be used."
- )
- kwargs[key] = val
- shots = kwargs.pop("shots")
- if "default_shots" in kwargs:
- warnings.warn(
- f"default_shots was found in the keyword arguments, but it is not supported by {self.name}"
- "Please use the `shots` keyword argument instead. The number of shots "
- f"{shots} will be used instead."
- )
- kwargs["default_shots"] = shots
- kwargs, transpile_args = self.get_transpile_args(kwargs)
- return kwargs, transpile_args
- @staticmethod
- def get_transpile_args(kwargs):
- """The transpile argument setter. This separates keyword arguments related to transpilation
- from the rest of the keyword arguments and removes those keyword arguments from kwargs.
- Keyword Args:
- kwargs (dict): combined keyword arguments to be parsed for the Qiskit transpiler. For more details, see the
- `Qiskit transpiler documentation `_
- Returns:
- kwargs (dict), transpile_args (dict): keyword arguments for the runtime options and keyword
- arguments for the transpiler
- """
- transpile_sig = inspect.signature(transpile).parameters
- transpile_args = {arg: kwargs.pop(arg) for arg in transpile_sig if arg in kwargs}
- transpile_args.pop("circuits", None)
- transpile_args.pop("backend", None)
- return kwargs, transpile_args
- def compile_circuits(self, circuits):
- """Compiles multiple circuits one after the other.
- Args:
- circuits (list[QuantumCircuit]): the circuits to be compiled
- Returns:
- list[QuantumCircuit]: the list of compiled circuits
- """
- # Compile each circuit object
- compiled_circuits = []
- transpile_args = self._transpile_args
- for i, circuit in enumerate(circuits):
- compiled_circ = transpile(circuit, backend=self.compile_backend, **transpile_args)
- compiled_circ.name = f"circ{i}"
- compiled_circuits.append(compiled_circ)
- return compiled_circuits
- # pylint: disable=unused-argument, no-member
- def execute(
- self,
- circuits: QuantumTape_or_Batch,
- execution_config: ExecutionConfig = DefaultExecutionConfig,
- ) -> Result_or_ResultBatch:
- session = self._session or Session(backend=self.backend)
- results = []
- if isinstance(circuits, QuantumScript):
- circuits = [circuits]
- @contextmanager
- def execute_circuits(session):
- try:
- for circ in circuits:
- if circ.shots and len(circ.shots.shot_vector) > 1:
- raise ValueError(
- f"Setting shot vector {circ.shots.shot_vector} is not supported for {self.name}."
- "Please use a single integer instead when specifying the number of shots."
- )
- if isinstance(circ.measurements[0], (ExpectationMP, VarianceMP)) and getattr(
- circ.measurements[0].obs, "pauli_rep", None
- ):
- execute_fn = self._execute_estimator
- else:
- execute_fn = self._execute_sampler
- results.append(execute_fn(circ, session))
- yield results
- finally:
- session.close()
- with execute_circuits(session) as results:
- return results
- def _execute_sampler(self, circuit, session):
- """Returns the result of the execution of the circuit using the SamplerV2 Primitive.
- Note that this result has been processed respective to the MeasurementProcess given.
- E.g. `qml.expval` returns an expectation value whereas `qml.sample()` will return the raw samples.
- Args:
- circuits (list[QuantumCircuit]): the circuits to be executed via SamplerV2
- session (Session): the session that the execution will be performed with
- Returns:
- result (tuple): the processed result from SamplerV2
- """
- qcirc = [circuit_to_qiskit(circuit, self.num_wires, diagonalize=True, measure=True)]
- sampler = Sampler(session=session)
- compiled_circuits = self.compile_circuits(qcirc)
- sampler.options.update(**self._kwargs)
- # len(compiled_circuits) is always 1 so the indexing does not matter.
- result = sampler.run(
- compiled_circuits,
- shots=circuit.shots.total_shots if circuit.shots.total_shots else None,
- ).result()[0]
- classical_register_name = compiled_circuits[0].cregs[0].name
- self._current_job = getattr(result.data, classical_register_name)
- # needs processing function to convert to the correct format for states, and
- # also handle instances where wires were specified in probs, and for multiple probs measurements
- self._samples = self.generate_samples(0)
- res = [
- mp.process_samples(self._samples, wire_order=self.wires) for mp in circuit.measurements
- ]
- single_measurement = len(circuit.measurements) == 1
- res = (res[0],) if single_measurement else tuple(res)
- return res
- def _execute_estimator(self, circuit, session):
- """Returns the result of the execution of the circuit using the EstimatorV2 Primitive.
- Note that this result has been processed respective to the MeasurementProcess given.
- E.g. `qml.expval` returns an expectation value whereas `qml.var` will return the variance.
- Args:
- circuits (list[QuantumCircuit]): the circuits to be executed via EstimatorV2
- session (Session): the session that the execution will be performed with
- Returns:
- result (tuple): the processed result from EstimatorV2
- """
- # the Estimator primitive takes care of diagonalization and measurements itself,
- # so diagonalizing gates and measurements are not included in the circuit
- qcirc = [circuit_to_qiskit(circuit, self.num_wires, diagonalize=False, measure=False)]
- estimator = Estimator(session=session)
- pauli_observables = [mp_to_pauli(mp, self.num_wires) for mp in circuit.measurements]
- compiled_circuits = self.compile_circuits(qcirc)
- estimator.options.update(**self._kwargs)
- # split into one call per measurement
- # could technically be more efficient if there are some observables where we ask
- # for expectation value and variance on the same observable, but spending time on
- # that right now feels excessive
- circ_and_obs = [(compiled_circuits[0], pauli_observables)]
- result = estimator.run(
- circ_and_obs,
- precision=np.sqrt(1 / circuit.shots.total_shots) if circuit.shots else None,
- ).result()
- self._current_job = result
- result = self._process_estimator_job(circuit.measurements, result)
- return result
- @staticmethod
- def _process_estimator_job(measurements, job_result):
- """Estimator returns the expectation value and standard error for each observable measured,
- along with some metadata that contains the precision. Extracts the relevant number for each
- measurement process and return the requested results from the Estimator executions.
- Note that for variance, we calculate the variance by using the standard error and the
- precision value.
- Args:
- measurements (list[MeasurementProcess]): the measurements in the circuit
- job_result (Any): the result from EstimatorV2
- Returns:
- result (tuple): the processed result from EstimatorV2
- """
- expvals = job_result[0].data.evs
- variances = (job_result[0].data.stds / job_result[0].metadata["target_precision"]) ** 2
- result = []
- for i, mp in enumerate(measurements):
- if isinstance(mp, ExpectationMP):
- result.append(expvals[i])
- elif isinstance(mp, VarianceMP):
- result.append(variances[i])
- single_measurement = len(measurements) == 1
- result = (result[0],) if single_measurement else tuple(result)
- return result
- def generate_samples(self, circuit=None):
- r"""Returns the computational basis samples generated for all wires.
- Note that PennyLane uses the convention :math:`|q_0,q_1,\dots,q_{N-1}\rangle` where
- :math:`q_0` is the most significant bit.
- Args:
- circuit (int): position of the circuit in the batch.
- Returns:
- array[complex]: array of samples in the shape ``(dev.shots, dev.num_wires)``
- """
- counts = self._current_job.get_counts()
- # Batch of circuits
- if not isinstance(counts, dict):
- counts = self._current_job.get_counts()[circuit]
- samples = []
- for key, value in counts.items():
- samples.extend([key] * value)
- return np.vstack([np.array([int(i) for i in s[::-1]]) for s in samples])
diff --git a/pennylane_qiskit/qiskit_device_legacy.py b/pennylane_qiskit/qiskit_device_legacy.py
new file mode 100644
index 00000000..d44a17d7
--- /dev/null
+++ b/pennylane_qiskit/qiskit_device_legacy.py
@@ -0,0 +1,491 @@
+# Copyright 2019-2021 Xanadu Quantum Technologies Inc.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+This module contains a base class for constructing Qiskit devices for PennyLane.
+# pylint: disable=too-many-instance-attributes,attribute-defined-outside-init
+import abc
+import inspect
+import warnings
+import numpy as np
+from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister
+from qiskit.compiler import transpile
+from qiskit.converters import circuit_to_dag, dag_to_circuit
+from qiskit.providers import Backend, BackendV2, QiskitBackendNotFoundError
+from pennylane import QubitDevice, DeviceError
+from pennylane.measurements import SampleMP, CountsMP, ClassicalShadowMP, ShadowExpvalMP
+from .converter import QISKIT_OPERATION_MAP
+from ._version import __version__
+SAMPLE_TYPES = (SampleMP, CountsMP, ClassicalShadowMP, ShadowExpvalMP)
+def _get_backend_name(backend):
+ try:
+ return backend.name() # BackendV1
+ except TypeError: # pragma: no cover
+ return backend.name # BackendV2
+class QiskitDeviceLegacy(QubitDevice, abc.ABC):
+ r"""Abstract Qiskit device for PennyLane.
+ Args:
+ wires (int or Iterable[Number, str]]): Number of subsystems represented by the device,
+ or iterable that contains unique labels for the subsystems as numbers (i.e., ``[-1, 0, 2]``)
+ or strings (``['ancilla', 'q1', 'q2']``).
+ provider (Provider | None): The Qiskit backend provider.
+ backend (str | Backend): the desired backend. If a string, a provider must be given.
+ shots (int or None): number of circuit evaluations/random samples used
+ to estimate expectation values and variances of observables. For state vector backends,
+ setting to ``None`` results in computing statistics like expectation values and variances analytically.
+ Keyword Args:
+ name (str): The name of the circuit. Default ``'circuit'``.
+ compile_backend (BaseBackend): The backend used for compilation. If you wish
+ to simulate a device compliant circuit, you can specify a backend here.
+ """
+ name = "Qiskit PennyLane plugin"
+ pennylane_requires = ">=0.30.0"
+ version = __version__
+ plugin_version = __version__
+ author = "Xanadu"
+ _capabilities = {
+ "model": "qubit",
+ "tensor_observables": True,
+ "inverse_operations": True,
+ }
+ _operation_map = QISKIT_OPERATION_MAP
+ _state_backends = {
+ "statevector_simulator",
+ "simulator_statevector",
+ "unitary_simulator",
+ "aer_simulator_statevector",
+ "aer_simulator_unitary",
+ }
+ """set[str]: Set of backend names that define the backends
+ that support returning the underlying quantum statevector"""
+ operations = set(_operation_map.keys())
+ observables = {
+ "PauliX",
+ "PauliY",
+ "PauliZ",
+ "Identity",
+ "Hadamard",
+ "Hermitian",
+ "Projector",
+ }
+ analytic_warning_message = (
+ "The analytic calculation of expectations, variances and "
+ "probabilities is only supported on statevector backends, not on the {}. "
+ "Such statistics obtained from this device are estimates based on samples."
+ )
+ _eigs = {}
+ def __init__(self, wires, provider, backend, shots=1024, **kwargs):
+ super().__init__(wires=wires, shots=shots)
+ self.provider = provider
+ if isinstance(backend, Backend):
+ self._backend = backend
+ self.backend_name = _get_backend_name(backend)
+ elif provider is None:
+ raise ValueError("Must pass a provider if the backend is not a Backend instance.")
+ else:
+ try:
+ self._backend = provider.get_backend(backend)
+ except QiskitBackendNotFoundError as e:
+ available_backends = list(map(_get_backend_name, provider.backends()))
+ raise ValueError(
+ f"Backend '{backend}' does not exist. Available backends "
+ f"are:\n {available_backends}"
+ ) from e
+ self.backend_name = _get_backend_name(self._backend)
+ # Keep track if the user specified analytic to be True
+ if shots is None and not self._is_state_backend:
+ # Raise a warning if no shots were specified for a hardware device
+ warnings.warn(self.analytic_warning_message.format(backend), UserWarning)
+ self.shots = 1024
+ self._capabilities["returns_state"] = self._is_state_backend
+ # Perform validation against backend
+ backend_qubits = (
+ backend.num_qubits
+ if isinstance(backend, BackendV2)
+ else self.backend.configuration().n_qubits
+ )
+ if backend_qubits and len(self.wires) > int(backend_qubits):
+ raise ValueError(f"Backend '{backend}' supports maximum {backend_qubits} wires")
+ # Initialize inner state
+ self.reset()
+ self.process_kwargs(kwargs)
+ def process_kwargs(self, kwargs):
+ """Processing the keyword arguments that were provided upon device initialization.
+ Args:
+ kwargs (dict): keyword arguments to be set for the device
+ """
+ self.compile_backend = None
+ if "compile_backend" in kwargs:
+ self.compile_backend = kwargs.pop("compile_backend")
+ if "noise_model" in kwargs:
+ noise_model = kwargs.pop("noise_model")
+ self.backend.set_options(noise_model=noise_model)
+ # set transpile_args
+ self.set_transpile_args(**kwargs)
+ # Get further arguments for run
+ self.run_args = {}
+ # Specify to have a memory for hw/hw simulators
+ compile_backend = self.compile_backend or self.backend
+ memory = str(compile_backend) not in self._state_backends
+ if memory:
+ kwargs["memory"] = True
+ # Consider the remaining kwargs as keyword arguments to run
+ self.run_args.update(kwargs)
+ @property
+ def _is_state_backend(self):
+ """Returns whether this device has a state backend."""
+ return self.backend_name in self._state_backends or self.backend.options.get("method") in {
+ "unitary",
+ "statevector",
+ }
+ @property
+ def _is_statevector_backend(self):
+ """Returns whether this device has a statevector backend."""
+ method = "statevector"
+ return method in self.backend_name or self.backend.options.get("method") == method
+ @property
+ def _is_unitary_backend(self):
+ """Returns whether this device has a unitary backend."""
+ method = "unitary"
+ return method in self.backend_name or self.backend.options.get("method") == method
+ def set_transpile_args(self, **kwargs):
+ """The transpile argument setter.
+ Keyword Args:
+ kwargs (dict): keyword arguments to be set for the Qiskit transpiler. For more details, see the
+ `Qiskit transpiler documentation `_
+ """
+ transpile_sig = inspect.signature(transpile).parameters
+ self.transpile_args = {arg: kwargs[arg] for arg in transpile_sig if arg in kwargs}
+ self.transpile_args.pop("circuits", None)
+ self.transpile_args.pop("backend", None)
+ @property
+ def backend(self):
+ """The Qiskit backend object.
+ Returns:
+ qiskit.providers.backend: Qiskit backend object.
+ """
+ return self._backend
+ def reset(self):
+ """Reset the Qiskit backend device"""
+ # Reset only internal data, not the options that are determined on
+ # device creation
+ self._reg = QuantumRegister(self.num_wires, "q")
+ self._creg = ClassicalRegister(self.num_wires, "c")
+ self._circuit = QuantumCircuit(self._reg, self._creg, name="temp")
+ self._current_job = None
+ self._state = None # statevector of a simulator backend
+ def create_circuit_object(self, operations, **kwargs):
+ """Builds the circuit objects based on the operations and measurements
+ specified to apply.
+ Args:
+ operations (list[~.Operation]): operations to apply to the device
+ Keyword args:
+ rotations (list[~.Operation]): Operations that rotate the circuit
+ pre-measurement into the eigenbasis of the observables.
+ """
+ rotations = kwargs.get("rotations", [])
+ applied_operations = self.apply_operations(operations)
+ # Rotating the state for measurement in the computational basis
+ rotation_circuits = self.apply_operations(rotations)
+ applied_operations.extend(rotation_circuits)
+ for circuit in applied_operations:
+ self._circuit &= circuit
+ if not self._is_state_backend:
+ # Add measurements if they are needed
+ for qr, cr in zip(self._reg, self._creg):
+ self._circuit.measure(qr, cr)
+ elif "aer" in self.backend_name:
+ self._circuit.save_state()
+ def apply(self, operations, **kwargs):
+ """Build the circuit object and apply the operations"""
+ self.create_circuit_object(operations, **kwargs)
+ # These operations need to run for all devices
+ compiled_circuit = self.compile()
+ self.run(compiled_circuit)
+ def apply_operations(self, operations):
+ """Apply the circuit operations.
+ This method serves as an auxiliary method to :meth:`~.QiskitDevice.apply`.
+ Args:
+ operations (List[pennylane.Operation]): operations to be applied
+ Returns:
+ list[QuantumCircuit]: a list of quantum circuit objects that
+ specify the corresponding operations
+ """
+ circuits = []
+ for operation in operations:
+ # Apply the circuit operations
+ device_wires = self.map_wires(operation.wires)
+ par = operation.parameters
+ for idx, p in enumerate(par):
+ if isinstance(p, np.ndarray):
+ # Convert arrays so that Qiskit accepts the parameter
+ par[idx] = p.tolist()
+ operation = operation.name
+ mapped_operation = self._operation_map[operation]
+ self.qubit_state_vector_check(operation)
+ qregs = [self._reg[i] for i in device_wires.labels]
+ if operation in ("QubitUnitary", "QubitStateVector", "StatePrep"):
+ # Need to revert the order of the quantum registers used in
+ # Qiskit such that it matches the PennyLane ordering
+ qregs = list(reversed(qregs))
+ if operation in ("Barrier",):
+ # Need to add the num_qubits for instantiating Barrier in Qiskit
+ par = [len(self._reg)]
+ dag = circuit_to_dag(QuantumCircuit(self._reg, self._creg, name=""))
+ gate = mapped_operation(*par)
+ dag.apply_operation_back(gate, qargs=qregs)
+ circuit = dag_to_circuit(dag)
+ circuits.append(circuit)
+ return circuits
+ def qubit_state_vector_check(self, operation):
+ """Input check for the StatePrepBase operations.
+ Args:
+ operation (pennylane.Operation): operation to be checked
+ Raises:
+ DeviceError: If the operation is QubitStateVector or StatePrep
+ """
+ if operation in ("QubitStateVector", "StatePrep"):
+ if self._is_unitary_backend:
+ raise DeviceError(
+ f"The {operation} operation "
+ "is not supported on the unitary simulator backend."
+ )
+ def compile(self):
+ """Compile the quantum circuit to target the provided compile_backend.
+ If compile_backend is None, then the target is simply the
+ backend.
+ """
+ compile_backend = self.compile_backend or self.backend
+ compiled_circuits = transpile(self._circuit, backend=compile_backend, **self.transpile_args)
+ return compiled_circuits
+ def run(self, qcirc):
+ """Run the compiled circuit and query the result.
+ Args:
+ qcirc (qiskit.QuantumCircuit): the quantum circuit to be run on the backend
+ """
+ self._current_job = self.backend.run(qcirc, shots=self.shots, **self.run_args)
+ result = self._current_job.result()
+ if self._is_state_backend:
+ self._state = self._get_state(result)
+ def _get_state(self, result, experiment=None):
+ """Returns the statevector for state simulator backends.
+ Args:
+ result (qiskit.Result): result object
+ experiment (str or None): the name of the experiment to get the state for.
+ Returns:
+ array[float]: size ``(2**num_wires,)`` statevector
+ """
+ if self._is_statevector_backend:
+ state = np.asarray(result.get_statevector(experiment))
+ elif self._is_unitary_backend:
+ unitary = np.asarray(result.get_unitary(experiment))
+ initial_state = np.zeros([2**self.num_wires])
+ initial_state[0] = 1
+ state = unitary @ initial_state
+ # reverse qubit order to match PennyLane convention
+ return state.reshape([2] * self.num_wires).T.flatten()
+ def generate_samples(self, circuit=None):
+ r"""Returns the computational basis samples generated for all wires.
+ Note that PennyLane uses the convention :math:`|q_0,q_1,\dots,q_{N-1}\rangle` where
+ :math:`q_0` is the most significant bit.
+ Args:
+ circuit (str or None): the name of the circuit to get the state for
+ Returns:
+ array[complex]: array of samples in the shape ``(dev.shots, dev.num_wires)``
+ """
+ # branch out depending on the type of backend
+ if self._is_state_backend:
+ # software simulator: need to sample from probabilities
+ return super().generate_samples()
+ # hardware or hardware simulator
+ samples = self._current_job.result().get_memory(circuit)
+ # reverse qubit order to match PennyLane convention
+ return np.vstack([np.array([int(i) for i in s[::-1]]) for s in samples])
+ @property
+ def state(self):
+ """Get state of the device"""
+ return self._state
+ def analytic_probability(self, wires=None):
+ """Get the analytic probability of the device"""
+ if self._state is None:
+ return None
+ prob = self.marginal_prob(np.abs(self._state) ** 2, wires)
+ return prob
+ def compile_circuits(self, circuits):
+ r"""Compiles multiple circuits one after the other.
+ Args:
+ circuits (list[.tapes.QuantumTape]): the circuits to be compiled
+ Returns:
+ list[QuantumCircuit]: the list of compiled circuits
+ """
+ # Compile each circuit object
+ compiled_circuits = []
+ for circuit in circuits:
+ # We need to reset the device here, else it will
+ # not start the next computation in the zero state
+ self.reset()
+ self.create_circuit_object(circuit.operations, rotations=circuit.diagonalizing_gates)
+ compiled_circ = self.compile()
+ compiled_circ.name = f"circ{len(compiled_circuits)}"
+ compiled_circuits.append(compiled_circ)
+ return compiled_circuits
+ def batch_execute(self, circuits, timeout: int = None):
+ """Batch execute the circuits on the device"""
+ compiled_circuits = self.compile_circuits(circuits)
+ if not compiled_circuits:
+ # At least one circuit must always be provided to the backend.
+ return []
+ # Send the batch of circuit objects using backend.run
+ self._current_job = self.backend.run(compiled_circuits, shots=self.shots, **self.run_args)
+ try:
+ result = self._current_job.result(timeout=timeout)
+ except TypeError: # pragma: no cover
+ # timeout not supported
+ result = self._current_job.result()
+ # increment counter for number of executions of qubit device
+ # pylint: disable=no-member
+ self._num_executions += 1
+ # Compute statistics using the state and/or samples
+ results = []
+ for circuit, circuit_obj in zip(circuits, compiled_circuits):
+ # Update the tracker
+ if self.tracker.active:
+ self.tracker.update(executions=1, shots=self.shots)
+ self.tracker.record()
+ if self._is_state_backend:
+ self._state = self._get_state(result, experiment=circuit_obj)
+ # generate computational basis samples
+ if self.shots is not None or any(
+ isinstance(m, SAMPLE_TYPES) for m in circuit.measurements
+ ):
+ self._samples = self.generate_samples(circuit_obj)
+ res = self.statistics(circuit)
+ single_measurement = len(circuit.measurements) == 1
+ res = res[0] if single_measurement else tuple(res)
+ results.append(res)
+ if self.tracker.active:
+ self.tracker.update(batches=1, batch_len=len(circuits))
+ self.tracker.record()
+ return results
diff --git a/tests/test_base_device.py b/tests/test_base_device.py
index 2fd3c2fe..d958fda8 100644
--- a/tests/test_base_device.py
+++ b/tests/test_base_device.py
@@ -34,8 +34,8 @@
from qiskit.providers import BackendV1, BackendV2
from qiskit import QuantumCircuit, transpile
-from pennylane_qiskit.qiskit_device2 import (
- QiskitDevice2,
+from pennylane_qiskit.qiskit_device import (
+ QiskitDevice,
@@ -117,7 +117,7 @@ def close(self): # This is just to appease a test
mocked_backend = MockedBackend()
legacy_backend = MockedBackendLegacy()
aer_backend = AerSimulator()
-test_dev = QiskitDevice2(wires=5, backend=aer_backend)
+test_dev = QiskitDevice(wires=5, backend=aer_backend)
class TestSupportForV1andV2:
@@ -129,7 +129,7 @@ class TestSupportForV1andV2:
def test_v1_and_v2_mocked(self, backend):
"""Test that device initializes with no error mocked"""
- dev = QiskitDevice2(wires=10, backend=backend)
+ dev = QiskitDevice(wires=10, backend=backend)
assert dev._backend == backend
@@ -141,7 +141,7 @@ def test_v1_and_v2_mocked(self, backend):
def test_v1_and_v2_manila(self, backend, shape):
"""Test that device initializes and runs without error with V1 and V2 backends by Qiskit"""
- dev = QiskitDevice2(wires=5, backend=backend)
+ dev = QiskitDevice(wires=5, backend=backend)
def circuit(x):
@@ -163,8 +163,8 @@ def test_compile_backend_kwarg(self):
compile_backend = MockedBackend(name="compile_backend")
main_backend = MockedBackend(name="main_backend")
- dev1 = QiskitDevice2(wires=5, backend=main_backend)
- dev2 = QiskitDevice2(wires=5, backend=main_backend, compile_backend=compile_backend)
+ dev1 = QiskitDevice(wires=5, backend=main_backend)
+ dev2 = QiskitDevice(wires=5, backend=main_backend, compile_backend=compile_backend)
assert dev1._compile_backend == dev1._backend == main_backend
@@ -179,7 +179,7 @@ def test_no_shots_warns_and_defaults(self):
match="Expected an integer number of shots, but received shots=None",
- dev = QiskitDevice2(wires=2, backend=aer_backend, shots=None)
+ dev = QiskitDevice(wires=2, backend=aer_backend, shots=None)
assert dev.shots.total_shots == 1024
@@ -189,15 +189,15 @@ def test_backend_wire_validation(self, backend):
the number of wires available on the backend, for both backend versions"""
with pytest.raises(ValueError, match="supports maximum"):
- QiskitDevice2(wires=500, backend=backend)
+ QiskitDevice(wires=500, backend=backend)
def test_setting_simulator_noise_model(self):
"""Test that the simulator noise model saved on a passed Options
object is used to set the backend noise model"""
new_backend = MockedBackend()
- dev1 = QiskitDevice2(wires=3, backend=aer_backend)
- dev2 = QiskitDevice2(wires=3, backend=new_backend, noise_model={"placeholder": 1})
+ dev1 = QiskitDevice(wires=3, backend=aer_backend)
+ dev2 = QiskitDevice(wires=3, backend=new_backend, noise_model={"placeholder": 1})
assert dev1.backend.options.noise_model is None
assert dev2.backend.options.noise_model == {"placeholder": 1}
@@ -210,7 +210,7 @@ class TestQiskitSessionManagement:
def test_default_no_session_on_initialization(self, backend):
"""Test that the default behaviour is no session at initialization"""
- dev = QiskitDevice2(wires=2, backend=backend)
+ dev = QiskitDevice(wires=2, backend=backend)
assert dev._session is None
@pytest.mark.parametrize("backend", [aer_backend, FakeManila(), FakeManilaV2()])
@@ -218,15 +218,15 @@ def test_initializing_with_session(self, backend):
"""Test that you can initialize a device with an existing Qiskit session"""
session = MockSession(backend=backend, max_time="1m")
- dev = QiskitDevice2(wires=2, backend=backend, session=session)
+ dev = QiskitDevice(wires=2, backend=backend, session=session)
assert dev._session == session
- @patch("pennylane_qiskit.qiskit_device2.Session")
+ @patch("pennylane_qiskit.qiskit_device.Session")
@pytest.mark.parametrize("initial_session", [None, MockSession(aer_backend)])
def test_using_session_context(self, mock_session, initial_session):
"""Test that you can add a session within a context manager"""
- dev = QiskitDevice2(wires=2, backend=aer_backend, session=initial_session)
+ dev = QiskitDevice(wires=2, backend=aer_backend, session=initial_session)
assert dev._session == initial_session
@@ -238,7 +238,7 @@ def test_using_session_context(self, mock_session, initial_session):
def test_using_session_context_options(self):
"""Test that you can set session options using qiskit_session"""
- dev = QiskitDevice2(wires=2, backend=aer_backend)
+ dev = QiskitDevice(wires=2, backend=aer_backend)
assert dev._session is None
@@ -254,7 +254,7 @@ def test_error_when_passing_unexpected_kwarg(self):
Qiskit allows for more customization we can automatically accomodate those needs. Right
now there are no such keyword arguments, so an error on Qiskit's side is raised."""
- dev = QiskitDevice2(wires=2, backend=aer_backend)
+ dev = QiskitDevice(wires=2, backend=aer_backend)
assert dev._session is None
@@ -269,7 +269,7 @@ def test_error_when_passing_unexpected_kwarg(self):
def test_no_warning_when_using_initial_session_options(self):
initial_session = Session(backend=aer_backend, max_time=30)
- dev = QiskitDevice2(wires=2, backend=aer_backend, session=initial_session)
+ dev = QiskitDevice(wires=2, backend=aer_backend, session=initial_session)
assert dev._session == initial_session
@@ -288,7 +288,7 @@ def test_warnings_when_overriding_session_context_options(self, recorder):
default options, passed in from the `qiskit_session` take precedence, barring
`backend` or `service`"""
initial_session = Session(backend=aer_backend)
- dev = QiskitDevice2(wires=2, backend=aer_backend, session=initial_session)
+ dev = QiskitDevice(wires=2, backend=aer_backend, session=initial_session)
assert dev._session == initial_session
@@ -314,7 +314,7 @@ def test_warnings_when_overriding_session_context_options(self, recorder):
assert dev._session == initial_session
max_time_session = Session(backend=aer_backend, max_time=60)
- dev = QiskitDevice2(wires=2, backend=aer_backend, session=max_time_session)
+ dev = QiskitDevice(wires=2, backend=aer_backend, session=max_time_session)
with qiskit_session(dev, max_time=30) as session:
assert dev._session == session
assert dev._session != initial_session
@@ -328,7 +328,7 @@ def test_warnings_when_overriding_session_context_options(self, recorder):
def test_update_session(self, initial_session):
"""Test that you can update the session stored on the device"""
- dev = QiskitDevice2(wires=2, backend=aer_backend, session=initial_session)
+ dev = QiskitDevice(wires=2, backend=aer_backend, session=initial_session)
assert dev._session == initial_session
new_session = MockSession(backend=aer_backend, max_time="1m")
@@ -519,7 +519,7 @@ def test_observable_stopping_condition(self, obs, expected):
def test_preprocess_split_non_commuting(self, measurements, num_tapes):
"""Test that `split_non_commuting` works as expected in the preprocess function."""
- dev = QiskitDevice2(wires=5, backend=aer_backend)
+ dev = QiskitDevice(wires=5, backend=aer_backend)
qs = QuantumScript([], measurements=measurements, shots=qml.measurements.Shots(1000))
program, _ = dev.preprocess()
@@ -542,7 +542,7 @@ def test_preprocess_splits_incompatible_primitive_measurements(self, measurement
on measurement type. Expval and Variance are one type (Estimator), Probs and raw-sample based measurements
are another type (Sampler)."""
- dev = QiskitDevice2(wires=5, backend=aer_backend)
+ dev = QiskitDevice(wires=5, backend=aer_backend)
qs = QuantumScript([], measurements=measurements, shots=qml.measurements.Shots(1000))
program, _ = dev.preprocess()
@@ -591,20 +591,20 @@ def test_warning_if_shots(self):
match="default_shots was found in the keyword arguments",
- dev = QiskitDevice2(wires=2, backend=aer_backend, default_shots=333)
+ dev = QiskitDevice(wires=2, backend=aer_backend, default_shots=333)
# Qiskit takes in `default_shots` to define the # of shots, therefore we use
# the kwarg "default_shots" rather than shots to pass it to Qiskit.
assert dev._kwargs["default_shots"] == 1024
- dev = QiskitDevice2(wires=2, backend=aer_backend, shots=200)
+ dev = QiskitDevice(wires=2, backend=aer_backend, shots=200)
assert dev._kwargs["default_shots"] == 200
with pytest.warns(
match="default_shots was found in the keyword arguments",
- dev = QiskitDevice2(wires=2, backend=aer_backend, options={"default_shots": 30})
+ dev = QiskitDevice(wires=2, backend=aer_backend, options={"default_shots": 30})
# resets to default since we reinitialize the device
assert dev._kwargs["default_shots"] == 1024
@@ -615,7 +615,7 @@ def test_warning_if_options_and_kwargs_overlap(self):
match="An overlap between",
- dev = QiskitDevice2(
+ dev = QiskitDevice(
options={"resilience_level": 1, "optimization_level": 1},
@@ -642,7 +642,7 @@ def test_options_and_kwargs_combine_into_unified_kwargs(self, backend):
"""Test that options set via the keyword argument options and options set via kwargs
will combine into a single unified kwargs that is passed to the device"""
- dev = QiskitDevice2(
+ dev = QiskitDevice(
options={"resilience_level": 1},
@@ -666,7 +666,7 @@ def test_no_error_is_raised_if_transpilation_options_are_passed(self, backend):
"""Tests that when transpilation options are passed in, they are properly
handled without error"""
- dev = QiskitDevice2(
+ dev = QiskitDevice(
options={"resilience_level": 1, "optimization_level": 1},
@@ -692,7 +692,7 @@ def circuit():
class TestDeviceProperties:
def test_name_property(self):
"""Test the backend property"""
- assert test_dev.name == "QiskitDevice2"
+ assert test_dev.name == "QiskitDevice"
def test_backend_property(self):
"""Test the backend property"""
@@ -704,7 +704,7 @@ def test_compile_backend_property(self, backend):
"""Test the compile_backend property"""
compile_backend = MockedBackend(name="compile_backend")
- dev = QiskitDevice2(wires=5, backend=backend, compile_backend=compile_backend)
+ dev = QiskitDevice(wires=5, backend=backend, compile_backend=compile_backend)
assert dev.compile_backend == dev._compile_backend
assert dev.compile_backend == compile_backend
@@ -717,7 +717,7 @@ def test_session_property(self):
"""Test the session property"""
session = MockSession(backend=aer_backend)
- dev = QiskitDevice2(wires=2, backend=aer_backend, session=session)
+ dev = QiskitDevice(wires=2, backend=aer_backend, session=session)
assert dev.session == dev._session
assert dev.session == session
@@ -725,7 +725,7 @@ def test_num_wires_property(self):
"""Test the num_wires property"""
wires = [1, 2, 3]
- dev = QiskitDevice2(wires=wires, backend=aer_backend)
+ dev = QiskitDevice(wires=wires, backend=aer_backend)
assert dev.num_wires == len(wires)
@@ -742,7 +742,7 @@ def test_get_transpile_args(self):
"circuits": [],
compile_backend = MockedBackend(name="compile_backend")
- dev = QiskitDevice2(
+ dev = QiskitDevice(
wires=5, backend=aer_backend, compile_backend=compile_backend, **transpile_args
assert dev._transpile_args == {
@@ -750,14 +750,14 @@ def test_get_transpile_args(self):
"seed_transpiler": 42,
- @patch("pennylane_qiskit.qiskit_device2.transpile")
+ @patch("pennylane_qiskit.qiskit_device.transpile")
@pytest.mark.parametrize("compile_backend", [None, MockedBackend(name="compile_backend")])
def test_compile_circuits(self, transpile_mock, compile_backend):
"""Tests compile_circuits with a mocked transpile function to avoid calling
a remote backend. Confirm compile_backend and transpile_args are used."""
transpile_args = {"seed_transpiler": 42, "optimization_level": 2}
- dev = QiskitDevice2(
+ dev = QiskitDevice(
wires=5, backend=aer_backend, compile_backend=compile_backend, **transpile_args
@@ -815,18 +815,18 @@ def get_counts():
assert len(np.argwhere([np.allclose(s, [0, 1]) for s in samples])) == results_dict["10"]
assert len(np.argwhere([np.allclose(s, [1, 0]) for s in samples])) == results_dict["01"]
- @patch("pennylane_qiskit.qiskit_device2.QiskitDevice2._execute_estimator")
+ @patch("pennylane_qiskit.qiskit_device.QiskitDevice._execute_estimator")
def test_execute_pipeline_primitives_no_session(self, mocker):
"""Test that a Primitives-based device initialized with no Session creates one for the
execution, and then returns the device session to None."""
- dev = QiskitDevice2(wires=5, backend=aer_backend, session=None)
+ dev = QiskitDevice(wires=5, backend=aer_backend, session=None)
assert dev._session is None
qs = QuantumScript([qml.PauliX(0), qml.PauliY(1)], measurements=[qml.expval(qml.PauliZ(0))])
- with patch("pennylane_qiskit.qiskit_device2.Session") as mock_session:
+ with patch("pennylane_qiskit.qiskit_device.Session") as mock_session:
mock_session.assert_called_once() # a session was created
@@ -837,7 +837,7 @@ def test_execute_pipeline_with_all_execute_types_mocked(self, mocker, backend):
"""Test that a device executes measurements that require raw samples via the sampler,
and the relevant primitive measurements via the estimator"""
- dev = QiskitDevice2(wires=5, backend=backend, session=MockSession(backend))
+ dev = QiskitDevice(wires=5, backend=backend, session=MockSession(backend))
qs = QuantumScript(
[qml.PauliX(0), qml.PauliY(1)],
@@ -865,8 +865,8 @@ def test_execute_pipeline_with_all_execute_types_mocked(self, mocker, backend):
- @patch("pennylane_qiskit.qiskit_device2.Estimator")
- @patch("pennylane_qiskit.qiskit_device2.QiskitDevice2._process_estimator_job")
+ @patch("pennylane_qiskit.qiskit_device.Estimator")
+ @patch("pennylane_qiskit.qiskit_device.QiskitDevice._process_estimator_job")
@pytest.mark.parametrize("session", [None, MockSession(aer_backend)])
def test_execute_estimator_mocked(self, mocked_estimator, mocked_process_fn, session):
"""Test the _execute_estimator function using a mocked version of Estimator
@@ -885,7 +885,7 @@ def test_execute_estimator_mocked(self, mocked_estimator, mocked_process_fn, ses
def test_shot_vector_error_mocked(self):
"""Test that a device that executes a circuit with an array of shots raises the appropriate ValueError"""
- dev = QiskitDevice2(wires=5, backend=aer_backend, session=MockSession(aer_backend))
+ dev = QiskitDevice(wires=5, backend=aer_backend, session=MockSession(aer_backend))
qs = QuantumScript(
@@ -919,7 +919,7 @@ def test_estimator_with_different_pauli_obs(self, mocker, wire, angle, op, expec
correspond correctly (wire ordering convention in Qiskit and PennyLane don't match.)
- dev = QiskitDevice2(wires=5, backend=aer_backend)
+ dev = QiskitDevice(wires=5, backend=aer_backend)
sampler_execute = mocker.spy(dev, "_execute_sampler")
estimator_execute = mocker.spy(dev, "_execute_estimator")
@@ -978,7 +978,7 @@ def test_estimator_with_various_multi_qubit_pauli_obs(
pl_dev = qml.device("default.qubit", wires=[0, 1, 2, 3])
- dev = QiskitDevice2(wires=[0, 1, 2, 3], backend=aer_backend)
+ dev = QiskitDevice(wires=[0, 1, 2, 3], backend=aer_backend)
sampler_execute = mocker.spy(dev, "_execute_sampler")
estimator_execute = mocker.spy(dev, "_execute_estimator")
@@ -1002,7 +1002,7 @@ def test_estimator_with_various_multi_qubit_pauli_obs(
def test_tape_shots_used_for_estimator(self, mocker):
"""Tests that device uses tape shots rather than device shots for estimator"""
- dev = QiskitDevice2(wires=5, backend=aer_backend, shots=2)
+ dev = QiskitDevice(wires=5, backend=aer_backend, shots=2)
estimator_execute = mocker.spy(dev, "_execute_estimator")
@@ -1068,7 +1068,7 @@ def test_process_estimator_job(self, measurements, expectation):
assert isinstance(result[0].metadata, dict)
- processed_result = QiskitDevice2._process_estimator_job(qs.measurements, result)
+ processed_result = QiskitDevice._process_estimator_job(qs.measurements, result)
assert isinstance(processed_result, tuple)
assert np.allclose(processed_result, expectation, atol=0.1)
@@ -1076,7 +1076,7 @@ def test_process_estimator_job(self, measurements, expectation):
@pytest.mark.parametrize("num_shots", [50, 100])
def test_generate_samples(self, num_wires, num_shots):
qs = QuantumScript([], measurements=[qml.expval(qml.PauliX(0))])
- dev = QiskitDevice2(wires=num_wires, backend=aer_backend, shots=num_shots)
+ dev = QiskitDevice(wires=num_wires, backend=aer_backend, shots=num_shots)
dev._execute_sampler(circuit=qs, session=Session(backend=aer_backend))
samples = dev.generate_samples(0)
@@ -1099,7 +1099,7 @@ def test_generate_samples(self, num_wires, num_shots):
def test_tape_shots_used_for_sampler(self, mocker):
"""Tests that device uses tape shots rather than device shots for sampler"""
- dev = QiskitDevice2(wires=5, backend=aer_backend, shots=2)
+ dev = QiskitDevice(wires=5, backend=aer_backend, shots=2)
sampler_execute = mocker.spy(dev, "_execute_sampler")
@@ -1119,7 +1119,7 @@ def circuit():
def test_error_for_shot_vector(self):
"""Tests that a ValueError is raised if a shot vector is passed."""
- dev = QiskitDevice2(wires=5, backend=aer_backend, shots=2)
+ dev = QiskitDevice(wires=5, backend=aer_backend, shots=2)
def circuit():
@@ -1147,7 +1147,7 @@ def test_no_pauli_observable_gives_accurate_answer(self, mocker, observable):
provides an accurate answer for measurements with observables that don't have a pauli_rep.
- dev = QiskitDevice2(wires=5, backend=aer_backend)
+ dev = QiskitDevice(wires=5, backend=aer_backend)
pl_dev = qml.device("default.qubit", wires=5)
@@ -1178,7 +1178,7 @@ def test_warning_for_split_execution_types_when_observable_no_pauli(self):
"""Test that a warning is raised when device is passed a measurement on
an observable that does not have a pauli_rep."""
- dev = QiskitDevice2(wires=5, backend=aer_backend)
+ dev = QiskitDevice(wires=5, backend=aer_backend)
def circuit():
@@ -1198,7 +1198,7 @@ def test_qiskit_probability_output_format(self, backend):
the same as pennylane's."""
dev = qml.device("default.qubit", wires=[0, 1, 2, 3, 4])
- qiskit_dev = QiskitDevice2(wires=[0, 1, 2, 3, 4], backend=backend)
+ qiskit_dev = QiskitDevice(wires=[0, 1, 2, 3, 4], backend=backend)
def circuit():
@@ -1220,7 +1220,7 @@ def test_sampler_output_shape(self, backend):
"""Test that the shape of the results produced from the sampler for the Qiskit device
is consistent with Pennylane"""
dev = qml.device("default.qubit", wires=5, shots=1024)
- qiskit_dev = QiskitDevice2(wires=5, backend=backend)
+ qiskit_dev = QiskitDevice(wires=5, backend=backend)
def circuit(x):
@@ -1244,7 +1244,7 @@ def test_sampler_output_shape_multi_measurements(self, backend):
"""Test that the shape of the results produced from the sampler for the Qiskit device
is consistent with Pennylane for circuits with multiple measurements"""
dev = qml.device("default.qubit", wires=5, shots=10)
- qiskit_dev = QiskitDevice2(wires=5, backend=backend, shots=10)
+ qiskit_dev = QiskitDevice(wires=5, backend=backend, shots=10)
def circuit(x):
@@ -1327,7 +1327,7 @@ def test_observables_that_need_split_non_commuting(self, observable):
"""Tests that observables that have non-commuting measurements are
processed correctly when executed by the Estimator or, in the case of
qml.Hadamard, executed by the Sampler via expval() or var"""
- qiskit_dev = QiskitDevice2(wires=3, backend=aer_backend, shots=30000)
+ qiskit_dev = QiskitDevice(wires=3, backend=aer_backend, shots=30000)
def qiskit_circuit():
@@ -1362,7 +1362,7 @@ def circuit():
def test_observables_that_need_split_non_commuting_counts(self, observable):
"""Tests that observables that have non-commuting measurents are processed
correctly when executed by the Sampler via counts()"""
- qiskit_dev = QiskitDevice2(wires=3, backend=aer_backend, shots=4000)
+ qiskit_dev = QiskitDevice(wires=3, backend=aer_backend, shots=4000)
def qiskit_circuit():
@@ -1424,7 +1424,7 @@ def circuit():
def test_observables_that_need_split_non_commuting_samples(self, observable):
"""Tests that observables that have non-commuting measurents are processed
correctly when executed by the Sampler via sample()"""
- qiskit_dev = QiskitDevice2(wires=3, backend=aer_backend, shots=20000)
+ qiskit_dev = QiskitDevice(wires=3, backend=aer_backend, shots=20000)
def qiskit_circuit():
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 39d7f93f..1e1d3db5 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -23,42 +23,29 @@
import pennylane as qml
from pennylane.numpy import tensor
-from semantic_version import Version
import pytest
import qiskit
import qiskit_aer
from qiskit.providers import QiskitBackendNotFoundError
-from pennylane_qiskit.qiskit_device import QiskitDevice
+from qiskit.providers.basic_provider import BasicProvider
+from pennylane_qiskit.qiskit_device_legacy import QiskitDeviceLegacy
# pylint: disable=protected-access, unused-argument, ungrouped-imports, too-many-arguments, too-few-public-methods
-if Version(qiskit.__version__) < Version("1.0.0"):
- pldevices = [("qiskit.aer", qiskit_aer.Aer), ("qiskit.basicaer", qiskit.BasicAer)]
- def check_provider_backend_compatibility(pldevice, backend_name):
- """check compatibility of provided backend"""
- dev_name, _ = pldevice
- if (dev_name == "qiskit.aer" and "aer" not in backend_name) or (
- dev_name == "qiskit.basicaer" and "aer" in backend_name
- ):
- return (False, "Only the AerSimulator is supported on AerDevice")
- return True, None
- from qiskit.providers.basic_provider import BasicProvider
+pldevices = [("qiskit.aer", qiskit_aer.Aer), ("qiskit.basicsim", BasicProvider())]
- pldevices = [("qiskit.aer", qiskit_aer.Aer), ("qiskit.basicsim", BasicProvider())]
- def check_provider_backend_compatibility(pldevice, backend_name):
- """check compatibility of provided backend"""
- dev_name, _ = pldevice
- if dev_name == "qiskit.aer" and backend_name == "basic_simulator":
- return (False, "basic_simulator is not supported on the AerDevice")
+def check_provider_backend_compatibility(pldevice, backend_name):
+ """Check the compatibility of provided backend"""
+ dev_name, _ = pldevice
+ if dev_name == "qiskit.aer" and backend_name == "basic_simulator":
+ return (False, "basic_simulator is not supported on the AerDevice")
- if dev_name == "qiskit.basicsim" and backend_name != "basic_simulator":
- return (False, "Only the basic_simulator backend works with the BasicSimulatorDevice")
- return True, None
+ if dev_name == "qiskit.basicsim" and backend_name != "basic_simulator":
+ return (False, "Only the basic_simulator backend works with the BasicSimulatorDevice")
+ return True, None
class TestDeviceIntegration:
@@ -78,7 +65,6 @@ def test_load_device(self, d, backend):
assert dev.shots == 1024
assert dev.short_name == d[0]
assert dev.provider == d[1]
- assert dev.capabilities()["returns_state"] == (backend in state_backends)
@pytest.mark.parametrize("d", pldevices)
def test_load_remote_device_with_backend_instance(self, d, backend):
@@ -90,32 +76,18 @@ def test_load_remote_device_with_backend_instance(self, d, backend):
except QiskitBackendNotFoundError:
pytest.skip("Backend is not compatible with specified device")
- dev = qml.device("qiskit.remote", wires=2, backend=backend_instance, shots=1024)
- assert dev.num_wires == 2
- assert dev.shots == 1024
- assert dev.short_name == "qiskit.remote"
- assert dev.provider is None
- assert dev.capabilities()["returns_state"] == (backend in state_backends)
- @pytest.mark.parametrize("d", pldevices)
- def test_load_remote_device_by_name(self, d, backend):
- """Test that the qiskit.remote device loads correctly when passed a provider and a backend
- name. This test is equivalent to `test_load_device` but on the qiskit.remote device instead
- of specialized devices that expose more configuration options."""
- # check compatibility between provider and backend, and skip if incompatible
- is_compatible, failure_msg = check_provider_backend_compatibility(d, backend)
- if not is_compatible:
- pytest.skip(failure_msg)
- _, provider = d
+ if backend_instance.configuration().n_qubits is None:
+ pytest.skip("No qubits?")
- dev = qml.device("qiskit.remote", wires=2, provider=provider, backend=backend, shots=1024)
- assert dev.num_wires == 2
- assert dev.shots == 1024
+ dev = qml.device(
+ "qiskit.remote",
+ wires=backend_instance.configuration().n_qubits,
+ backend=backend_instance,
+ shots=1024,
+ )
+ assert dev.num_wires == backend_instance.configuration().n_qubits
+ assert dev.shots.total_shots == 1024
assert dev.short_name == "qiskit.remote"
- assert dev.provider == provider
- assert dev.capabilities()["returns_state"] == (backend in state_backends)
def test_incorrect_backend(self):
"""Test that exception is raised if name is incorrect"""
@@ -129,12 +101,6 @@ def test_incorrect_backend_wires(self):
qml.device("qiskit.aer", wires=100, method="statevector")
- def test_remote_device_no_provider(self):
- """Test that the qiskit.remote device raises a ValueError if passed a backend
- by name but no provider to look up the name on."""
- with pytest.raises(ValueError, match=r"Must pass a provider"):
- qml.device("qiskit.remote", wires=2, backend="aer_simulator_statevector")
def test_args(self):
"""Test that the device requires correct arguments"""
with pytest.raises(TypeError, match="missing 1 required positional argument"):
@@ -591,7 +557,7 @@ def test_one_qubit_circuit_batch_params(self, shots, d, backend, tol, mocker):
b = np.linspace(0, 0.123, batch_dim)
c = np.linspace(0, 0.987, batch_dim)
- spy1 = mocker.spy(QiskitDevice, "batch_execute")
+ spy1 = mocker.spy(QiskitDeviceLegacy, "batch_execute")
spy2 = mocker.spy(dev.backend, "run")
@partial(qml.batch_params, all_operations=True)
@@ -605,7 +571,7 @@ def circuit(x, y, z):
assert np.allclose(circuit(a, b, c), np.cos(a) * np.sin(b), **tol)
- # Check that QiskitDevice.batch_execute was called
+ # Check that QiskitDeviceLegacy.batch_execute was called
assert spy1.call_count == 1
assert spy2.call_count == 1
@@ -625,7 +591,7 @@ def test_batch_execute_parameter_shift(self, shots, d, backend, tol, mocker):
dev = qml.device(d[0], wires=3, backend=backend, shots=shots)
- spy1 = mocker.spy(QiskitDevice, "batch_execute")
+ spy1 = mocker.spy(QiskitDeviceLegacy, "batch_execute")
spy2 = mocker.spy(dev.backend, "run")
@qml.qnode(dev, diff_method="parameter-shift")
@@ -642,7 +608,7 @@ def circuit(x, y):
expected = np.array([[-np.sin(y) * np.sin(x), np.cos(y) * np.cos(x)]])
assert np.allclose(res, expected, **tol)
- # Check that QiskitDevice.batch_execute was called twice
+ # Check that QiskitDeviceLegacy.batch_execute was called twice
assert spy1.call_count == 2
# Check that run was called twice: for the partial derivatives and for
diff --git a/tests/test_qiskit_device.py b/tests/test_qiskit_device.py
index 7eabcf9c..8674301d 100644
--- a/tests/test_qiskit_device.py
+++ b/tests/test_qiskit_device.py
@@ -25,7 +25,7 @@
import pennylane as qml
from pennylane_qiskit import AerDevice
-from pennylane_qiskit.qiskit_device import QiskitDevice
+from pennylane_qiskit.qiskit_device_legacy import QiskitDeviceLegacy
# pylint: disable=protected-access, unused-argument, too-few-public-methods
@@ -109,7 +109,7 @@ class TestSupportForV1andV2:
def test_v1_and_v2_mocked(self, dev_backend):
"""Test that device initializes with no error mocked"""
- dev = qml.device("qiskit.remote", wires=10, backend=dev_backend, use_primitives=True)
+ dev = qml.device("qiskit.aer", wires=10, backend=dev_backend)
assert dev._backend == dev_backend
@@ -121,7 +121,7 @@ def test_v1_and_v2_mocked(self, dev_backend):
def test_v1_and_v2_manila(self, dev_backend):
"""Test that device initializes with no error with V1 and V2 backends by Qiskit"""
- dev = qml.device("qiskit.remote", wires=5, backend=dev_backend, use_primitives=True)
+ dev = qml.device("qiskit.aer", wires=5, backend=dev_backend)
def circuit(x):
@@ -237,12 +237,12 @@ def test_calls_to_execute(self, device, n_tapes, mocker):
called and not the general execute method."""
dev = device(2)
- spy = mocker.spy(QiskitDevice, "execute")
+ spy = mocker.spy(QiskitDeviceLegacy, "execute")
tapes = [self.tape1] * n_tapes
- # Check that QiskitDevice.execute was not called
+ # Check that QiskitDeviceLegacyLegacy.execute was not called
assert spy.call_count == 0
@pytest.mark.parametrize("n_tapes", [1, 2, 3])
@@ -251,7 +251,7 @@ def test_calls_to_reset(self, n_tapes, mocker, device):
dev = device(2)
- spy = mocker.spy(QiskitDevice, "reset")
+ spy = mocker.spy(QiskitDeviceLegacy, "reset")
tapes = [self.tape1] * n_tapes