quantum_executor package

Subpackages

Submodules

quantum_executor.dispatch module

Module containing classes representing quantum jobs and their dispatch.

class quantum_executor.dispatch.Dispatch(initial_jobs: dict[str, dict[str, list[dict[str, Any]]]] | None = None)[source]

Bases: object

Hold a collection of jobs grouped by provider and backend.

Parameters:

initial_jobs (Optional[Dict[str, Dict[str, List[Dict[str, Any]]]]]) – Nested dictionary mapping provider names to backend names to lists of job-info dicts. Each job-info dict must have keys: - ‘circuit’: Any - ‘shots’: int - optionally ‘configuration’: Dict[str, Any] - optionally ‘id’: str

add_job(provider_name, backend_name, circuits, shots, config=None)[source]

Add one or multiple jobs for the specified provider/backend.

all_jobs()[source]

Iterator over all (provider, backend, job) tuples.

items()[source]

Returns the nested dictionary representation of the dispatch.

add_job(provider_name: str, backend_name: str, circuits: Any | list[Any], shots: int | list[int], config: dict[str, Any] | None = None) None[source]

Add one or more jobs to the dispatch.

If circuits is a list, each element is paired with either a single integer shots (repeated) or with an element from a list of shot counts.

Parameters:
  • provider_name (str) – Name of the quantum provider.

  • backend_name (str) – Name of the backend.

  • circuits (Union[Any, List[Any]]) – A single circuit or a list of circuits.

  • shots (Union[int, List[int]]) – A single shot count or a list of shot counts corresponding to circuits.

  • config (Optional[Dict[str, Any]], optional) – Additional configuration for the job(s), by default None.

Raises:

ValueError – If the lengths of circuits and shots lists do not match as required.

all_jobs() Generator[tuple[str, str, Job], None, None][source]

Return terator over all jobs in the dispatch.

Yields:

Tuple[str, str, Job] – A tuple with (provider_name, backend_name, job).

items() dict[str, dict[str, list[Job]]][source]

Return a shallow copy of the internal jobs dictionary.

Returns:

The nested dictionary mapping provider to backend to list of jobs.

Return type:

Dict[str, Dict[str, List[Job]]]

to_dict() dict[str, dict[str, list[dict[str, Any]]]][source]

Return a dictionary representation of the Dispatch.

Returns:

Nested dictionary mapping provider -> backend -> list of job-info dicts.

Return type:

Dict[str, Dict[str, List[Dict[str, Any]]]]

class quantum_executor.dispatch.Job(circuit: Any, shots: int, configuration: dict[str, Any] | None = None)[source]

Bases: object

Represent a single quantum execution request.

Parameters:
  • circuit (Any) – The quantum circuit to be executed.

  • shots (int) – The number of measurement shots.

  • configuration (Optional[Dict[str, Any]]) – Additional configuration options (e.g., noise models). Defaults to None.

circuit: Any
configuration: dict[str, Any]
id: str
shots: int
to_dict() dict[str, Any][source]

Return a dictionary representation of the Job.

Returns:

Dictionary containing job details: id, circuit, shots, and configuration.

Return type:

Dict[str, Any]

quantum_executor.executor module

The QuantumExecutor orchestrates quantum job splitting, dispatching, execution, and (optionally) result merging.

class quantum_executor.executor.QuantumExecutor(providers_info: dict[str, dict[str, Any]] | None = None, providers: list[str] | None = None, policies_folder: str = '/home/docs/checkouts/readthedocs.org/user_builds/quantum-executor/checkouts/latest/src/quantum_executor/policies', max_workers: int | None = None, raise_exc: bool = False, virtual_provider: VirtualProvider | None = None)[source]

Bases: object

Manage splitting, dispatching, execution, and optional merging of quantum jobs.

Parameters:
  • providers_info (Dict[str, Dict[str, Any]], optional) –

    A dictionary mapping provider names to their respective API keys or configuration. The keys should be the provider names (e.g., “ionq”, “azure”) and the values should be dictionaries containing the necessary parameters for initialization. For example:

    {

    “ionq”: {“api_key”: “your-api-key-here”}, “azure”: {“subscription_id”: “your-subscription-id”, “resource_group”: “your-resource-group”}, “braket”: {“aws_access_key_id”: “your-access-key-id”,

    ”aws_secret_access_key”: “your-secret-access-key”},

    ”local_aer”: {}, “qbraid”: {“api_key”: “your-api-key-here”},

    }

  • providers (List[str], optional) – Which providers to initialize.

  • policies_folder (str, optional) – Where to load/save policy files.

  • max_workers (int, optional) – Max processes for async execution.

  • raise_exc (bool, optional) – If True, propagate initialization or policy-load errors.

  • virtual_provider (VirtualProvider, optional) – If provided, use this instead of creating a new one.

add_policy(name: str, split_policy: Callable[[...], Any] | None = None, merge_policy: Callable[[...], Any] | None = None) None[source]

Dynamically add or update a policy (split and/or merge).

Parameters:
  • name (str) – Policy name.

  • split_policy (Callable[..., Any], optional) – Split function.

  • merge_policy (Callable[..., Any], optional) – Merge function.

add_policy_from_file(file_path: str, raise_exc: bool | None = None) None[source]

Load a policy from file and (re)load all policies.

Parameters:
  • file_path (str) – Path to policy file.

  • raise_exc (bool, optional) – If True, raise on errors. Defaults to constructor setting.

static default_providers() list[str][source]

Get a list of default providers.

Returns:

A list of provider names that are available for use.

Return type:

List[str]

generate_dispatch(circuits: Any | Sequence[Any], shots: int | Sequence[int], backends: dict[str, list[str]], split_policy: str = 'uniform', split_data: dict[str, Any] | None = None) tuple[Dispatch, dict[str, Any]][source]

Split a circuit into jobs based on the specified split policy.

Parameters:
  • circuits (Any or Sequence[Any]) – Quantum circuit or list of quantum circuits.

  • shots (int or Sequence[int]) – Number of shots or list of numbers of shots. If a list, it must match the length of circuits. If a single int, all circuits will use the same number of shots.

  • backends (dict[str, list[str]]) – Provider → list of backends.

  • split_policy (str, optional) – Which split policy to use.

  • split_data (dict, optional) – Initial data for split policy.

Returns:

A Dispatch object containing the jobs and any updated split data.

Return type:

tuple[Dispatch, dict[str, Any]]

get_merge_policy(name: str) Callable[[...], Any][source]

Get a merge policy by name.

Parameters:

name (str) – Policy name.

Returns:

The merge policy function.

Return type:

Callable[…, Any]

Raises:

KeyError – If not found or policy lacks a merge.

get_split_policy(name: str) Callable[[...], Any][source]

Get a split policy by name.

Parameters:

name (str) – Policy name.

Returns:

The split policy function.

Return type:

Callable[…, Any]

Raises:

KeyError – If not found or policy lacks a split.

property policies: list[str]

List all loaded policy names.

Returns:

List of loaded policy names.

Return type:

List[str]

run_dispatch(dispatch: Dispatch | DispatchDict, multiprocess: bool = False, wait: bool = True, max_workers: int | None = None, merge_policy: str | None = None, merge_data: dict[str, Any] | None = None) ResultCollector | MergedResultCollector[source]

Execute all jobs in a Dispatch and optionally merge their results.

Parameters:
  • dispatch (Dispatch or DispatchDict) – Jobs to execute.

  • multiprocess (bool, optional) – If True, run in parallel processes.

  • wait (bool, optional) – If True, block until execution (and merge) finishes. If False, jobs run in a background thread (sequentially if multiprocess=False, or gathering + merging in threads if multiprocess=True).

  • max_workers (int, optional) – Override for max parallel processes.

  • merge_policy (str or None, optional) – Which merge policy to apply after dispatch.

  • merge_data (dict, optional) – Initial data for merge policy.

Returns:

Raw results if merge_policy is None, otherwise merged.

Return type:

ResultCollector or MergedResultCollector

run_experiment(circuits: Any | Sequence[Any], shots: int | Sequence[int], backends: dict[str, list[str]], split_policy: str = 'uniform', merge_policy: str | None = None, multiprocess: bool = False, wait: bool = True, split_data: dict[str, Any] | None = None, merge_data: dict[str, Any] | None = None, max_workers: int | None = None) ResultCollector | MergedResultCollector[source]

Split a circuit into jobs, dispatch them, and optionally merge results.

Parameters:
  • circuits (Any or Sequence[Any]) – Quantum circuit or list of quantum circuits.

  • shots (int or Sequence[int]) – Number of shots or list of numbers of shots. If a list, it must match the length of circuits. If a single int, all circuits will use the same number of shots.

  • backends (dict[str, list[str]]) – Provider → list of backends.

  • split_policy (str, optional) – Which split policy to use.

  • merge_policy (str or None, optional) – Which merge policy to use; if None, skip merging.

  • multiprocess (bool, optional) – If True, run jobs in parallel processes.

  • wait (bool, optional) – If True, block until execution (and merge) finishes.

  • split_data (dict, optional) – Initial data for split policy.

  • merge_data (dict, optional) – Initial data for merge policy, if None, use updated split data.

  • max_workers (int, optional) – Override for max parallel processes.

Returns:

Unmerged collector if merge_policy is None, otherwise merged.

Return type:

ResultCollector or MergedResultCollector

property virtual_provider: VirtualProvider

Get the virtual provider.

Returns:

The virtual provider instance.

Return type:

VirtualProvider

quantum_executor.executor.add_policy_from_file(file_path: str, policy_folder: str, raise_exc: bool = False) None[source]

Load a policy module from file and copy it into the policies folder.

Parameters:
  • file_path (str) – Path to the Python file containing the policy.

  • policy_folder (str) – Folder where policies live.

  • raise_exc (bool, optional) – If True, raise on errors. Otherwise, log warnings. Defaults to False.

quantum_executor.executor.load_policies_from_folder(folder_path: str, raise_exc: bool = False) dict[str, dict[str, Callable[[...], Any]]][source]

Dynamically load split and/or merge policies from Python files in a folder.

Parameters:
  • folder_path (str) – Path to the folder containing policy files.

  • raise_exc (bool, optional) – If True, raise on missing folder or load errors. Otherwise, log a warning. Defaults to False.

Returns:

Mapping policy names → dict with keys “split” and/or “merge”.

Return type:

Dict[str, Dict[str, Callable[…, Any]]]

quantum_executor.job_runner module

Module with helper function to execute a single quantum job.

quantum_executor.job_runner.run_single_job_static(provider_name: str, backend_name: str, circuit: Any, shots: int, config: dict[str, Any] | None = None, providers_info: dict[str, dict[str, Any]] | None = None, providers: list[str] | None = None, raise_exc: bool = True, virtual_provider: VirtualProvider | None = None) dict[str, Any][source]

Worker function to execute a single quantum job.

Parameters:
  • provider_name (str) – Name of the quantum provider (e.g., “local_aer”, “ionq”, etc.).

  • backend_name (str) – The specific backend name for the provider.

  • circuit (Any) – The quantum circuit to be executed.

  • shots (int) – Number of execution shots.

  • config (Dict[str, Any], optional) – Additional job configuration parameters.

  • providers_info (Dict[str, Dict[str, Any]], optional) –

    A dictionary mapping provider names to their respective API keys or configuration. The keys should be the provider names (e.g., “ionq”, “azure”) and the values should be dictionaries containing the necessary parameters for initialization. For example: {

    ”ionq”: {“api_key”: “your-api-key-here”}, “azure”: {“subscription_id”: “your-subscription-id”, “resource_group”: “your-resource-group”}, “braket”: {“aws_access_key_id”: “your-access-key-id”,

    ”aws_secret_access_key”: “your-secret-access-key”},

    ”local_aer”: {}, “qbraid”: {“api_key”: “your-api-key-here”},

    }

  • providers (Dict[str, str], optional) – A list of provider names to include in the initialization. Only the providers with names in this list will be initialized. Defaults to None, meaning all providers are included.

  • raise_exc (bool, optional) – If True, exceptions are re-raised; otherwise, they are logged and returned as error data.

  • virtual_provider (Optional[VirtualProvider], optional) – An optional instance of VirtualProvider. If None, a new one is created.

Returns:

The result counts from the job execution or an error dictionary.

Return type:

ResultData

quantum_executor.result_collector module

Implement thread-safe collectors for aggregating job results.

class quantum_executor.result_collector.JobResult(job: Job, data: ResultData | None = None)[source]

Bases: object

Container for holding the outcome of a job execution.

Parameters:
  • job (Job) – The job instance that was executed.

  • data (ResultData, optional) – The result data produced by the job execution, by default None.

get_data() ResultData | None[source]

Retrieve the result data if the job execution is complete.

Returns:

The result data if complete, otherwise None.

Return type:

ResultData or None

class quantum_executor.result_collector.MergedResultCollector(results: ResultCollector)[source]

Bases: object

A specialized ResultCollector that handles merging of results.

This class is used to collect and merge results from multiple jobs.

Locking:

When both the inner ResultCollector._lock and this MergedResultCollector._lock must be held, always acquire ResultCollector._lock first, then MergedResultCollector._lock.

Parameters:

results (ResultCollector) – A ResultCollector instance containing job results.

property complete: bool

Check if the collector has completed processing.

Returns:

True if all jobs are complete, False otherwise.

Return type:

bool

get_final_policy_data() dict[str, Any] | None[source]

Return the final policy data.

Returns:

The final policy data.

Return type:

Optional[Dict[str, Any]]

get_initial_policy_data() dict[str, Any] | None[source]

Return the initial policy data.

Returns:

The initial policy data.

Return type:

Optional[Dict[str, Any]]

get_jobs() dict[str, dict[str, list[JobResult]]][source]

Return the jobs stored in the collector.

Returns:

A dictionary mapping provider names to backend names and their respective job results.

Return type:

Dict[str, Dict[str, List[JobResult]]]

get_merged_results() Any[source]

Return the merged results.

Returns:

The merged results.

Return type:

Any

get_results() dict[str, dict[str, list[ResultData | None]]][source]

Return the results stored in the collector.

Returns:

A dictionary mapping provider names to backend names and their respective result data.

Return type:

Dict[str, Dict[str, List[Optional[ResultData]]]]

set_merged_results(merged_results: Any, initial_policy_data: dict[str, Any], final_policy_data: dict[str, Any]) None[source]

Update the merged results, initial policy data, and final policy data.

Parameters:
  • merged_results (Any) – The merged results to store.

  • initial_policy_data (Dict[str, Any]) – The initial policy data to store.

  • final_policy_data (Dict[str, Any]) – The final policy data to store.

wait_for_completion(timeout: float | None = None) bool[source]

Block until all registered job results are complete or until the timeout expires.

Parameters:

timeout (float or None, optional) – Maximum number of seconds to wait. If None, waits indefinitely.

Returns:

True if the collector completed within the timeout, False otherwise.

Return type:

bool

class quantum_executor.result_collector.ReadWriteLock[source]

Bases: object

Simple reentrant read-write lock implementation.

This lock allows multiple readers to acquire the lock concurrently, while a writer gains exclusive access. It is built atop a reentrant mutex so that the same thread may re-acquire read or write locks.

Locking semantics:
  • Multiple threads can hold the read lock concurrently.

  • Only one thread can hold the write lock, and only when no readers hold the lock.

  • A thread that holds the write lock can re-acquire read or write.

Usage:
with lock.read():

# read-critical section

with lock.write():

# write-critical section

acquire_read() None[source]

Acquire the lock for reading.

Multiple threads can hold the read lock concurrently.

acquire_write() None[source]

Acquire the lock for writing.

Blocks until all readers have released the lock.

read() Generator[None, None, None][source]

Acquire the lock for reading.

release_read() None[source]

Release the read lock.

Decrement the reader count and notify waiting writers if no readers remain.

release_write() None[source]

Release the write lock.

write() Generator[None, None, None][source]

Acquire the lock for writing.

class quantum_executor.result_collector.ResultCollector[source]

Bases: object

Thread-safe collector for job results stored in a nested dictionary.

The structure mirrors the dispatch structure and allows storing and retrieving job results across multiple providers and backends.

Locking:

A single ReadWriteLock (_lock) protects nested_results, _job_mapping, and _complete/_completion_event. Always acquire ResultCollector._lock before acquiring any external locks.

nested_results

A nested dictionary mapping provider names to backends and their job results.

Type:

Dict[str, Dict[str, List[JobResult]]]

property complete: bool

Check whether all registered job results are complete.

Returns:

True if the collector is marked complete or if all job results are complete; False otherwise.

Return type:

bool

get_jobs() dict[str, dict[str, list[JobResult]]][source]

Retrieve a shallow copy of the registered job results.

Returns:

A copy of the nested dictionary of JobResult objects.

Return type:

Dict[str, Dict[str, List[JobResult]]]

get_results() dict[str, dict[str, list[ResultData | None]]][source]

Retrieve a shallow copy of the nested job results.

Returns:

A copy of the job results nested dictionary with each job’s data.

Return type:

Dict[str, Dict[str, List[Optional[ResultData]]]]

register_job_mapping(job: Job, provider_name: str, backend_name: str) None[source]

Register a job and create a placeholder JobResult in the collector.

Parameters:
  • job (Job) – The job instance.

  • provider_name (str) – The provider name.

  • backend_name (str) – The backend name.

store_result(job: Job, result_data: ResultData) None[source]

Update a job’s placeholder with the actual result data.

After storing the result, the method checks if all registered jobs are complete and, if so, signals completion to waiting threads.

Parameters:
  • job (Job) – The job whose result is to be stored.

  • result_data (ResultData) – The result data produced by the job execution.

Raises:

ValueError – If the job has not been registered.

wait_for_completion(timeout: float | None = None) bool[source]

Block until all registered job results are complete or until the timeout expires.

This method waits on an internal event that is set when the collector is complete.

Parameters:

timeout (float or None, optional) – Maximum number of seconds to wait. If None, waits indefinitely.

Returns:

True if the collector completed within the timeout, False otherwise.

Return type:

bool

quantum_executor.virtual_provider module

VirtualProvider module for managing quantum computing providers.

This module provides the VirtualProvider class, which is responsible for initializing and managing multiple quantum computing providers. It allows users to retrieve available backends, filter them based on their online status, and access specific backends by their identifiers.

class quantum_executor.virtual_provider.VirtualProvider(providers_info: dict[str, dict[str, Any]] | None = None, include: list[str] | None = None, raise_exc: bool = False)[source]

Bases: object

Manage the initialization of quantum computing providers and retrieval of available backends.

This class is responsible for:
  • Instantiating provider classes using the supplied API keys (if required).

  • Retrieving all available backends from each provider.

  • Optionally filtering backends based on their online status.

Instead of specifying which providers to exclude, users can now optionally specify a list of provider names to include. If the include list is not provided, all available providers will be initialized.

Parameters:
  • providers_info (Dict[str, Dict[str, Any]], optional) –

    A dictionary mapping provider names to their respective API keys or configuration. The keys should be the provider names (e.g., “ionq”, “azure”) and the values should be dictionaries containing the necessary parameters for initialization. For example:

    {

    “ionq”: {“api_key”: “your-api-key-here”}, “azure”: {“subscription_id”: “your-subscription-id”, “resource_group”: “your-resource-group”}, “braket”: {“aws_access_key_id”: “your-access-key-id”,

    ”aws_secret_access_key”: “your-secret-access-key”},

    ”local_aer”: {}, “qbraid”: {“api_key”: “your-api-key-here”},

    }

  • include (list[str], optional) – A list of provider names to include in the initialization. Only the providers with names in this list will be initialized. Defaults to None, meaning all providers are included.

  • raise_exc (bool, optional) – If True, exceptions during provider initialization will be propagated. Otherwise, the error is logged and initialization continues. Defaults to False.

Examples

>>> vp = VirtualProvider(providers_info={"ionq": "your-api-key-here"}, include=["ionq"])
>>> providers = vp.get_providers()
>>> backends = vp.get_backends(online=True)
add_provider(provider_name: str, provider: QuantumProvider) None[source]

Add a new provider to the list of initialized providers.

Parameters:
  • provider_name (str) – The name of the provider to add.

  • provider (QuantumProvider) – An instance of the provider to add.

Raises:

ValueError – If the provider name is already in use.

static default_providers() list[str][source]

Get a list of default providers.

Returns:

A list of provider names that are available for use.

Return type:

List[str]

get_backend(provider_name: str, backend_name: str, online: bool = True) QuantumDevice[source]

Retrieve a specific backend from the specified provider.

The method fetches the backend using its identifier and, if required, checks that the backend is online. If the provider is not initialized or the backend is offline, an error is raised.

Parameters:
  • provider_name (str) – The name of the provider from which the backend should be retrieved.

  • backend_name (str) – The identifier (device_id) of the backend to retrieve.

  • online (bool, optional) – If True, raises an error if the backend is not online. Defaults to True.

Returns:

An instance of the requested backend.

Return type:

QuantumDevice

Raises:
  • ValueError – If the provider with the given name is not initialized.

  • RuntimeError – If the backend is offline when online status is required, or if another error occurs during retrieval.

get_backends(online: bool = True) dict[str, dict[str, QuantumDevice]][source]

Retrieve available backends for each provider.

The method retrieves devices from each provider and, if requested, filters them based on their online status.

Parameters:

online (bool, optional) – If True, only devices with an online status are returned; if False, all devices are returned regardless of status. Defaults to True.

Returns:

A dictionary mapping provider names to dictionaries of backend IDs and QuantumDevice instances. For example:

{
“local_aer”: {

“backend_id_1”: QuantumDevice(…), “backend_id_2”: QuantumDevice(…),

}, “ionq”: { … },

}

Return type:

Dict[str, Dict[str, QuantumDevice]]

get_providers() dict[str, QuantumProvider][source]

Get the initialized providers.

Returns:

A dictionary mapping provider names to their respective provider instances. For example:

{

“local_aer”: LocalAERProvider(…), “ionq”: IonQProvider(…),

}

Return type:

dict[str, QuantumProvider]

Module contents

Quantum Executor Module.

class quantum_executor.Dispatch(initial_jobs: dict[str, dict[str, list[dict[str, Any]]]] | None = None)[source]

Bases: object

Hold a collection of jobs grouped by provider and backend.

Parameters:

initial_jobs (Optional[Dict[str, Dict[str, List[Dict[str, Any]]]]]) – Nested dictionary mapping provider names to backend names to lists of job-info dicts. Each job-info dict must have keys: - ‘circuit’: Any - ‘shots’: int - optionally ‘configuration’: Dict[str, Any] - optionally ‘id’: str

add_job(provider_name, backend_name, circuits, shots, config=None)[source]

Add one or multiple jobs for the specified provider/backend.

all_jobs()[source]

Iterator over all (provider, backend, job) tuples.

items()[source]

Returns the nested dictionary representation of the dispatch.

add_job(provider_name: str, backend_name: str, circuits: Any | list[Any], shots: int | list[int], config: dict[str, Any] | None = None) None[source]

Add one or more jobs to the dispatch.

If circuits is a list, each element is paired with either a single integer shots (repeated) or with an element from a list of shot counts.

Parameters:
  • provider_name (str) – Name of the quantum provider.

  • backend_name (str) – Name of the backend.

  • circuits (Union[Any, List[Any]]) – A single circuit or a list of circuits.

  • shots (Union[int, List[int]]) – A single shot count or a list of shot counts corresponding to circuits.

  • config (Optional[Dict[str, Any]], optional) – Additional configuration for the job(s), by default None.

Raises:

ValueError – If the lengths of circuits and shots lists do not match as required.

all_jobs() Generator[tuple[str, str, Job], None, None][source]

Return terator over all jobs in the dispatch.

Yields:

Tuple[str, str, Job] – A tuple with (provider_name, backend_name, job).

items() dict[str, dict[str, list[Job]]][source]

Return a shallow copy of the internal jobs dictionary.

Returns:

The nested dictionary mapping provider to backend to list of jobs.

Return type:

Dict[str, Dict[str, List[Job]]]

to_dict() dict[str, dict[str, list[dict[str, Any]]]][source]

Return a dictionary representation of the Dispatch.

Returns:

Nested dictionary mapping provider -> backend -> list of job-info dicts.

Return type:

Dict[str, Dict[str, List[Dict[str, Any]]]]

class quantum_executor.QuantumExecutor(providers_info: dict[str, dict[str, Any]] | None = None, providers: list[str] | None = None, policies_folder: str = '/home/docs/checkouts/readthedocs.org/user_builds/quantum-executor/checkouts/latest/src/quantum_executor/policies', max_workers: int | None = None, raise_exc: bool = False, virtual_provider: VirtualProvider | None = None)[source]

Bases: object

Manage splitting, dispatching, execution, and optional merging of quantum jobs.

Parameters:
  • providers_info (Dict[str, Dict[str, Any]], optional) –

    A dictionary mapping provider names to their respective API keys or configuration. The keys should be the provider names (e.g., “ionq”, “azure”) and the values should be dictionaries containing the necessary parameters for initialization. For example:

    {

    “ionq”: {“api_key”: “your-api-key-here”}, “azure”: {“subscription_id”: “your-subscription-id”, “resource_group”: “your-resource-group”}, “braket”: {“aws_access_key_id”: “your-access-key-id”,

    ”aws_secret_access_key”: “your-secret-access-key”},

    ”local_aer”: {}, “qbraid”: {“api_key”: “your-api-key-here”},

    }

  • providers (List[str], optional) – Which providers to initialize.

  • policies_folder (str, optional) – Where to load/save policy files.

  • max_workers (int, optional) – Max processes for async execution.

  • raise_exc (bool, optional) – If True, propagate initialization or policy-load errors.

  • virtual_provider (VirtualProvider, optional) – If provided, use this instead of creating a new one.

add_policy(name: str, split_policy: Callable[[...], Any] | None = None, merge_policy: Callable[[...], Any] | None = None) None[source]

Dynamically add or update a policy (split and/or merge).

Parameters:
  • name (str) – Policy name.

  • split_policy (Callable[..., Any], optional) – Split function.

  • merge_policy (Callable[..., Any], optional) – Merge function.

add_policy_from_file(file_path: str, raise_exc: bool | None = None) None[source]

Load a policy from file and (re)load all policies.

Parameters:
  • file_path (str) – Path to policy file.

  • raise_exc (bool, optional) – If True, raise on errors. Defaults to constructor setting.

static default_providers() list[str][source]

Get a list of default providers.

Returns:

A list of provider names that are available for use.

Return type:

List[str]

generate_dispatch(circuits: Any | Sequence[Any], shots: int | Sequence[int], backends: dict[str, list[str]], split_policy: str = 'uniform', split_data: dict[str, Any] | None = None) tuple[Dispatch, dict[str, Any]][source]

Split a circuit into jobs based on the specified split policy.

Parameters:
  • circuits (Any or Sequence[Any]) – Quantum circuit or list of quantum circuits.

  • shots (int or Sequence[int]) – Number of shots or list of numbers of shots. If a list, it must match the length of circuits. If a single int, all circuits will use the same number of shots.

  • backends (dict[str, list[str]]) – Provider → list of backends.

  • split_policy (str, optional) – Which split policy to use.

  • split_data (dict, optional) – Initial data for split policy.

Returns:

A Dispatch object containing the jobs and any updated split data.

Return type:

tuple[Dispatch, dict[str, Any]]

get_merge_policy(name: str) Callable[[...], Any][source]

Get a merge policy by name.

Parameters:

name (str) – Policy name.

Returns:

The merge policy function.

Return type:

Callable[…, Any]

Raises:

KeyError – If not found or policy lacks a merge.

get_split_policy(name: str) Callable[[...], Any][source]

Get a split policy by name.

Parameters:

name (str) – Policy name.

Returns:

The split policy function.

Return type:

Callable[…, Any]

Raises:

KeyError – If not found or policy lacks a split.

property policies: list[str]

List all loaded policy names.

Returns:

List of loaded policy names.

Return type:

List[str]

run_dispatch(dispatch: Dispatch | DispatchDict, multiprocess: bool = False, wait: bool = True, max_workers: int | None = None, merge_policy: str | None = None, merge_data: dict[str, Any] | None = None) ResultCollector | MergedResultCollector[source]

Execute all jobs in a Dispatch and optionally merge their results.

Parameters:
  • dispatch (Dispatch or DispatchDict) – Jobs to execute.

  • multiprocess (bool, optional) – If True, run in parallel processes.

  • wait (bool, optional) – If True, block until execution (and merge) finishes. If False, jobs run in a background thread (sequentially if multiprocess=False, or gathering + merging in threads if multiprocess=True).

  • max_workers (int, optional) – Override for max parallel processes.

  • merge_policy (str or None, optional) – Which merge policy to apply after dispatch.

  • merge_data (dict, optional) – Initial data for merge policy.

Returns:

Raw results if merge_policy is None, otherwise merged.

Return type:

ResultCollector or MergedResultCollector

run_experiment(circuits: Any | Sequence[Any], shots: int | Sequence[int], backends: dict[str, list[str]], split_policy: str = 'uniform', merge_policy: str | None = None, multiprocess: bool = False, wait: bool = True, split_data: dict[str, Any] | None = None, merge_data: dict[str, Any] | None = None, max_workers: int | None = None) ResultCollector | MergedResultCollector[source]

Split a circuit into jobs, dispatch them, and optionally merge results.

Parameters:
  • circuits (Any or Sequence[Any]) – Quantum circuit or list of quantum circuits.

  • shots (int or Sequence[int]) – Number of shots or list of numbers of shots. If a list, it must match the length of circuits. If a single int, all circuits will use the same number of shots.

  • backends (dict[str, list[str]]) – Provider → list of backends.

  • split_policy (str, optional) – Which split policy to use.

  • merge_policy (str or None, optional) – Which merge policy to use; if None, skip merging.

  • multiprocess (bool, optional) – If True, run jobs in parallel processes.

  • wait (bool, optional) – If True, block until execution (and merge) finishes.

  • split_data (dict, optional) – Initial data for split policy.

  • merge_data (dict, optional) – Initial data for merge policy, if None, use updated split data.

  • max_workers (int, optional) – Override for max parallel processes.

Returns:

Unmerged collector if merge_policy is None, otherwise merged.

Return type:

ResultCollector or MergedResultCollector

property virtual_provider: VirtualProvider

Get the virtual provider.

Returns:

The virtual provider instance.

Return type:

VirtualProvider

class quantum_executor.VirtualProvider(providers_info: dict[str, dict[str, Any]] | None = None, include: list[str] | None = None, raise_exc: bool = False)[source]

Bases: object

Manage the initialization of quantum computing providers and retrieval of available backends.

This class is responsible for:
  • Instantiating provider classes using the supplied API keys (if required).

  • Retrieving all available backends from each provider.

  • Optionally filtering backends based on their online status.

Instead of specifying which providers to exclude, users can now optionally specify a list of provider names to include. If the include list is not provided, all available providers will be initialized.

Parameters:
  • providers_info (Dict[str, Dict[str, Any]], optional) –

    A dictionary mapping provider names to their respective API keys or configuration. The keys should be the provider names (e.g., “ionq”, “azure”) and the values should be dictionaries containing the necessary parameters for initialization. For example:

    {

    “ionq”: {“api_key”: “your-api-key-here”}, “azure”: {“subscription_id”: “your-subscription-id”, “resource_group”: “your-resource-group”}, “braket”: {“aws_access_key_id”: “your-access-key-id”,

    ”aws_secret_access_key”: “your-secret-access-key”},

    ”local_aer”: {}, “qbraid”: {“api_key”: “your-api-key-here”},

    }

  • include (list[str], optional) – A list of provider names to include in the initialization. Only the providers with names in this list will be initialized. Defaults to None, meaning all providers are included.

  • raise_exc (bool, optional) – If True, exceptions during provider initialization will be propagated. Otherwise, the error is logged and initialization continues. Defaults to False.

Examples

>>> vp = VirtualProvider(providers_info={"ionq": "your-api-key-here"}, include=["ionq"])
>>> providers = vp.get_providers()
>>> backends = vp.get_backends(online=True)
add_provider(provider_name: str, provider: QuantumProvider) None[source]

Add a new provider to the list of initialized providers.

Parameters:
  • provider_name (str) – The name of the provider to add.

  • provider (QuantumProvider) – An instance of the provider to add.

Raises:

ValueError – If the provider name is already in use.

static default_providers() list[str][source]

Get a list of default providers.

Returns:

A list of provider names that are available for use.

Return type:

List[str]

get_backend(provider_name: str, backend_name: str, online: bool = True) QuantumDevice[source]

Retrieve a specific backend from the specified provider.

The method fetches the backend using its identifier and, if required, checks that the backend is online. If the provider is not initialized or the backend is offline, an error is raised.

Parameters:
  • provider_name (str) – The name of the provider from which the backend should be retrieved.

  • backend_name (str) – The identifier (device_id) of the backend to retrieve.

  • online (bool, optional) – If True, raises an error if the backend is not online. Defaults to True.

Returns:

An instance of the requested backend.

Return type:

QuantumDevice

Raises:
  • ValueError – If the provider with the given name is not initialized.

  • RuntimeError – If the backend is offline when online status is required, or if another error occurs during retrieval.

get_backends(online: bool = True) dict[str, dict[str, QuantumDevice]][source]

Retrieve available backends for each provider.

The method retrieves devices from each provider and, if requested, filters them based on their online status.

Parameters:

online (bool, optional) – If True, only devices with an online status are returned; if False, all devices are returned regardless of status. Defaults to True.

Returns:

A dictionary mapping provider names to dictionaries of backend IDs and QuantumDevice instances. For example:

{
“local_aer”: {

“backend_id_1”: QuantumDevice(…), “backend_id_2”: QuantumDevice(…),

}, “ionq”: { … },

}

Return type:

Dict[str, Dict[str, QuantumDevice]]

get_providers() dict[str, QuantumProvider][source]

Get the initialized providers.

Returns:

A dictionary mapping provider names to their respective provider instances. For example:

{

“local_aer”: LocalAERProvider(…), “ionq”: IonQProvider(…),

}

Return type:

dict[str, QuantumProvider]