diff --git a/CHANGELOG.md b/CHANGELOG.md index 71e3599..a1d30cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +* Support direct-access mode on AQT devices (#164) + ## qiskit-aqt-provider v1.5.0 * Docs: add examples on setting run options in primitives (#156) diff --git a/docs/apidoc/job.rst b/docs/apidoc/job.rst index e7ebb08..c020628 100644 --- a/docs/apidoc/job.rst +++ b/docs/apidoc/job.rst @@ -6,6 +6,11 @@ AQTJob .. autoclass:: qiskit_aqt_provider.aqt_job.AQTJob :members: + :exclude-members: __init__ + +.. autoclass:: qiskit_aqt_provider.aqt_job.AQTDirectAccessJob + :members: + :exclude-members: __init__, submit .. autoclass:: qiskit_aqt_provider.aqt_job.Progress :members: diff --git a/docs/apidoc/resource.rst b/docs/apidoc/resource.rst index aba638f..d700c36 100644 --- a/docs/apidoc/resource.rst +++ b/docs/apidoc/resource.rst @@ -7,24 +7,23 @@ AQTResource .. autoclass:: qiskit_aqt_provider.aqt_resource.AQTResource :members: :show-inheritance: + :exclude-members: submit, result, __init__ -.. autoclass:: qiskit_aqt_provider.aqt_resource.OfflineSimulatorResource +.. autoclass:: qiskit_aqt_provider.aqt_resource.AQTDirectAccessResource :members: :show-inheritance: + :exclude-members: submit, result, __init__ -.. autopydantic_model:: qiskit_aqt_provider.api_models.ResourceId - :exclude-members: model_computed_fields - :model-show-json: False - :model-show-validator-members: False - :model-show-validator-summary: False - :model-show-field-summary: False - :member-order: bysource +.. autoclass:: qiskit_aqt_provider.aqt_resource.OfflineSimulatorResource + :members: :show-inheritance: + :exclude-members: submit, result, __init__ .. autoclass:: qiskit_aqt_provider.aqt_resource.UnknownOptionWarning :exclude-members: __init__, __new__ :show-inheritance: -.. autoclass:: qiskit_aqt_provider.api_models.UnknownJobError +.. autoclass:: qiskit_aqt_provider.aqt_resource._ResourceBase :show-inheritance: - :exclude-members: __init__, __new__ + :exclude-members: __init__, __new__, get_scheduling_stage_plugin, get_translation_stage_plugin + :members: diff --git a/docs/index.rst b/docs/index.rst index cfa163b..08aaba8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -66,10 +66,10 @@ For more details see the :ref:`user guide `, a selection of `example :caption: Reference :hidden: - AQTProvider - AQTResource - AQTJob - AQTOptions + Provider + Backends + Job handles + Options Qiskit primitives Transpiler plugin diff --git a/qiskit_aqt_provider/api_models.py b/qiskit_aqt_provider/api_models.py index 0cd5cea..712dece 100644 --- a/qiskit_aqt_provider/api_models.py +++ b/qiskit_aqt_provider/api_models.py @@ -58,7 +58,7 @@ def http_client(*, base_url: str, token: str) -> httpx.Client: token: access token for the remote service. """ headers = {"Authorization": f"Bearer {token}", "User-Agent": USER_AGENT} - return httpx.Client(headers=headers, base_url=base_url, timeout=10.0) + return httpx.Client(headers=headers, base_url=base_url, timeout=10.0, follow_redirects=True) class Workspaces(pdt.RootModel[list[api_models.Workspace]]): diff --git a/qiskit_aqt_provider/api_models_direct.py b/qiskit_aqt_provider/api_models_direct.py new file mode 100644 index 0000000..c9c0a60 --- /dev/null +++ b/qiskit_aqt_provider/api_models_direct.py @@ -0,0 +1,58 @@ +# This code is part of Qiskit. +# +# (C) Copyright Alpine Quantum Technologies GmbH 2024 +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""API models specific to the direct access API.""" + +import uuid +from typing import Annotated, Literal, Union + +import pydantic as pdt +from typing_extensions import Self + + +class JobResultError(pdt.BaseModel): + """Failed job result payload.""" + + status: Literal["error"] = "error" + + +class JobResultFinished(pdt.BaseModel): + """Successful job result payload.""" + + status: Literal["finished"] = "finished" + result: list[list[Annotated[int, pdt.Field(le=1, ge=0)]]] + + +class JobResult(pdt.BaseModel): + """Result model on the direct access API.""" + + job_id: uuid.UUID + payload: Union[JobResultFinished, JobResultError] = pdt.Field(discriminator="status") + + @classmethod + def create_error(cls, *, job_id: uuid.UUID) -> Self: + """Create an error result (for tests). + + Args: + job_id: job identifier. + """ + return cls(job_id=job_id, payload=JobResultError()) + + @classmethod + def create_finished(cls, *, job_id: uuid.UUID, result: list[list[int]]) -> Self: + """Create a success result (for tests). + + Args: + job_id: job identifier. + result: mock measured samples. + """ + return cls(job_id=job_id, payload=JobResultFinished(result=result)) diff --git a/qiskit_aqt_provider/aqt_job.py b/qiskit_aqt_provider/aqt_job.py index 09ea527..91c22af 100644 --- a/qiskit_aqt_provider/aqt_job.py +++ b/qiskit_aqt_provider/aqt_job.py @@ -33,11 +33,12 @@ from typing_extensions import Self, TypeAlias, assert_never from qiskit_aqt_provider import api_models_generated, persistence +from qiskit_aqt_provider.api_models_direct import JobResultError from qiskit_aqt_provider.aqt_options import AQTOptions from qiskit_aqt_provider.circuit_to_aqt import circuits_to_aqt_job if TYPE_CHECKING: # pragma: no cover - from qiskit_aqt_provider.aqt_resource import AQTResource + from qiskit_aqt_provider.aqt_resource import AQTDirectAccessResource, AQTResource # Tags for the status of AQT API jobs @@ -114,7 +115,7 @@ def __exit__(*args) -> None: ... class AQTJob(JobV1): - """Handle for quantum circuits jobs running on AQT backends. + """Handle for quantum circuits jobs running on AQT cloud backends. Jobs contain one or more quantum circuits that are executed with a common set of options (see :class:`AQTOptions `). @@ -365,30 +366,10 @@ def callback( if isinstance(self.status_payload, JobFinished): for circuit_index, circuit in enumerate(self.circuits): samples = self.status_payload.results[circuit_index] - meas_map = _build_memory_mapping(circuit) - data: dict[str, Any] = { - "counts": _format_counts(samples, meas_map), - } - - if self.options.memory: - data["memory"] = [ - "".join(str(x) for x in reversed(states)) for states in samples - ] - results.append( - { - "shots": self.options.shots, - "success": True, - "status": JobStatus.DONE, - "data": data, - "header": { - "memory_slots": circuit.num_clbits, - "creg_sizes": [[reg.name, reg.size] for reg in circuit.cregs], - "qreg_sizes": [[reg.name, reg.size] for reg in circuit.qregs], - "name": circuit.name, - "metadata": circuit.metadata or {}, - }, - } + _partial_qiskit_result_dict( + samples, circuit, shots=self.options.shots, memory=self.options.memory + ) ) return Result.from_dict( @@ -405,6 +386,135 @@ def callback( ) +class AQTDirectAccessJob(JobV1): + """Handle for quantum circuits jobs running on direct-access AQT backends. + + Use :meth:`AQTDirectAccessResource.run ` + to get a handle and evaluate circuits on a direct-access backend. + """ + + _backend: "AQTDirectAccessResource" + + def __init__( + self, + backend: "AQTDirectAccessResource", + circuits: list[QuantumCircuit], + options: AQTOptions, + ): + """Initialize the :class:`AQTDirectAccessJob` instance. + + Args: + backend: backend to run the job on. + circuits: list of circuits to execute. + options: overridden resource options for this job. + """ + super().__init__(backend, "") + + self.circuits = circuits + self.options = options + self.api_submit_payload = circuits_to_aqt_job(circuits, options.shots) + + self._job_id = uuid.uuid4() + self._status = JobStatus.INITIALIZING + + def submit(self) -> None: + """No-op on direct-access backends.""" + + def result(self) -> Result: + """Iteratively submit all circuits and block until full completion. + + If an error occurs, the remaining circuits are not executed and the whole + job is marked as failed. + + Returns: + The combined result of all circuit evaluations. + """ + if self.options.with_progress_bar: + context: Union[tqdm[NoReturn], _MockProgressBar] = tqdm(total=len(self.circuits)) + else: + context = _MockProgressBar(total=len(self.circuits)) + + result = { + "backend_name": self._backend.name, + "backend_version": self._backend.version, + "qobj_id": id(self.circuits), + "job_id": self.job_id(), + "success": True, + "results": [], + } + + with context as progress_bar: + for circuit_index, circuit in enumerate(self.circuits): + api_circuit = self.api_submit_payload.payload.circuits[circuit_index] + job_id = self._backend.submit(api_circuit) + api_result = self._backend.result(job_id) + + if isinstance(api_result.payload, JobResultError): + break + + result["results"].append( + _partial_qiskit_result_dict( + api_result.payload.result, + circuit, + shots=self.options.shots, + memory=self.options.memory, + ) + ) + + progress_bar.update(1) + else: # no circuits in the job, or all executed successfully + self._status = JobStatus.DONE + return Result.from_dict(result) + + self._status = JobStatus.ERROR + result["success"] = False + return Result.from_dict(result) + + def status(self) -> JobStatus: + """Query the job's status. + + Returns: + Aggregated job status for all the circuits in this job. + """ + return self._status + + +def _partial_qiskit_result_dict( + samples: list[list[int]], circuit: QuantumCircuit, *, shots: int, memory: bool +) -> dict[str, Any]: + """Build the Qiskit result dict for a single circuit evaluation. + + Args: + samples: measurement outcome of the circuit evaluation. + circuit: the evaluated circuit. + shots: number of repetitions of the circuit evaluation. + memory: whether to fill the classical memory dump field with the measurement results. + + Returns: + Dict, suitable for Qiskit's `Result.from_dict` factory. + """ + meas_map = _build_memory_mapping(circuit) + + data: dict[str, Any] = {"counts": _format_counts(samples, meas_map)} + + if memory: + data["memory"] = ["".join(str(x) for x in reversed(states)) for states in samples] + + return { + "shots": shots, + "success": True, + "status": JobStatus.DONE, + "data": data, + "header": { + "memory_slots": circuit.num_clbits, + "creg_sizes": [[reg.name, reg.size] for reg in circuit.cregs], + "qreg_sizes": [[reg.name, reg.size] for reg in circuit.qregs], + "name": circuit.name, + "metadata": circuit.metadata or {}, + }, + } + + def _build_memory_mapping(circuit: QuantumCircuit) -> dict[int, set[int]]: """Scan the circuit for measurement instructions and collect qubit to classical bits mappings. diff --git a/qiskit_aqt_provider/aqt_options.py b/qiskit_aqt_provider/aqt_options.py index c66c794..b74fd47 100644 --- a/qiskit_aqt_provider/aqt_options.py +++ b/qiskit_aqt_provider/aqt_options.py @@ -24,7 +24,7 @@ class AQTOptions(pdt.BaseModel, Mapping[str, Any]): Options can be set on a backend globally or on a per-job basis. To update an option globally, set the corresponding attribute in the backend's - :attr:`options ` attribute: + :attr:`options ` attribute: >>> import qiskit >>> from qiskit_aqt_provider import AQTProvider @@ -42,7 +42,8 @@ class AQTOptions(pdt.BaseModel, Mapping[str, Any]): 50 Option overrides can also be applied on a per-job basis, as keyword arguments to - :meth:`AQTResource.run `: + :meth:`AQTResource.run ` or + :meth:`AQTDirectAccessResource.run `: >>> backend.options.shots 50 diff --git a/qiskit_aqt_provider/aqt_provider.py b/qiskit_aqt_provider/aqt_provider.py index 2a04a50..2e6169a 100644 --- a/qiskit_aqt_provider/aqt_provider.py +++ b/qiskit_aqt_provider/aqt_provider.py @@ -36,8 +36,11 @@ from typing_extensions import TypeAlias, override from qiskit_aqt_provider import api_models - -from .aqt_resource import AQTResource, OfflineSimulatorResource +from qiskit_aqt_provider.aqt_resource import ( + AQTDirectAccessResource, + AQTResource, + OfflineSimulatorResource, +) StrPath: TypeAlias = Union[str, Path] @@ -67,7 +70,7 @@ class OfflineSimulator: class BackendsTable(Sequence[AQTResource]): - """Pretty-printable collection of AQT backends. + """Pretty-printable collection of AQT cloud backends. The :meth:`__str__` method returns a plain text table representation of the available backends. The :meth:`_repr_html_` method returns an HTML representation that is automatically used @@ -207,7 +210,7 @@ def backends( backend_type: Optional[Literal["device", "simulator", "offline_simulator"]] = None, workspace: Optional[Union[str, Pattern[str]]] = None, ) -> BackendsTable: - """Search for backends matching given criteria. + """Search for cloud backends matching given criteria. With no arguments, return all backends accessible with the configured access token. @@ -290,7 +293,7 @@ def get_backend( backend_type: Optional[Literal["device", "simulator", "offline_simulator"]] = None, workspace: Optional[Union[str, Pattern[str]]] = None, ) -> AQTResource: - """Return a single backend matching the specified filtering. + """Return a handle for a cloud quantum computing resource matching the specified filtering. Args: name: filter for the backend name. @@ -314,3 +317,11 @@ def get_backend( raise QiskitBackendNotFoundError("No backend matches the criteria") return backends[0] + + def get_direct_access_backend(self, base_url: str, /) -> AQTDirectAccessResource: + """Return a handle for a direct-access quantum computing resource. + + Args: + base_url: URL of the direct-access interface. + """ + return AQTDirectAccessResource(self, base_url) diff --git a/qiskit_aqt_provider/aqt_resource.py b/qiskit_aqt_provider/aqt_resource.py index 26fba11..b870263 100644 --- a/qiskit_aqt_provider/aqt_resource.py +++ b/qiskit_aqt_provider/aqt_resource.py @@ -21,6 +21,7 @@ ) from uuid import UUID +import httpx from qiskit import QuantumCircuit from qiskit.circuit.library import RXGate, RXXGate, RZGate from qiskit.circuit.measure import Measure @@ -32,8 +33,8 @@ from qiskit_aer import AerJob, AerSimulator, noise from typing_extensions import override -from qiskit_aqt_provider import api_models -from qiskit_aqt_provider.aqt_job import AQTJob +from qiskit_aqt_provider import api_models, api_models_direct +from qiskit_aqt_provider.aqt_job import AQTDirectAccessJob, AQTJob from qiskit_aqt_provider.aqt_options import AQTOptions from qiskit_aqt_provider.circuit_to_aqt import aqt_to_qiskit_circuit @@ -73,30 +74,25 @@ def make_transpiler_target(target_cls: type[TargetT], num_qubits: int) -> Target return target -class AQTResource(Backend): - """Qiskit backend for AQT quantum computing resources.""" +_JobType = TypeVar("_JobType", AQTJob, AQTDirectAccessJob) - def __init__( - self, - provider: "AQTProvider", - resource_id: api_models.ResourceId, - ): - """Initialize the backend. + +class _ResourceBase(Backend): + """Common setup for AQT backends.""" + + def __init__(self, provider: "AQTProvider", name: str): + """Initialize the Qiskit backend. Args: provider: Qiskit provider that owns this backend. - resource_id: description of resource to target. + name: name of the backend. """ - super().__init__(name=resource_id.resource_id, provider=provider) - - self.resource_id = resource_id - - self._http_client = provider._http_client + super().__init__(name=name, provider=provider) num_qubits = 20 self._configuration = BackendConfiguration.from_dict( { - "backend_name": resource_id.resource_name, + "backend_name": name, "backend_version": 2, "url": provider.portal_url, "simulator": True, @@ -118,9 +114,118 @@ def __init__( } ) self._target = make_transpiler_target(Target, num_qubits) - self._options = AQTOptions() + def configuration(self) -> BackendConfiguration: + """Legacy Qiskit backend configuration.""" + return self._configuration + + @property + def max_circuits(self) -> int: + """Maximum number of circuits per batch.""" + return 2000 + + @property + def target(self) -> Target: + """Transpilation target for this backend.""" + return self._target + + @classmethod + def _default_options(cls) -> QiskitOptions: + """Default backend options, in Qiskit format.""" + return QiskitOptions() + + @property + def options(self) -> AQTOptions: + """Configured backend options.""" + return self._options + + def get_scheduling_stage_plugin(self) -> str: + """Name of the custom scheduling stage plugin for the Qiskit transpiler.""" + return "aqt" + + def get_translation_stage_plugin(self) -> str: + """Name of the custom translation stage plugin for the Qiskit transpiler.""" + return "aqt" + + def _create_job( + self, + job_type: type[_JobType], + circuits: Union[QuantumCircuit, list[QuantumCircuit]], + **options: Any, + ) -> _JobType: + """Initialize a job handle of a given type. + + Helper function for the ``run()`` method implementations. + + Args: + job_type: type of the job handle to initialize. + circuits: circuits to execute when the job is submitted. + options: backend options overrides. + """ + if not isinstance(circuits, list): + circuits = [circuits] + + valid_options = {key: value for key, value in options.items() if key in self.options} + unknown_options = set(options) - set(valid_options) + + if unknown_options: + for unknown_option in unknown_options: + warnings.warn( + f"Option {unknown_option} is not used by this backend", + UnknownOptionWarning, + stacklevel=2, + ) + + options_copy = self.options.model_copy() + options_copy.update_options(**valid_options) + + return job_type( + self, + circuits, + options_copy, + ) + + +class AQTResource(_ResourceBase): + """Qiskit backend for AQT cloud quantum computing resources. + + Use :meth:`AQTProvider.get_backend ` + to retrieve backend instances. + """ + + def __init__( + self, + provider: "AQTProvider", + resource_id: api_models.ResourceId, + ): + """Initialize the backend. + + Args: + provider: Qiskit provider that owns this backend. + resource_id: description of resource to target. + """ + super().__init__(name=resource_id.resource_id, provider=provider) + + self._http_client: httpx.Client = provider._http_client + self.resource_id = resource_id + + def run(self, circuits: Union[QuantumCircuit, list[QuantumCircuit]], **options: Any) -> AQTJob: + """Submit circuits for execution on this resource. + + Args: + circuits: circuits to execute + options: overrides for this resource's options. Elements should be valid fields + of the :class:`AQTOptions ` model. + Unknown fields are ignored with a :class:`UnknownOptionWarning`. + + Returns: + A handle to the submitted job. + """ + job = self._create_job(AQTJob, circuits, **options) + job.submit() + return job + def submit(self, job: AQTJob) -> UUID: """Submit a quantum circuits job to the AQT resource. @@ -160,48 +265,37 @@ def result(self, job_id: UUID) -> api_models.JobResponse: resp.raise_for_status() return api_models.Response.model_validate(resp.json()) - def configuration(self) -> BackendConfiguration: - """Legacy Qiskit backend configuration.""" - warnings.warn( - "The configuration() method is deprecated and will be removed in a " - "future release. Instead you should access these attributes directly " - "off the object or via the .target attribute. You can refer to qiskit " - "backend interface transition guide for the exact changes: " - "https://docs.quantum.ibm.com/api/qiskit/providers#migrating-between-backend-api-versions", - DeprecationWarning, - ) - return self._configuration - @property - def max_circuits(self) -> int: - """Maximum number of circuits per batch.""" - return 2000 +class AQTDirectAccessResource(_ResourceBase): + """Qiskit backend for AQT direct-access quantum computing resources. - @property - def target(self) -> Target: - """Transpilation target for this backend.""" - return self._target + Use :meth:`AQTProvider.get_direct_access_backend ` + to retrieve backend instances. + """ - @classmethod - def _default_options(cls) -> QiskitOptions: - """Default backend options, in Qiskit format.""" - return QiskitOptions() + def __init__( + self, + provider: "AQTProvider", + base_url: str, + ) -> None: + """Initialize the backend. - @property - def options(self) -> AQTOptions: - """Configured backend options.""" - return self._options + Args: + provider: Qiskit provider that owns the backend. + base_url: URL of the direct-access interface. + """ + self._http_client = api_models.http_client(base_url=base_url, token=provider.access_token) - def get_scheduling_stage_plugin(self) -> str: - """Name of the custom scheduling stage plugin for the Qiskit transpiler.""" - return "aqt" + super().__init__(provider=provider, name="direct-access") - def get_translation_stage_plugin(self) -> str: - """Name of the custom translation stage plugin for the Qiskit transpiler.""" - return "aqt" + def run( + self, circuits: Union[QuantumCircuit, list[QuantumCircuit]], **options: Any + ) -> AQTDirectAccessJob: + """Prepare circuits for execution on this resource. - def run(self, circuits: Union[QuantumCircuit, list[QuantumCircuit]], **options: Any) -> AQTJob: - """Submit circuits for execution on this resource. + .. warning:: The circuits are only evaluated during + the :meth:`AQTDirectAccessJob.result ` + call. Args: circuits: circuits to execute @@ -210,32 +304,37 @@ def run(self, circuits: Union[QuantumCircuit, list[QuantumCircuit]], **options: Unknown fields are ignored with a :class:`UnknownOptionWarning`. Returns: - A handle to the submitted job. + A handle to the prepared job. """ - if not isinstance(circuits, list): - circuits = [circuits] + return self._create_job(AQTDirectAccessJob, circuits, **options) - valid_options = {key: value for key, value in options.items() if key in self.options} - unknown_options = set(options) - set(valid_options) + def submit(self, circuit: api_models.QuantumCircuit) -> UUID: + """Submit a quantum circuit job to the AQT resource. - if unknown_options: - for unknown_option in unknown_options: - warnings.warn( - f"Option {unknown_option} is not used by this backend", - UnknownOptionWarning, - stacklevel=2, - ) + Args: + circuit: circuit to evaluate, in API format. - options_copy = self.options.model_copy() - options_copy.update_options(**valid_options) + Returns: + The unique identifier of the submitted job. + """ + resp = self._http_client.put("/circuit", json=circuit.model_dump()) + resp.raise_for_status() + return UUID(resp.json()) - job = AQTJob( - self, - circuits, - options_copy, - ) - job.submit() - return job + def result(self, job_id: UUID) -> api_models_direct.JobResult: + """Query the result of a specific job. + + Block until a result (success or error) is available. + + Args: + job_id: unique identifier of the target job. + + Returns: + Job result, as API payload. + """ + resp = self._http_client.get(f"/circuit/result/{job_id}") + resp.raise_for_status() + return api_models_direct.JobResult.model_validate(resp.json()) def qubit_states_from_int(state: int, num_qubits: int) -> list[int]: diff --git a/qiskit_aqt_provider/test/fixtures.py b/qiskit_aqt_provider/test/fixtures.py index c6dbfeb..aac1a2d 100644 --- a/qiskit_aqt_provider/test/fixtures.py +++ b/qiskit_aqt_provider/test/fixtures.py @@ -15,16 +15,28 @@ This module is exposed as pytest plugin for this project. """ +import json +import re import uuid +import httpx import pytest +from pytest_httpx import HTTPXMock from qiskit.circuit import QuantumCircuit +from qiskit.providers import BackendV2 +from qiskit_aer import AerSimulator from typing_extensions import override -from qiskit_aqt_provider import api_models +from qiskit_aqt_provider import api_models, api_models_direct from qiskit_aqt_provider.aqt_job import AQTJob from qiskit_aqt_provider.aqt_provider import AQTProvider -from qiskit_aqt_provider.aqt_resource import OfflineSimulatorResource +from qiskit_aqt_provider.aqt_resource import ( + AQTDirectAccessResource, + OfflineSimulatorResource, + qubit_states_from_int, +) +from qiskit_aqt_provider.circuit_to_aqt import aqt_to_qiskit_circuit +from qiskit_aqt_provider.test.resources import DummyDirectAccessResource class MockSimulator(OfflineSimulatorResource): @@ -65,5 +77,73 @@ def submitted_circuits(self) -> list[list[QuantumCircuit]]: @pytest.fixture(name="offline_simulator_no_noise") def fixture_offline_simulator_no_noise() -> MockSimulator: - """Noiseless offline simulator resource.""" + """Noiseless offline simulator resource, as cloud backend.""" return MockSimulator(noisy=False) + + +@pytest.fixture(name="offline_simulator_no_noise_direct_access") +def fixture_offline_simulator_no_noise_direct_access( + httpx_mock: HTTPXMock, +) -> AQTDirectAccessResource: + """Noiseless offline simulator resource, as direct-access backend.""" + simulator = AerSimulator(method="statevector") + + inflight_circuits: dict[uuid.UUID, api_models.QuantumCircuit] = {} + + def handle_submit(request: httpx.Request) -> httpx.Response: + data = api_models.QuantumCircuit.model_validate_json(request.content.decode("utf-8")) + + job_id = uuid.uuid4() + inflight_circuits[job_id] = data.model_copy(deep=True) + + return httpx.Response( + status_code=httpx.codes.OK, + text=f'"{job_id}"', + ) + + def handle_result(request: httpx.Request) -> httpx.Response: + _, job_id_str = request.url.path.rsplit("/", maxsplit=1) + job_id = uuid.UUID(job_id_str) + + data = inflight_circuits[job_id] + qiskit_circuit = aqt_to_qiskit_circuit(data.quantum_circuit, data.number_of_qubits) + result = simulator.run(qiskit_circuit, shots=data.repetitions).result() + + samples: list[list[int]] = [] + for hex_state, occurrences in result.data()["counts"].items(): + samples.extend( + [ + qubit_states_from_int(int(hex_state, 16), qiskit_circuit.num_qubits) + for _ in range(occurrences) + ] + ) + + return httpx.Response( + status_code=httpx.codes.OK, + json=json.loads( + api_models_direct.JobResult.create_finished( + job_id=job_id, + result=samples, + ).model_dump_json() + ), + ) + + httpx_mock.add_callback(handle_submit, method="PUT", url=re.compile(".+/circuit/?$")) + httpx_mock.add_callback( + handle_result, method="GET", url=re.compile(".+/circuit/result/[0-9a-f-]+$") + ) + + return DummyDirectAccessResource("token") + + +@pytest.fixture( + name="any_offline_simulator_no_noise", + params=["offline_simulator_no_noise", "offline_simulator_no_noise_direct_access"], +) +def fixture_any_offline_simulator_no_noise(request: pytest.FixtureRequest) -> BackendV2: + """Noiseless, offline simulator backend. + + The fixture is parametrized to successively run the dependent tests + with a regular cloud-bound backend, and a direct-access one. + """ + return request.getfixturevalue(request.param) diff --git a/qiskit_aqt_provider/test/resources.py b/qiskit_aqt_provider/test/resources.py index be1d038..6a86ebf 100644 --- a/qiskit_aqt_provider/test/resources.py +++ b/qiskit_aqt_provider/test/resources.py @@ -25,7 +25,7 @@ from qiskit_aqt_provider import api_models from qiskit_aqt_provider.aqt_job import AQTJob from qiskit_aqt_provider.aqt_provider import AQTProvider -from qiskit_aqt_provider.aqt_resource import AQTResource +from qiskit_aqt_provider.aqt_resource import AQTDirectAccessResource, AQTResource class JobStatus(enum.Enum): @@ -229,3 +229,14 @@ def __init__(self, token: str) -> None: resource_type="simulator", ), ) + + +class DummyDirectAccessResource(AQTDirectAccessResource): + """A non-functional direct-access resource, for testing purposes.""" + + def __init__(self, token: str) -> None: + """Initialize the dummy backend.""" + super().__init__( + AQTProvider(token), + base_url="direct-access-example.aqt.eu:6020", + ) diff --git a/test/test_execution.py b/test/test_execution.py index dc76328..a008b4a 100644 --- a/test/test_execution.py +++ b/test/test_execution.py @@ -25,7 +25,7 @@ import pytest import qiskit from qiskit import ClassicalRegister, QiskitError, QuantumCircuit, QuantumRegister, quantum_info -from qiskit.providers import Backend +from qiskit.providers import BackendV2 from qiskit.providers.jobstatus import JobStatus from qiskit.transpiler import TranspilerError from qiskit_aer import AerProvider, AerSimulator @@ -39,12 +39,12 @@ @pytest.mark.parametrize("shots", [200]) -def test_empty_circuit(shots: int, offline_simulator_no_noise: AQTResource) -> None: +def test_empty_circuit(shots: int, any_offline_simulator_no_noise: BackendV2) -> None: """Run an empty circuit.""" qc = QuantumCircuit(1) qc.measure_all() - job = offline_simulator_no_noise.run(qc, shots=shots) + job = any_offline_simulator_no_noise.run(qc, shots=shots) assert job.result().get_counts() == {"0": shots} @@ -102,20 +102,20 @@ def test_cancelled_circuit() -> None: @pytest.mark.parametrize("shots", [1, 100, 200]) -def test_simple_backend_run(shots: int, offline_simulator_no_noise: AQTResource) -> None: +def test_simple_backend_run(shots: int, any_offline_simulator_no_noise: BackendV2) -> None: """Run a simple circuit with `backend.run`.""" qc = QuantumCircuit(1) qc.rx(pi, 0) qc.measure_all() - trans_qc = qiskit.transpile(qc, offline_simulator_no_noise) - job = offline_simulator_no_noise.run(trans_qc, shots=shots) + trans_qc = qiskit.transpile(qc, any_offline_simulator_no_noise) + job = any_offline_simulator_no_noise.run(trans_qc, shots=shots) assert job.result().get_counts() == {"1": shots} -@pytest.mark.parametrize("backend", [MockSimulator(noisy=False), MockSimulator(noisy=True)]) -def test_simple_backend_execute_noisy(backend: MockSimulator) -> None: +@pytest.mark.parametrize("resource", [MockSimulator(noisy=False), MockSimulator(noisy=True)]) +def test_simple_backend_execute_noisy(resource: MockSimulator) -> None: """Execute a simple circuit on a noisy and noiseless backend. Check that the noisy backend is indeed noisy. """ @@ -131,12 +131,12 @@ def test_simple_backend_execute_noisy(backend: MockSimulator) -> None: counts: typing.Counter[str] = Counter() for _ in range(total_shots // shots): - job = backend.run(qiskit.transpile(qc, backend=backend), shots=shots) + job = resource.run(qiskit.transpile(qc, backend=resource), shots=shots) counts += Counter(job.result().get_counts()) assert sum(counts.values()) == total_shots - if backend.with_noise_model: + if resource.with_noise_model: assert set(counts.keys()) == {"0", "1"} assert counts["0"] < 0.1 * counts["1"] # very crude else: @@ -144,7 +144,7 @@ def test_simple_backend_execute_noisy(backend: MockSimulator) -> None: @pytest.mark.parametrize("shots", [100]) -def test_ancilla_qubits_mapping(shots: int, offline_simulator_no_noise: AQTResource) -> None: +def test_ancilla_qubits_mapping(shots: int, any_offline_simulator_no_noise: BackendV2) -> None: """Run a circuit with two quantum registers, with only one mapped to the classical memory.""" qr = QuantumRegister(2) qr_aux = QuantumRegister(3) @@ -156,14 +156,16 @@ def test_ancilla_qubits_mapping(shots: int, offline_simulator_no_noise: AQTResou qc.rxx(pi / 2, qr_aux[0], qr_aux[1]) qc.measure(qr, memory) - trans_qc = qiskit.transpile(qc, offline_simulator_no_noise) - job = offline_simulator_no_noise.run(trans_qc, shots=shots) + trans_qc = qiskit.transpile(qc, any_offline_simulator_no_noise) + job = any_offline_simulator_no_noise.run(trans_qc, shots=shots) # only two bits in the counts dict because memory has two bits width assert job.result().get_counts() == {"11": shots} @pytest.mark.parametrize("shots", [100]) -def test_multiple_classical_registers(shots: int, offline_simulator_no_noise: AQTResource) -> None: +def test_multiple_classical_registers( + shots: int, any_offline_simulator_no_noise: BackendV2 +) -> None: """Run a circuit with the final state mapped to multiple classical registers.""" qr = QuantumRegister(5) memory_a = ClassicalRegister(2) @@ -175,8 +177,8 @@ def test_multiple_classical_registers(shots: int, offline_simulator_no_noise: AQ qc.measure(qr[:2], memory_a) qc.measure(qr[2:], memory_b) - trans_qc = qiskit.transpile(qc, offline_simulator_no_noise) - job = offline_simulator_no_noise.run(trans_qc, shots=shots) + trans_qc = qiskit.transpile(qc, any_offline_simulator_no_noise) + job = any_offline_simulator_no_noise.run(trans_qc, shots=shots) # counts are returned as "memory_b memory_a", msb first assert job.result().get_counts() == {"010 01": shots} @@ -185,7 +187,7 @@ def test_multiple_classical_registers(shots: int, offline_simulator_no_noise: AQ @pytest.mark.parametrize("shots", [123]) @pytest.mark.parametrize("memory_opt", [True, False]) def test_get_memory_simple( - shots: int, memory_opt: bool, offline_simulator_no_noise: AQTResource + shots: int, memory_opt: bool, any_offline_simulator_no_noise: BackendV2 ) -> None: """Check that the raw bitstrings can be accessed for each shot via the get_memory() method in Qiskit's Result. @@ -197,8 +199,8 @@ def test_get_memory_simple( qc.cx(0, 1) qc.measure_all() - result = offline_simulator_no_noise.run( - qiskit.transpile(qc, offline_simulator_no_noise), shots=shots, memory=memory_opt + result = any_offline_simulator_no_noise.run( + qiskit.transpile(qc, any_offline_simulator_no_noise), shots=shots, memory=memory_opt ).result() if memory_opt: @@ -212,7 +214,7 @@ def test_get_memory_simple( @pytest.mark.parametrize("shots", [123]) -def test_get_memory_ancilla_qubits(shots: int, offline_simulator_no_noise: AQTResource) -> None: +def test_get_memory_ancilla_qubits(shots: int, any_offline_simulator_no_noise: BackendV2) -> None: """Check that the raw bistrings returned by get_memory() in Qiskit's Result only contain the mapped classical bits. """ @@ -226,8 +228,8 @@ def test_get_memory_ancilla_qubits(shots: int, offline_simulator_no_noise: AQTRe qc.rxx(pi / 2, qr_aux[0], qr_aux[1]) qc.measure(qr, memory) - job = offline_simulator_no_noise.run( - qiskit.transpile(qc, offline_simulator_no_noise), shots=shots, memory=True + job = any_offline_simulator_no_noise.run( + qiskit.transpile(qc, any_offline_simulator_no_noise), shots=shots, memory=True ) memory = job.result().get_memory() @@ -236,7 +238,7 @@ def test_get_memory_ancilla_qubits(shots: int, offline_simulator_no_noise: AQTRe @pytest.mark.parametrize("shots", [123]) -def test_get_memory_bit_ordering(shots: int, offline_simulator_no_noise: AQTResource) -> None: +def test_get_memory_bit_ordering(shots: int, any_offline_simulator_no_noise: BackendV2) -> None: """Check that the bitstrings returned by the results produced by AQT jobs have the same bit order as the Qiskit Aer simulators. """ @@ -248,8 +250,8 @@ def test_get_memory_bit_ordering(shots: int, offline_simulator_no_noise: AQTReso qc.measure_all() aqt_memory = ( - offline_simulator_no_noise.run( - qiskit.transpile(qc, offline_simulator_no_noise), shots=shots, memory=True + any_offline_simulator_no_noise.run( + qiskit.transpile(qc, any_offline_simulator_no_noise), shots=shots, memory=True ) .result() .get_memory() @@ -271,7 +273,7 @@ def test_get_memory_bit_ordering(shots: int, offline_simulator_no_noise: AQTReso pytest.param(AerProvider().get_backend("aer_simulator"), id="aer-simulator"), ], ) -def test_regression_issue_85(backend: Backend) -> None: +def test_regression_issue_85(backend: BackendV2) -> None: """Check that qubit and clbit permutations are properly handled by the offline simulators. This is a regression test for #85. Check that executing circuits with qubit/clbit @@ -302,7 +304,7 @@ def test_regression_issue_85(backend: Backend) -> None: @pytest.mark.parametrize(("shots", "qubits"), [(100, 5), (100, 8)]) -def test_bell_states(shots: int, qubits: int, offline_simulator_no_noise: AQTResource) -> None: +def test_bell_states(shots: int, qubits: int, any_offline_simulator_no_noise: BackendV2) -> None: """Create a N qubits Bell state.""" qc = QuantumCircuit(qubits) qc.h(0) @@ -310,8 +312,8 @@ def test_bell_states(shots: int, qubits: int, offline_simulator_no_noise: AQTRes qc.cx(0, qubit) qc.measure_all() - job = offline_simulator_no_noise.run( - qiskit.transpile(qc, offline_simulator_no_noise), shots=shots + job = any_offline_simulator_no_noise.run( + qiskit.transpile(qc, any_offline_simulator_no_noise), shots=shots ) counts = job.result().get_counts() @@ -332,7 +334,7 @@ def test_bell_states(shots: int, qubits: int, offline_simulator_no_noise: AQTRes def test_state_preparation( target_state: Union[int, str, quantum_info.Statevector, list[complex]], optimization_level: int, - offline_simulator_no_noise: AQTResource, + any_offline_simulator_no_noise: BackendV2, ) -> None: """Test the state preparation unitary factory. @@ -344,8 +346,8 @@ def test_state_preparation( qc.measure_all() shots = 100 - job = offline_simulator_no_noise.run( - qiskit.transpile(qc, offline_simulator_no_noise, optimization_level=optimization_level), + job = any_offline_simulator_no_noise.run( + qiskit.transpile(qc, any_offline_simulator_no_noise, optimization_level=optimization_level), shots=shots, ) counts = job.result().get_counts() @@ -355,7 +357,7 @@ def test_state_preparation( @pytest.mark.parametrize("optimization_level", range(4)) def test_state_preparation_single_qubit( - optimization_level: int, offline_simulator_no_noise: AQTResource + optimization_level: int, any_offline_simulator_no_noise: BackendV2 ) -> None: """Test the state preparation unitary factory, targeting a single qubit in the register.""" qreg = QuantumRegister(4) @@ -364,8 +366,8 @@ def test_state_preparation_single_qubit( qc.measure_all() shots = 100 - job = offline_simulator_no_noise.run( - qiskit.transpile(qc, offline_simulator_no_noise, optimization_level=optimization_level), + job = any_offline_simulator_no_noise.run( + qiskit.transpile(qc, any_offline_simulator_no_noise, optimization_level=optimization_level), shots=shots, ) counts = job.result().get_counts() @@ -394,21 +396,21 @@ def test_initialize_not_supported(offline_simulator_no_noise: AQTResource) -> No @pytest.mark.parametrize("optimization_level", range(4)) -def test_cswap(optimization_level: int, offline_simulator_no_noise: AQTResource) -> None: +def test_cswap(optimization_level: int, any_offline_simulator_no_noise: BackendV2) -> None: """Verify that CSWAP (Fredkin) gates can be transpiled and executed (in a trivial case).""" qc = QuantumCircuit(3) qc.prepare_state("101") qc.cswap(0, 1, 2) trans_qc = qiskit.transpile( - qc, offline_simulator_no_noise, optimization_level=optimization_level + qc, any_offline_simulator_no_noise, optimization_level=optimization_level ) assert_circuits_equivalent(qc, trans_qc) qc.measure_all() shots = 200 - job = offline_simulator_no_noise.run( - qiskit.transpile(qc, offline_simulator_no_noise, optimization_level=optimization_level), + job = any_offline_simulator_no_noise.run( + qiskit.transpile(qc, any_offline_simulator_no_noise, optimization_level=optimization_level), shots=shots, ) counts = job.result().get_counts() diff --git a/test/test_primitives.py b/test/test_primitives.py index 46dc935..f58469c 100644 --- a/test/test_primitives.py +++ b/test/test_primitives.py @@ -23,17 +23,29 @@ BaseSamplerV1, Sampler, ) -from qiskit.providers import Backend +from qiskit.providers import Backend, BackendV2 from qiskit.quantum_info import SparsePauliOp from qiskit.transpiler.exceptions import TranspilerError -from qiskit_aqt_provider.aqt_resource import AQTResource from qiskit_aqt_provider.primitives import AQTSampler from qiskit_aqt_provider.primitives.estimator import AQTEstimator from qiskit_aqt_provider.test.circuits import assert_circuits_equal from qiskit_aqt_provider.test.fixtures import MockSimulator +@pytest.fixture(scope="module") +def assert_all_responses_were_requested() -> bool: + """Disable pytest-httpx check that all mocked responses are used for this module. + + Some tests in this module request the offline_simulator_no_noise_direct_access + fixture without using it, thus not calling the mocked HTTP responses it contains. + + # TODO: use alternative HTTPXMock setup when available. + # See: https://github.com/Colin-b/pytest_httpx/issues/137 + """ + return False + + def test_backend_primitives_are_v1() -> None: """Check that `BackendSampler` and `BackendEstimator` have primitives V1 interfaces. @@ -66,8 +78,7 @@ def test_backend_primitives_are_v1() -> None: ], ) def test_circuit_sampling_primitive( - get_sampler: Callable[[Backend], BaseSamplerV1], - offline_simulator_no_noise: AQTResource, + get_sampler: Callable[[Backend], BaseSamplerV1], any_offline_simulator_no_noise: BackendV2 ) -> None: """Check that a `Sampler` primitive using an AQT backend can sample parametric circuits.""" theta = Parameter("θ") @@ -81,7 +92,7 @@ def test_circuit_sampling_primitive( assert qc.num_parameters > 0 - sampler = get_sampler(offline_simulator_no_noise) + sampler = get_sampler(any_offline_simulator_no_noise) sampled = sampler.run(qc, [pi]).result().quasi_dists assert sampled == [{3: 1.0}] diff --git a/test/test_resource.py b/test/test_resource.py index 75f391a..1bcfef2 100644 --- a/test/test_resource.py +++ b/test/test_resource.py @@ -10,8 +10,10 @@ # copyright notice, and modified files need to carry a notice indicating # that they have been altered from the originals. +import itertools import json import math +import re import uuid from contextlib import AbstractContextManager, nullcontext from typing import Any @@ -25,16 +27,21 @@ from polyfactory.factories.pydantic_factory import ModelFactory from pytest_httpx import HTTPXMock from qiskit import QuantumCircuit +from qiskit.providers import JobStatus from qiskit.providers.exceptions import JobTimeoutError -from qiskit_aqt_provider import api_models +from qiskit_aqt_provider import api_models, api_models_direct from qiskit_aqt_provider.aqt_job import AQTJob from qiskit_aqt_provider.aqt_options import AQTOptions from qiskit_aqt_provider.aqt_resource import AQTResource from qiskit_aqt_provider.circuit_to_aqt import circuits_to_aqt_job from qiskit_aqt_provider.test.circuits import assert_circuits_equal, empty_circuit, random_circuit from qiskit_aqt_provider.test.fixtures import MockSimulator -from qiskit_aqt_provider.test.resources import DummyResource, TestResource +from qiskit_aqt_provider.test.resources import ( + DummyDirectAccessResource, + DummyResource, + TestResource, +) from qiskit_aqt_provider.test.utils import get_field_constraint from qiskit_aqt_provider.versions import USER_AGENT @@ -431,3 +438,190 @@ def test_offline_simulator_resource_propagate_memory_option( result = offline_simulator_no_noise.run(qc).result() with context: assert len(result.get_memory()) == default_shots + + +def test_direct_access_bad_request(httpx_mock: HTTPXMock) -> None: + """Check that direct-access resources raise a httpx.HTTPError if the request is flagged bad by the server.""" + backend = DummyDirectAccessResource("token") + httpx_mock.add_response(status_code=httpx.codes.BAD_REQUEST) + + job = backend.run(empty_circuit(2)) + with pytest.raises(httpx.HTTPError): + job.result() + + +@pytest.mark.parametrize("success", [False, True]) +def test_direct_access_job_status(success: bool, httpx_mock: HTTPXMock) -> None: + """Check the expected Qiskit job status on direct-access resources. + + Since the transactions are synchronous, there are only three possible statuses: + 1. initializing: the job was created but is not executing + 2. done: the job executed successfully + 3. error: the job execution failed. + """ + shots = 100 + + def handle_submit(request: httpx.Request) -> httpx.Response: + assert request.headers["user-agent"] == USER_AGENT + + return httpx.Response(status_code=httpx.codes.OK, text=f'"{uuid.uuid4()}"') + + def handle_result(request: httpx.Request) -> httpx.Response: + assert request.headers["user-agent"] == USER_AGENT + + _, job_id = request.url.path.rsplit("/", maxsplit=1) + + return httpx.Response( + status_code=httpx.codes.OK, + json=json.loads( + api_models_direct.JobResult.create_finished( + job_id=uuid.UUID(job_id), result=[[0] for _ in range(shots)] + ).model_dump_json() + if success + else api_models_direct.JobResult.create_error( + job_id=uuid.UUID(job_id) + ).model_dump_json() + ), + ) + + httpx_mock.add_callback(handle_submit, method="PUT", url=re.compile(".+/circuit/?$")) + httpx_mock.add_callback( + handle_result, method="GET", url=re.compile(".+/circuit/result/[0-9a-f-]+$") + ) + + backend = DummyDirectAccessResource("token") + job = backend.run(empty_circuit(1), shots=shots) + + assert job.status() is JobStatus.INITIALIZING + + result = job.result() + assert result.success is success + + if success: + assert job.status() is JobStatus.DONE + else: + assert job.status() is JobStatus.ERROR + + +def test_direct_access_mocked_successful_transaction(httpx_mock: HTTPXMock) -> None: + """Mock a successful single-circuit transaction on a direct-access resource.""" + token = str(uuid.uuid4()) + backend = DummyDirectAccessResource(token) + backend.options.with_progress_bar = False + + shots = 122 + qc = empty_circuit(2) + + expected_job_id = str(uuid.uuid4()) + + def handle_submit(request: httpx.Request) -> httpx.Response: + assert request.headers["user-agent"] == USER_AGENT + + data = api_models.QuantumCircuit.model_validate_json(request.content.decode("utf-8")) + assert data.repetitions == shots + + return httpx.Response( + status_code=httpx.codes.OK, + text=f'"{expected_job_id}"', + ) + + def handle_result(request: httpx.Request) -> httpx.Response: + assert request.headers["user-agent"] == USER_AGENT + + _, job_id = request.url.path.rsplit("/", maxsplit=1) + assert job_id == expected_job_id + + return httpx.Response( + status_code=httpx.codes.OK, + json=json.loads( + api_models_direct.JobResult.create_finished( + job_id=uuid.UUID(job_id), + result=[[0, 0] if s % 2 == 0 else [1, 0] for s in range(shots)], + ).model_dump_json() + ), + ) + + httpx_mock.add_callback(handle_submit, method="PUT", url=re.compile(".+/circuit/?$")) + httpx_mock.add_callback( + handle_result, method="GET", url=re.compile(".+/circuit/result/[0-9a-f-]+$") + ) + + job = backend.run(qc, shots=shots) + result = job.result() + + assert result.get_counts() == {"00": shots // 2, "01": shots // 2} + + +def test_direct_access_mocked_failed_transaction(httpx_mock: HTTPXMock) -> None: + """Mock a failed multi-circuit transaction on a direct-access resource. + + The first two circuits succeed, the third one not. The fourth circuit would succeed, + but is never executed. + """ + token = str(uuid.uuid4()) + backend = DummyDirectAccessResource(token) + backend.options.with_progress_bar = False + + shots = 122 + qc = empty_circuit(2) + + job_ids = [str(uuid.uuid4()) for _ in range(4)] + # produce 2 times the same id before going to the next, one value + # for handle_submit, the other one for handle result. + job_ids_iter = itertools.chain.from_iterable(zip(job_ids, job_ids)) + + # circuit executions' planned success + success = [True, True, False, True] + success_iter = iter(success) + + circuit_submissions = 0 + + def handle_submit(request: httpx.Request) -> httpx.Response: + assert request.headers["user-agent"] == USER_AGENT + + data = api_models.QuantumCircuit.model_validate_json(request.content.decode("utf-8")) + assert data.repetitions == shots + + nonlocal circuit_submissions + circuit_submissions += 1 + + return httpx.Response( + status_code=httpx.codes.OK, + text=f'"{next(job_ids_iter)}"', + ) + + def handle_result(request: httpx.Request) -> httpx.Response: + assert request.headers["user-agent"] == USER_AGENT + + _, job_id = request.url.path.rsplit("/", maxsplit=1) + assert job_id == next(job_ids_iter) + + return httpx.Response( + status_code=httpx.codes.OK, + json=json.loads( + api_models_direct.JobResult.create_finished( + job_id=uuid.UUID(job_id), result=[[0, 1] for _ in range(shots)] + ).model_dump_json() + if next(success_iter) + else api_models_direct.JobResult.create_error( + job_id=uuid.UUID(job_id) + ).model_dump_json() + ), + ) + + httpx_mock.add_callback(handle_submit, method="PUT", url=re.compile(".+/circuit/?$")) + httpx_mock.add_callback( + handle_result, method="GET", url=re.compile(".+/circuit/result/[0-9a-f-]+$") + ) + + job = backend.run([qc, qc, qc, qc], shots=shots) + result = job.result() + + assert not result.success # not all circuits executed successfully + + counts = result.get_counts() + assert isinstance(counts, list) # multiple successful circuit executions + assert len(counts) == 2 # the first two circuits executed successfully + assert counts == [{"10": shots}, {"10": shots}] + + assert circuit_submissions == 3 # the last circuit was never submitted