Usage Guide
Last updated: 29 April 2025
This document is an extended walkthrough of every major feature of Quantum Executor (QE). All examples are executable: open the companion Usage Guide notebook in the repository and run the cells line by line.
Installation & Prerequisites
# Recommended: create and activate a fresh virtual environment first
python3 -m venv qexec_env && source qexec_env/bin/activate
pip install quantum-executor  # installs QE and core dependencies
# Optional extras for provider SDKs; install only those you need
pip install 'quantum-executor[azure,braket,ionq,qiskit]'
Python 3.10 or later is required. Each cloud provider (Azure Quantum, Amazon Braket, IonQ, etc.) expects credentials. Consult their docs for environment-variable names, or use QE's providers_info parameter (see below).
Initializing the Quantum Executor
from quantum_executor import QuantumExecutor
# Show providers that QE can configure out of the box
print(QuantumExecutor.default_providers())
['azure', 'braket', 'ionq', 'local_aer', 'qbraid', 'qiskit']
Minimal initialization
executor = QuantumExecutor() # tries to init **all** providers
Selective initialization
import os
executor = QuantumExecutor(
    providers=["local_aer", "ionq", "azure"],  # use only this subset
    providers_info={
        # Never hard-code secrets; read them from environment variables or a vault
        "ionq": {"api_key": os.getenv("IONQ_API_KEY")},
        "azure": {"resource_id": "<GUID>", "location": "westus2"},
    },
)
QE internally creates a VirtualProvider that proxies calls to the real SDKs, giving you a uniform API across vendors.
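
One way to keep secrets out of source code is to assemble providers_info from environment variables and enable only the providers whose credentials are present. The sketch below is illustrative: the helper name build_providers_info and the variable names AZURE_QUANTUM_RESOURCE_ID and AZURE_QUANTUM_LOCATION are assumptions, not part of QE. Only the dictionary keys (api_key, resource_id, location) and IONQ_API_KEY come from the example above.

```python
import os
from typing import Mapping


def build_providers_info(env: Mapping[str, str] = os.environ) -> dict[str, dict[str, str]]:
    """Collect provider credentials from environment variables.

    Hypothetical helper: QE does not ship this function, and the Azure
    variable names below are assumptions made for illustration.
    """
    info: dict[str, dict[str, str]] = {}

    ionq_key = env.get("IONQ_API_KEY")
    if ionq_key:
        info["ionq"] = {"api_key": ionq_key}

    resource_id = env.get("AZURE_QUANTUM_RESOURCE_ID")  # assumed name
    location = env.get("AZURE_QUANTUM_LOCATION")  # assumed name
    if resource_id and location:
        info["azure"] = {"resource_id": resource_id, "location": location}

    return info


# Local simulators need no credentials, so "local_aer" is always listed.
providers_info = build_providers_info({"IONQ_API_KEY": "dummy-key"})
providers = ["local_aer", *providers_info]
print(providers)  # ['local_aer', 'ionq']
```

The two values could then be passed as QuantumExecutor(providers=providers, providers_info=providers_info), so a machine without Azure credentials simply skips that provider.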
Inspecting Providers & Backends
# Which providers were successfully configured?
executor.virtual_provider.get_providers()
['local_aer', 'ionq', 'azure']
# All backends, grouped by provider
from pprint import pprint
pprint(executor.virtual_provider.get_backends())
{
    'local_aer': ['aer_simulator', 'fake_torino', ...],
    'ionq': ['simulator', 'qpu'],
    'azure': [...]
}
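
The mapping returned by get_backends() has the same provider-to-backend-list shape as the backends argument used later in this guide, so it can be filtered with ordinary dictionary code. A minimal sketch, using a hand-written sample in place of a live executor; select_backends is a hypothetical helper, not a QE function, and the backend names in wanted are sample values:

```python
def select_backends(
    available: dict[str, list[str]],
    wanted: dict[str, list[str]],
) -> dict[str, list[str]]:
    """Keep only the wanted backends that are actually available."""
    selected: dict[str, list[str]] = {}
    for provider, names in wanted.items():
        present = [name for name in names if name in available.get(provider, [])]
        if present:  # drop providers with no usable backend
            selected[provider] = present
    return selected


# Sample data standing in for executor.virtual_provider.get_backends()
available = {
    "local_aer": ["aer_simulator", "fake_torino"],
    "ionq": ["simulator", "qpu"],
}
wanted = {
    "local_aer": ["fake_torino", "fake_missing"],
    "ionq": ["simulator"],
    "braket": ["sv1"],  # provider not configured in this sample
}

print(select_backends(available, wanted))
# {'local_aer': ['fake_torino'], 'ionq': ['simulator']}
```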
Preparing a Dispatch
A Dispatch is a declarative map that tells QE what to run and where. Think of it as the "execution plan" for your experiment.
Creating Circuits
from qiskit import QuantumCircuit
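# -- Qiskit circuit (2-qubit Bell state) --------------------------
qiskit_circuit = QuantumCircuit(2, 2)
qiskit_circuit.h(0)
qiskit_circuit.cx(0, 1)
# Measure into the existing classical register; measure_all() would add
# a second register and change the bit-string keys in the results.
qiskit_circuit.measure([0, 1], [0, 1])
# -- OpenQASM 2.0 equivalent -------------------------------------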
openqasm_circuit = '''
OPENQASM 2.0;
include "qelib1.inc";
qreg q[2];
creg c[2];
h q[0];
cx q[0], q[1];
measure q -> c;
'''
QE is language-agnostic: it transpiles or converts circuits when a backend requires a specific IR.
Declarative Dispatch (dict-style)
dispatch = {
    "local_aer": {
        "fake_torino": [
            {  # Job #1
                "circuit": qiskit_circuit,
                "shots": 1024,
                "config": {"seed": 42},  # passed verbatim to the SDK
            },
            {  # Job #2
                "circuit": openqasm_circuit,
                "shots": 2048,
            },
        ],
        "aer_simulator": [
            {  # Job #3
                "circuit": qiskit_circuit,
                "shots": 1024,
                "config": {"seed": 24},
            }
        ],
    },
    "ionq": {
        "simulator": [
            {  # Job #4
                "circuit": qiskit_circuit,
                "shots": 1024,
                "config": {"noise": {"model": "aria-1"}},
            }
        ]
    },
}
Diagram of relationships:
provider ─┬─ backend ─┬─ Job 1
          │           └─ Job 2
          └─ backend ─┬─ Job 3
                      └─ ...
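
Because a dict-style dispatch is plain nested data, it can be inspected before anything is submitted. The following sketch walks the provider, backend and job levels and reports how many jobs and shots each backend would receive. It assumes only the layout shown above; summarize_dispatch is a hypothetical helper and the circuits are replaced by placeholder strings.

```python
def summarize_dispatch(dispatch: dict) -> list[tuple[str, str, int, int]]:
    """Return (provider, backend, job_count, total_shots) for each backend."""
    rows = []
    for provider, backends in dispatch.items():
        for backend, jobs in backends.items():
            total_shots = sum(job["shots"] for job in jobs)
            rows.append((provider, backend, len(jobs), total_shots))
    return rows


# Stand-in dispatch with placeholder circuits, same shape as the example above.
sample_dispatch = {
    "local_aer": {
        "fake_torino": [
            {"circuit": "<circuit>", "shots": 1024, "config": {"seed": 42}},
            {"circuit": "<circuit>", "shots": 2048},
        ],
        "aer_simulator": [{"circuit": "<circuit>", "shots": 1024}],
    },
    "ionq": {"simulator": [{"circuit": "<circuit>", "shots": 1024}]},
}

for row in summarize_dispatch(sample_dispatch):
    print(row)
# ('local_aer', 'fake_torino', 2, 3072)
# ('local_aer', 'aer_simulator', 1, 1024)
# ('ionq', 'simulator', 1, 1024)
```

A check like this is a cheap way to catch an unexpectedly large shot budget on a paid backend before calling the executor.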
Programmatic Dispatch (Dispatch class)
When the set of jobs is dynamic (e.g., produced in a loop or by a heuristic), the Dispatch helper is clearer:
from quantum_executor import Dispatch
dispatch = Dispatch()  # empty container
dispatch.add_job("local_aer", "fake_torino", qiskit_circuit,
                 shots=1024, config={"seed": 42})
dispatch.add_job("local_aer", "fake_torino", openqasm_circuit,
                 shots=2048, config={"seed": 24})
dispatch.add_job("local_aer", "aer_simulator", qiskit_circuit, shots=1024)
dispatch.add_job("ionq", "simulator", qiskit_circuit,
                 shots=1024, config={"noise": {"model": "aria-1"}})
print(dispatch)
Dispatch({
    'local_aer': {
        'fake_torino': [
            Job(id='0ca2e105-...', circuit_type=QuantumCircuit, shots=1024, config={'seed': 42}),
            Job(id='9f16ea90-...', circuit_type=str, shots=2048, config={'seed': 24})
        ],
        'aer_simulator': [
            Job(id='10803529-...', circuit_type=QuantumCircuit, shots=1024, config={})
        ]
    },
    'ionq': {
        'simulator': [
            Job(id='125f447b-...', circuit_type=QuantumCircuit, shots=1024,
                config={'noise': {'model': 'aria-1'}})
        ]
    }
})
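
As an illustration of a dynamically generated plan, the sketch below builds a shot sweep in a loop. To stay runnable without QE installed it emits the dict-style layout from the previous section; with the Dispatch class, each append would presumably become a dispatch.add_job(...) call. The function name shot_sweep and the placeholder circuit string are assumptions for illustration.

```python
from itertools import product


def shot_sweep(circuit, targets, shot_counts):
    """Create one job per (provider, backend) target and shot count."""
    plan: dict[str, dict[str, list[dict]]] = {}
    for (provider, backend), shots in product(targets, shot_counts):
        # setdefault creates the provider and backend levels on first use
        jobs = plan.setdefault(provider, {}).setdefault(backend, [])
        jobs.append({"circuit": circuit, "shots": shots})
    return plan


plan = shot_sweep(
    "<circuit>",  # placeholder for a real circuit object
    targets=[("local_aer", "aer_simulator"), ("ionq", "simulator")],
    shot_counts=[256, 1024, 4096],
)

shots_per_backend = {
    provider: {backend: [job["shots"] for job in jobs] for backend, jobs in backends.items()}
    for provider, backends in plan.items()
}
print(shots_per_backend)
# {'local_aer': {'aer_simulator': [256, 1024, 4096]}, 'ionq': {'simulator': [256, 1024, 4096]}}
```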
Running a Dispatch
results = executor.run_dispatch(
    dispatch,
    multiprocess=True,  # one Python process **per backend**
    wait=True,          # block until *all* jobs finish
)
Blocking vs Non-Blocking
| Setting | Return immediately? | Use case |
|---|---|---|
| wait=True | No | Simple scripts, CI pipelines |
| wait=False | Yes | Long-running experiments, dashboards |
async_results = executor.run_dispatch(dispatch, multiprocess=True, wait=False)
print(async_results)
ResultCollector(complete_jobs=0, total_jobs=4, complete=False)
Check async_results.complete to poll without blocking, or call async_results.wait_for_completion() to block, whenever you need to synchronize.
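
The non-blocking mode follows the familiar submit-then-synchronize idiom. The sketch below reproduces that idiom with the standard library only, so it runs without QE. MiniCollector is a hypothetical stand-in that mimics the complete flag and the wait_for_completion() call named above; it says nothing about how ResultCollector is actually implemented.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait


class MiniCollector:
    """Toy stand-in for a result collector (not QE's implementation)."""

    def __init__(self, futures: list[Future]) -> None:
        self._futures = futures

    @property
    def complete(self) -> bool:
        # True once every submitted job has finished.
        return all(future.done() for future in self._futures)

    def wait_for_completion(self) -> list[int]:
        # Block until all jobs finish, then return results in submission order.
        wait(self._futures)
        return [future.result() for future in self._futures]


def fake_job(shots: int) -> int:
    time.sleep(0.05)  # pretend to talk to a backend
    return shots


with ThreadPoolExecutor(max_workers=4) as pool:
    collector = MiniCollector([pool.submit(fake_job, s) for s in (1024, 2048)])
    # ... other work can happen here while the jobs run ...
    totals = collector.wait_for_completion()

print(collector.complete, totals)  # True [1024, 2048]
```

Polling the flag suits dashboards that refresh periodically, while the blocking call suits the end of a script that has nothing else left to do.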
Understanding the ResultCollector
Internally it mirrors the shape of the original dispatch:
from pprint import pprint
pprint(results.get_jobs())
{
    'local_aer': {
        'fake_torino': [
            JobResult(job=Job(id='c7c59f83-...', circuit_type=QuantumCircuit, shots=1024, config={'seed':42}),
                      status=Complete,
                      data={'00': 489, '01': 10, '10': 7, '11': 518}),
            JobResult(job=Job(id='45e7b013-...', circuit_type=str, shots=2048, config={'seed':24}),
                      status=Complete,
                      data={'00': 1041, '01': 34, '10': 16, '11': 957})
        ],
        'aer_simulator': [
            JobResult(job=Job(id='d6adfc07-...', circuit_type=QuantumCircuit, shots=1024, config={'seed':24}),
                      status=Complete,
                      data={'00': 507, '11': 517})
        ]
    },
    'ionq': {
        'simulator': [
            JobResult(job=Job(id='cdcbf866-...', circuit_type=QuantumCircuit, shots=1024,
                              config={'noise': {'model':'aria-1'}}),
                      status=Complete,
                      data={'00': 512, '11': 512})
        ]
    }
}
Accessors:
results.get_results() # dict[str, dict[str, list[dict[str,int]]]]
results.get_jobs() # same shape but `JobResult` objects
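
Because get_results() is described as returning plain nested dictionaries, per-backend statistics need only a few loops. The sketch below is a self-contained example: it uses a literal copy of the count dictionaries shown above instead of a live ResultCollector, and backend_shot_totals is a hypothetical helper.

```python
def backend_shot_totals(
    results: dict[str, dict[str, list[dict[str, int]]]],
) -> dict[tuple[str, str], int]:
    """Sum the measured counts of every job, grouped by (provider, backend)."""
    totals: dict[tuple[str, str], int] = {}
    for provider, backends in results.items():
        for backend, job_counts in backends.items():
            totals[(provider, backend)] = sum(
                sum(counts.values()) for counts in job_counts
            )
    return totals


# Counts copied from the sample output above.
sample_results = {
    "local_aer": {
        "fake_torino": [
            {"00": 489, "01": 10, "10": 7, "11": 518},
            {"00": 1041, "01": 34, "10": 16, "11": 957},
        ],
        "aer_simulator": [{"00": 507, "11": 517}],
    },
    "ionq": {"simulator": [{"00": 512, "11": 512}]},
}

print(backend_shot_totals(sample_results))
# {('local_aer', 'fake_torino'): 3072, ('local_aer', 'aer_simulator'): 1024, ('ionq', 'simulator'): 1024}
```

Comparing these totals with the shots requested in the dispatch is a quick sanity check that no job returned a partial result.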
Designing Split Policies
Sometimes one logical experiment needs to be fanned out to many backends. A split policy encapsulates that decision. It must implement the following signature:
def split(
    circuit: Any,
    shots: int,
    backends: dict[str, list[str]],
    virtual_provider: VirtualProvider,
    policy_data: Any | None = None,
) -> tuple[Dispatch, Any]:
    ...
Parameters:
• circuit: a single circuit object (any dialect).
• shots: total shots requested by the user.
• backends: the allow-list of backends the executor may use.
• virtual_provider: used to query capabilities, qubit counts, etc.
• policy_data: optional state carried across invocations (e.g., for adaptive policies).
Example: Even-Split Policy
import copy
from typing import Any
from quantum_executor.dispatch import Dispatch
from quantum_executor.virtual_provider import VirtualProvider
def split(
    circuit: Any,
    shots: int,
    backends: dict[str, list[str]],
    virtual_provider: VirtualProvider,
    policy_data: Any | None = None,
) -> tuple[Dispatch, Any]:
    '''
    Distribute `shots` as evenly as possible over all candidate backends.
    '''
    num_backends = sum(map(len, backends.values()))
    if num_backends == 0:
        raise ValueError("at least one backend is required")
    base, remainder = divmod(shots, num_backends)
    # Pre-compute the shot allocation: the first `remainder` backends get one extra shot
    allocation = [base + (1 if i < remainder else 0)
                  for i in range(num_backends)]
    dispatch = Dispatch()
    idx = 0
    for provider_name, backend_list in backends.items():
        for backend_name in backend_list:
            dispatch.add_job(
                provider_name=provider_name,
                backend_name=backend_name,
                # deepcopy avoids state-sharing and also works for OpenQASM strings
                circuits=copy.deepcopy(circuit),
                shots=allocation[idx],
            )
            idx += 1
    return dispatch, policy_data
Register & run:
executor.add_policy("even_split", split)
results = executor.run_experiment(
    circuit=qiskit_circuit,
    shots=1024,
    backends={
        "local_aer": ["fake_torino", "aer_simulator"],
        "ionq": ["simulator"],
    },
    split_policy="even_split",
    multiprocess=True,
    wait=True,
)
ResultCollector({
    'local_aer': {
        'fake_torino': [
            JobResult(...shots=342, data={'00': 150, '01': 4, '10': 5, '11': 183})
        ],
        'aer_simulator': [
            JobResult(...shots=341, data={'00': 173, '11': 168})
        ]
    },
    'ionq': {
        'simulator': [
            JobResult(...shots=342, data={'00': 171, '11': 171})
        ]
    }
})
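
The even split is the equal-weight case of proportional allocation. As a sketch of how a policy might favor some backends over others, the helper below distributes shots by integer weights using the largest-remainder method, so the parts always sum to the requested total. Both allocate_shots and the idea of per-backend weights are hypothetical and not part of QE.

```python
def allocate_shots(shots: int, weights: list[int]) -> list[int]:
    """Split `shots` proportionally to non-negative integer `weights`."""
    if shots < 0 or not weights or any(w < 0 for w in weights) or sum(weights) == 0:
        raise ValueError("need shots >= 0 and at least one positive weight")
    total = sum(weights)
    # Integer quotient and remainder of each exact share shots * w / total
    quotas = [divmod(shots * w, total) for w in weights]
    allocation = [quotient for quotient, _ in quotas]
    leftover = shots - sum(allocation)
    # Hand the leftover shots to the largest remainders (ties keep input order)
    by_remainder = sorted(range(len(weights)), key=lambda i: quotas[i][1], reverse=True)
    for i in by_remainder[:leftover]:
        allocation[i] += 1
    return allocation


print(allocate_shots(1024, [1, 1, 1]))  # [342, 341, 341]
print(allocate_shots(1000, [3, 1]))     # [750, 250]
print(allocate_shots(10, [2, 1]))       # [7, 3]
```

Inside a split policy, the returned list could replace the allocation list computed in the even-split example, with weights derived from queue length or qubit fidelity queried through virtual_provider.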
Aggregating Data with Merge Policies
A merge policy post-processes raw job outputs into an experiment-level result.
def merge(
    results: dict[str, dict[str, list[ResultData]]],
    policy_data: Any,
) -> tuple[Any, Any]:
    ...
Here ResultData is typically a dict[str, int] mapping bit-strings to counts.
Example: Sum Frequencies
from typing import Any
from quantum_executor.job_runner import ResultData
def merge_sum(
    results: dict[str, dict[str, list[ResultData]]],
    policy_data: Any,
) -> tuple[dict[str, int], Any]:
    # Add up the counts of each bit-string across all providers, backends and jobs
    merged: dict[str, int] = {}
    for provider_results in results.values():
        for backend_results in provider_results.values():
            for data in backend_results:
                for bitstring, count in data.items():
                    merged[bitstring] = merged.get(bitstring, 0) + count
    return merged, policy_data
executor.add_policy("sum_freqs", merge_policy=merge_sum)
merged_results = executor.run_dispatch(
    dispatch,
    multiprocess=True,
    wait=True,
    merge_policy="sum_freqs",
)
print(merged_results)
MergedResultCollector(
    merged_results = {'00': 2561, '01': 33, '10': 40, '11': 2486},
    initial_policy_data = {},
    final_policy_data = {}
)
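
A merge policy is free to return something other than raw counts. As a sketch that follows the merge signature shown above, the variant below normalizes the summed counts into relative frequencies and records the total shot count in policy_data. The name merge_normalized and the total_shots key are hypothetical, and the sample input is hand-written from counts that appear earlier in this guide.

```python
from collections import Counter
from typing import Any


def merge_normalized(
    results: dict[str, dict[str, list[dict[str, int]]]],
    policy_data: Any,
) -> tuple[dict[str, float], Any]:
    """Merge all counts and convert them to relative frequencies."""
    totals: Counter[str] = Counter()
    for provider_results in results.values():
        for backend_results in provider_results.values():
            for counts in backend_results:
                totals.update(counts)  # adds counts key by key
    shots = sum(totals.values())
    if shots == 0:
        return {}, policy_data  # nothing measured, nothing to normalize
    frequencies = {bits: n / shots for bits, n in sorted(totals.items())}
    new_data = {**(policy_data or {}), "total_shots": shots}
    return frequencies, new_data


sample = {
    "local_aer": {"aer_simulator": [{"00": 507, "11": 517}]},
    "ionq": {"simulator": [{"00": 512, "11": 512}]},
}
frequencies, data = merge_normalized(sample, {})
print(data)         # {'total_shots': 2048}
print(frequencies)  # '00' maps to 1019/2048, '11' to 1029/2048
```

Following the registration pattern above, such a function would presumably be added with executor.add_policy("normalized", merge_policy=merge_normalized).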
MergedResultCollector extras
| Field (as printed above) | Purpose |
|---|---|
| merged_results | Whatever the merge policy emitted |
| initial_policy_data | Any seed or state passed in |
| final_policy_data | The policy's updated state on exit |
Advanced Topics
Multiprocessing Strategies
• multiprocess=False: serial execution, simpler debugging.
• multiprocess=True: one worker process per backend; beware of pickling limits.
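
Assuming, as the warning above suggests, that job payloads cross process boundaries via pickle when multiprocess=True, it can help to test whether a config or circuit object pickles before dispatching it. The check below uses only the standard library; is_picklable is a hypothetical helper, not a QE function.

```python
import pickle


def is_picklable(obj: object) -> bool:
    """Return True if `obj` can be serialized with pickle."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


config_ok = {"seed": 42, "noise": {"model": "aria-1"}}
config_bad = {"callback": lambda counts: counts}  # lambdas cannot be pickled

print(is_picklable(config_ok), is_picklable(config_bad))  # True False
```

Plain data such as numbers, strings, lists and dicts is safe, whereas lambdas, open file handles and live network clients typically are not, so it is best to keep them out of anything sent to a worker.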
Provider-Specific Configuration
Some providers accept extra fields inside the config dict:
| Provider | Key | Example |
|---|---|---|
| IonQ | | |
| Qiskit | | |
| Braket | | |
Consult the vendor docs for the accepted keys; QE simply forwards the config dict unchanged.