Source code for quantum_executor.result_collector

"""Implement thread-safe collectors for aggregating job results."""

import threading
import time
from collections.abc import Generator
from contextlib import contextmanager
from typing import TYPE_CHECKING
from typing import Any
from typing import Optional

if TYPE_CHECKING:  # pragma: no cover
    from quantum_executor.dispatch import Job
    from quantum_executor.job_runner import ResultData


class ReadWriteLock:
    """Reentrant read-write lock with writer preference.

    Multiple threads may hold the read lock concurrently, while a writer
    gains exclusive access. Ownership is tracked per thread, so a thread
    that already holds the lock may acquire it again without blocking.

    Locking semantics:

    - Multiple threads can hold the read lock concurrently.
    - Only one thread can hold the write lock, and only when no readers hold
      the lock.
    - New readers wait while a writer is queued (writer preference), except
      for threads that already hold the lock.
    - A thread that holds the write lock can re-acquire read or write.
    - A thread that holds a read lock can re-acquire read.
    - Upgrading (acquiring write while holding only a read lock) can never
      succeed, so it raises ``RuntimeError`` instead of blocking forever.

    Every acquire must be matched by a release from the same thread.

    Usage::

        with lock.read():
            ...  # read-critical section

        with lock.write():
            ...  # write-critical section
    """

    def __init__(self) -> None:
        """Initialize the ReadWriteLock with no readers and no writer."""
        self._read_ready = threading.Condition(threading.RLock())
        # Total number of outstanding read holds across all threads.
        self._readers: int = 0
        self._writers_waiting: int = 0
        self._writer_active: bool = False
        # Thread ident of the current writer and how many times it re-entered.
        self._writer_owner: int | None = None
        self._writer_depth: int = 0
        # Per-thread count of outstanding read holds (thread ident -> depth).
        self._reader_depths: dict[int, int] = {}

    def acquire_read(self) -> None:
        """Acquire the lock for reading.

        Blocks while a writer is active or queued, unless the calling thread
        already holds the lock (for reading or writing). In that case the
        acquisition is reentrant and returns immediately.
        """
        me = threading.get_ident()
        with self._read_ready:
            already_held = self._writer_owner == me or self._reader_depths.get(me, 0) > 0
            if not already_held:
                while self._writers_waiting > 0 or self._writer_active:
                    self._read_ready.wait()
            self._reader_depths[me] = self._reader_depths.get(me, 0) + 1
            self._readers += 1

    def release_read(self) -> None:
        """Release one read hold owned by the calling thread.

        Waiting threads are notified once no read holds remain.

        Raises
        ------
        RuntimeError
            If the calling thread does not hold the read lock.
        """
        me = threading.get_ident()
        with self._read_ready:
            depth = self._reader_depths.get(me, 0)
            if depth == 0:
                raise RuntimeError("release_read() called by a thread that does not hold the read lock")
            if depth == 1:
                del self._reader_depths[me]
            else:
                self._reader_depths[me] = depth - 1
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self) -> None:
        """Acquire the lock for writing.

        Blocks until all readers and any other writer have released the
        lock. Re-acquisition by the thread that already holds the write lock
        returns immediately.

        Raises
        ------
        RuntimeError
            If the calling thread holds only a read lock. The writer would
            wait for its own read hold to be released, which cannot happen.
        """
        me = threading.get_ident()
        with self._read_ready:
            if self._writer_owner == me:
                self._writer_depth += 1
                return
            if self._reader_depths.get(me, 0) > 0:
                raise RuntimeError("Cannot upgrade a read lock to a write lock")
            self._writers_waiting += 1
            while self._readers > 0 or self._writer_active:
                self._read_ready.wait()
            self._writers_waiting -= 1
            self._writer_active = True
            self._writer_owner = me
            self._writer_depth = 1

    def release_write(self) -> None:
        """Release one write hold owned by the calling thread.

        The lock becomes available to other threads only when the outermost
        write hold is released.

        Raises
        ------
        RuntimeError
            If the calling thread does not hold the write lock.
        """
        with self._read_ready:
            if self._writer_owner != threading.get_ident():
                raise RuntimeError("release_write() called by a thread that does not hold the write lock")
            self._writer_depth -= 1
            if self._writer_depth == 0:
                self._writer_active = False
                self._writer_owner = None
                self._read_ready.notify_all()

    @contextmanager
    def read(self) -> Generator[None, None, None]:
        """Hold the read lock for the duration of the ``with`` block."""
        self.acquire_read()
        try:
            yield
        finally:
            self.release_read()

    @contextmanager
    def write(self) -> Generator[None, None, None]:
        """Hold the write lock for the duration of the ``with`` block."""
        self.acquire_write()
        try:
            yield
        finally:
            self.release_write()
class JobResult:
    """Hold a job together with the outcome of executing it.

    Parameters
    ----------
    job : Job
        The job instance that was executed.
    data : ResultData, optional
        The result data produced by the job execution, by default None.
    """

    def __init__(self, job: "Job", data: Optional["ResultData"] = None) -> None:
        """Create a result holder for ``job``.

        The result counts as complete from the start only when ``data`` is
        not None.

        Parameters
        ----------
        job : Job
            The job instance.
        data : ResultData, optional
            The result data, by default None.
        """
        has_data = data is not None
        self.job: Job = job
        self.data: Any = data
        self.complete: bool = has_data

    def __repr__(self) -> str:
        """Describe the job, its completion status, and its result data.

        Returns
        -------
        str
            A string of the form ``JobResult(job=..., status=..., data=...)``.
        """
        if self.complete:
            status = "Complete"
        else:
            status = "Pending"
        return f"JobResult(job={self.job}, status={status}, data={self.data})"

    def get_data(self) -> Optional["ResultData"]:
        """Return the result data, or None while the job is not complete.

        Returns
        -------
        ResultData or None
            The stored data if ``complete`` is truthy, otherwise None.
        """
        if not self.complete:
            return None
        return self.data
class ResultCollector:
    """Thread-safe collector for job results stored in a nested dictionary.

    The structure mirrors the dispatch structure and allows storing and
    retrieving job results across multiple providers and backends.

    Locking:
        A single ReadWriteLock (``_lock``) protects ``nested_results``,
        ``_job_mapping``, and ``_complete`` / ``_completion_event``.
        Always acquire ``ResultCollector._lock`` before acquiring any external
        locks. Methods of this class never acquire ``_lock`` while already
        holding it; internal helpers that need the lock held say so.

    Attributes
    ----------
    nested_results : Dict[str, Dict[str, List[JobResult]]]
        A nested dictionary mapping provider names to backends and their job
        results.
    """

    def __init__(self) -> None:
        """Initialize the ResultCollector.

        Sets up the nested results dictionary, job mapping, lock, and a
        completion event.
        """
        self.nested_results: dict[str, dict[str, list[JobResult]]] = {}
        # Map each Job -> its placeholder JobResult, so we can update it directly.
        self._job_mapping: dict[Job, JobResult] = {}
        self._lock = ReadWriteLock()
        self._complete: bool = False
        self._completion_event = threading.Event()

    def __repr__(self) -> str:
        """Represent the ResultCollector as a string.

        Returns
        -------
        str
            The full nested results when the collector is complete, otherwise
            a summary with the number of complete and total job results.
        """
        with self._lock.read():
            # Evaluate completion inline rather than through the ``complete``
            # property: the property takes the read lock again, and a nested
            # read acquisition can deadlock when a writer is queued in between.
            is_complete = self._complete or self._all_jobs_complete()
            if is_complete:
                return f"ResultCollector({self.nested_results})"
            job_results = [
                job_result
                for backends in self.nested_results.values()
                for job_list in backends.values()
                for job_result in job_list
            ]
            total_jobs = len(job_results)
            complete_jobs = sum(int(job_result.complete) for job_result in job_results)
            return f"ResultCollector(complete_jobs={complete_jobs}, total_jobs={total_jobs}, complete={is_complete})"

    def register_job_mapping(self, job: "Job", provider_name: str, backend_name: str) -> None:
        """Register a job and create a placeholder JobResult in the collector.

        Parameters
        ----------
        job : Job
            The job instance.
        provider_name : str
            The provider name.
        backend_name : str
            The backend name.
        """
        with self._lock.write():
            placeholder = JobResult(job, data=None)
            backends = self.nested_results.setdefault(provider_name, {})
            backends.setdefault(backend_name, []).append(placeholder)
            self._job_mapping[job] = placeholder

    def store_result(self, job: "Job", result_data: "ResultData") -> None:
        """Update a job's placeholder with the actual result data.

        After storing the result, the method checks if all registered jobs
        are complete and, if so, signals completion to waiting threads.

        Parameters
        ----------
        job : Job
            The job whose result is to be stored.
        result_data : ResultData
            The result data produced by the job execution.

        Raises
        ------
        ValueError
            If the job has not been registered.
        """
        with self._lock.write():
            try:
                job_result = self._job_mapping[job]
            except KeyError:
                raise ValueError("Job mapping not found. Call register_job_mapping first.") from None
            job_result.data = result_data
            # Set explicitly: a result of None still counts as complete.
            job_result.complete = True
            # Automatically mark the collector complete if all jobs are done.
            if self._all_jobs_complete():
                self._complete = True
                self._completion_event.set()

    def wait_for_completion(self, timeout: float | None = None) -> bool:
        """Block until the collector is complete or the timeout expires.

        This method waits on an internal event that is set when the
        collector is marked complete.

        Parameters
        ----------
        timeout : float or None, optional
            Maximum number of seconds to wait. If None, waits indefinitely.

        Returns
        -------
        bool
            True if the collector completed within the timeout, False
            otherwise.
        """
        return self._completion_event.wait(timeout)

    def get_results(self) -> dict[str, dict[str, list[Optional["ResultData"]]]]:
        """Retrieve a snapshot of the result data for every registered job.

        Returns
        -------
        Dict[str, Dict[str, List[Optional[ResultData]]]]
            A new nested dictionary holding each job's data (None for jobs
            that are not complete).
        """
        with self._lock.read():
            return {
                provider: {backend: [job.get_data() for job in job_list] for backend, job_list in backends.items()}
                for provider, backends in self.nested_results.items()
            }

    def get_jobs(self) -> dict[str, dict[str, list[JobResult]]]:
        """Retrieve a shallow copy of the registered job results.

        Returns
        -------
        Dict[str, Dict[str, List[JobResult]]]
            A new nested dictionary whose lists are copies; the JobResult
            objects themselves are shared with the collector.
        """
        with self._lock.read():
            return {
                provider: {backend: list(job_list) for backend, job_list in backends.items()}
                for provider, backends in self.nested_results.items()
            }

    @property
    def complete(self) -> bool:
        """Check whether all registered job results are complete.

        Returns
        -------
        bool
            True if the collector is marked complete or if all job results
            are complete; False otherwise.
        """
        with self._lock.read():
            return self._complete or self._all_jobs_complete()

    @complete.setter
    def complete(self, value: bool) -> None:
        """Manually set the overall completion status of the collector.

        Setting this to True signals waiting threads; setting it to False
        clears the completion event.

        Parameters
        ----------
        value : bool
            The new completion status.
        """
        with self._lock.write():
            self._complete = value
            if value:
                self._completion_event.set()
            else:
                self._completion_event.clear()

    def _all_jobs_complete(self) -> bool:
        """Check if all registered jobs have been completed.

        Takes no lock itself; callers must hold at least a read lock.

        Returns
        -------
        bool
            True if every registered JobResult is complete (including when
            no job is registered); otherwise False.
        """
        return all(
            job_result.complete
            for backends in self.nested_results.values()
            for job_list in backends.values()
            for job_result in job_list
        )
class MergedResultCollector:
    """A specialized ResultCollector that handles merging of results.

    This class is used to collect and merge results from multiple jobs. It
    wraps a ResultCollector and additionally stores the merged results and
    the initial/final policy data.

    Locking:
        When both the inner ``ResultCollector._lock`` and this
        ``MergedResultCollector._lock`` must be held, always acquire
        ``ResultCollector._lock`` first, then ``MergedResultCollector._lock``.
        Methods of this class never call into the inner collector while
        holding ``MergedResultCollector._lock``.

    Parameters
    ----------
    results : ResultCollector
        A ResultCollector instance containing job results.
    """

    def __init__(self, results: ResultCollector) -> None:
        """Initialize the MergedResultCollector.

        Parameters
        ----------
        results : ResultCollector
            A ResultCollector instance containing job results.
        """
        self.results: ResultCollector = results
        self.merged_results: Any | None = None
        self._merged_results_ready_event = threading.Event()
        self.initial_policy_data: dict[str, Any] | None = None
        self.final_policy_data: dict[str, Any] | None = None
        self._lock = ReadWriteLock()

    @property
    def complete(self) -> bool:
        """Check if the wrapped collector has completed.

        Returns
        -------
        bool
            The ``complete`` status of the wrapped ResultCollector.
        """
        return self.results.complete

    def wait_for_completion(self, timeout: float | None = None) -> bool:
        """Block until jobs are complete and merged results have been set.

        First waits for the wrapped collector, then spends whatever is left
        of the timeout waiting for ``set_merged_results`` to be called.

        Parameters
        ----------
        timeout : float or None, optional
            Maximum number of seconds to wait in total. If None, waits
            indefinitely. A value of 0 performs a non-blocking check.

        Returns
        -------
        bool
            True if both the jobs and the merged results became available
            within the timeout, False otherwise.
        """
        if timeout is None:
            if not self.results.wait_for_completion(timeout=None):
                return False
            return self._merged_results_ready_event.wait(timeout=None)

        # Use a monotonic deadline so wall-clock adjustments cannot stretch or
        # shrink the wait. Test ``timeout is None`` explicitly above so that a
        # timeout of 0 is not mistaken for "wait forever".
        deadline = time.monotonic() + timeout
        if not self.results.wait_for_completion(timeout=timeout):
            return False
        remaining_time = max(0.0, deadline - time.monotonic())
        return self._merged_results_ready_event.wait(timeout=remaining_time)

    def get_jobs(self) -> dict[str, dict[str, list[JobResult]]]:
        """Return the jobs stored in the wrapped collector.

        Returns
        -------
        Dict[str, Dict[str, List[JobResult]]]
            A dictionary mapping provider names to backend names and their
            respective job results.
        """
        return self.results.get_jobs()

    def get_results(self) -> dict[str, dict[str, list[Optional["ResultData"]]]]:
        """Return the results stored in the wrapped collector.

        Returns
        -------
        Dict[str, Dict[str, List[Optional[ResultData]]]]
            A dictionary mapping provider names to backend names and their
            respective result data.
        """
        return self.results.get_results()

    def set_merged_results(
        self,
        merged_results: Any,  # noqa: ANN401
        initial_policy_data: dict[str, Any],
        final_policy_data: dict[str, Any],
    ) -> None:
        """Store the merged results and policy data, then signal waiters.

        Parameters
        ----------
        merged_results : Any
            The merged results to store.
        initial_policy_data : Dict[str, Any]
            The initial policy data to store.
        final_policy_data : Dict[str, Any]
            The final policy data to store.
        """
        with self._lock.write():
            self.merged_results = merged_results
            self.initial_policy_data = initial_policy_data
            self.final_policy_data = final_policy_data
            self._merged_results_ready_event.set()

    def get_merged_results(self) -> Any:  # noqa: ANN401
        """Return the merged results.

        Returns
        -------
        Any
            The merged results, or None if they have not been set.
        """
        with self._lock.read():
            return self.merged_results

    def get_initial_policy_data(self) -> dict[str, Any] | None:
        """Return the initial policy data.

        Returns
        -------
        Optional[Dict[str, Any]]
            The initial policy data, or None if it has not been set.
        """
        with self._lock.read():
            return self.initial_policy_data

    def get_final_policy_data(self) -> dict[str, Any] | None:
        """Return the final policy data.

        Returns
        -------
        Optional[Dict[str, Any]]
            The final policy data, or None if it has not been set.
        """
        with self._lock.read():
            return self.final_policy_data

    def __repr__(self) -> str:
        """Return a string representation of the MergedResultCollector.

        Returns
        -------
        str
            The merged results and initial/final policy data when merged
            results are available, otherwise a summary of job progress.
        """
        # Snapshot our own state and release our lock before touching the
        # inner collector, so the documented lock order (inner collector lock
        # first) is never violated.
        with self._lock.read():
            merged_results = self.merged_results
            initial_policy_data = self.initial_policy_data
            final_policy_data = self.final_policy_data

        if merged_results is not None:
            return (
                f"MergedResultCollector(merged_results={merged_results}, "
                f"initial_policy_data={initial_policy_data}, "
                f"final_policy_data={final_policy_data})"
            )

        # The inner collector does its own locking for the snapshot.
        jobs = self.results.get_jobs()
        job_results = [job for backends in jobs.values() for job_list in backends.values() for job in job_list]
        total_jobs = len(job_results)
        complete_jobs = sum(int(job.complete) for job in job_results)
        return (
            f"MergedResultCollector(complete_jobs={complete_jobs}, "
            f"total_jobs={total_jobs}, complete={self.complete})"
        )