Progress Reporting

Long-running selection algorithms can take 10+ minutes on real-world pools. The library emits structured progress events while it runs so calling applications can show live feedback to users — a CLI spinner, a progress row in a database, a WebSocket push to a browser, anything.

Why this exists

Without progress events, an application calling run_stratification has to guess whether a long-running selection is healthy or wedged. A logger.debug line is fine for a CLI user tailing stdout, but it does not help when the selection runs in a Celery worker that communicates with a Flask UI through a database row.

The progress reporting API gives library callers a structured stream of events so they can route progress wherever it needs to go.

Quickstart

Pass any object satisfying the ProgressReporter protocol via the progress_reporter keyword argument:

from sortition_algorithms.core import run_stratification
from sortition_algorithms.progress_rich import RichProgressReporter

with RichProgressReporter() as reporter:
    success, panels, report = run_stratification(
        features=features,
        people=people,
        number_people_wanted=100,
        settings=settings,
        progress_reporter=reporter,
    )

If you don't pass anything, the library uses a no-op default and behaves exactly as before.

The protocol

class ProgressReporter(Protocol):
    def start_phase(
        self,
        name: str,
        total: int | None = None,
        *,
        message: str | None = None,
    ) -> None: ...

    def update(self, current: int, *, message: str | None = None) -> None: ...

    def end_phase(self) -> None: ...

Any class with these three methods works — Protocol means you don't have to inherit anything. The full reference lives in the Modules page under sortition_algorithms.progress.

Phases are flat: each start_phase implicitly ends the previous one. name is a stable machine-readable identifier; message is a human-readable description that the library always supplies. Use name if you want to switch on the phase to (e.g.) display different icons; use message directly if you just want to show text.

Phases emitted by the library

The phase names below are part of the public API: callers may switch on them to customise display. Adding new phases is non-breaking; renaming an existing one would be a breaking change.

| name | total | When |
| --- | --- | --- |
| legacy_attempt | max_attempts | each retry of the legacy algorithm |
| multiplicative_weights | multiplicative_weights_rounds (typically 200) | initial diverse-committee search (maximin/leximin/nash) |
| maximin_optimization | None | maximin convergence loop |
| nash_optimization | None | Nash convergence loop |
| leximin_outer | people.count | leximin's "fix probabilities" outer loop |
| diversimax | None | diversimax single-shot ILP, emitted once with no updates |

A total of None means the phase has no fixed work budget. For the convergence loops, current is then just an iteration counter; diversimax emits no updates at all, so there is no counter to display.

Throttling

The library does not throttle. It calls update every iteration of the inner loop, which can be hundreds of times per second on a fast solver. That's fine for in-memory sinks but expensive for anything that talks to a database or the network. Throttling is the caller's responsibility.

The database recipe below shows the standard pattern: timestamp the last write, drop updates within min_interval_seconds of the previous one, but always flush phase transitions immediately so the UI sees them without lag.
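The same throttling pattern can also be factored into a wrapper around any other reporter. The sketch below assumes a hypothetical `ThrottledReporter` class (not shipped with the library) and takes an injectable `clock` argument so the behaviour can be tested without sleeping.

```python
import time
from typing import Callable


class ThrottledReporter:
    """Hypothetical wrapper: forwards at most one update per min_interval seconds."""

    def __init__(
        self,
        inner,
        *,
        min_interval: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._inner = inner
        self._min_interval = min_interval
        self._clock = clock
        self._last_forwarded: float | None = None

    def start_phase(self, name, total=None, *, message=None):
        # Phase transitions always pass through and restart the throttle window.
        self._last_forwarded = self._clock()
        self._inner.start_phase(name, total, message=message)

    def update(self, current, *, message=None):
        now = self._clock()
        if self._last_forwarded is not None and now - self._last_forwarded < self._min_interval:
            return  # too soon since the last forwarded event: drop this update
        self._last_forwarded = now
        self._inner.update(current, message=message)

    def end_phase(self):
        # End-of-phase events are rare, so they are never throttled.
        self._inner.end_phase()
```

Because updates are dropped rather than queued, the last update of a phase may never reach the inner reporter. A sink that must show a final value could write it from `end_phase` or from the next `start_phase`.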

Recipes

Database-row reporter (Flask + Celery), throttled to 1 Hz

A long-running selection in a Celery worker writes its current phase and progress into a SortitionRun row. A Flask view polls that row to render a progress bar in the UI.

import time

from sortition_algorithms.progress import ProgressReporter
from myapp import db
from myapp.models import SortitionRun


class DatabaseProgressReporter:
    """Persist the latest progress event to a SortitionRun row.

    Throttles writes to at most once per ``min_interval_seconds``. Phase
    transitions always flush immediately so the UI sees them without lag.
    """

    def __init__(self, run_id: int, *, min_interval_seconds: float = 1.0) -> None:
        self.run_id = run_id
        self.min_interval = min_interval_seconds
        self._last_write = 0.0
        self._phase_name = ""
        self._phase_total: int | None = None

    def start_phase(self, name: str, total: int | None = None, *, message: str | None = None) -> None:
        self._phase_name = name
        self._phase_total = total
        self._write(current=0, message=message or name, force=True)

    def update(self, current: int, *, message: str | None = None) -> None:
        self._write(current=current, message=message)

    def end_phase(self) -> None:
        pass

    def _write(self, *, current: int, message: str | None, force: bool = False) -> None:
        now = time.monotonic()
        if not force and (now - self._last_write) < self.min_interval:
            return
        self._last_write = now
        SortitionRun.query.filter_by(id=self.run_id).update({
            "phase": self._phase_name,
            "progress_current": current,
            "progress_total": self._phase_total,
            "progress_message": message or "",
        })
        db.session.commit()

stdout reporter

For ad-hoc scripts, a minimal reporter that prints each event:

class StdoutProgressReporter:
    def start_phase(self, name, total=None, *, message=None):
        print(f"[{name}] {message or ''} (total={total})")

    def update(self, current, *, message=None):
        print(f"  {current}: {message or ''}")

    def end_phase(self):
        print("  done")

Composite (fan-out) reporter

Send every event to multiple sinks at once — for example, a database row and a stdout line in development:

from sortition_algorithms.progress import ProgressReporter


class CompositeProgressReporter:
    """Forward every event to multiple child reporters."""

    def __init__(self, *reporters: ProgressReporter) -> None:
        self._children = reporters

    def start_phase(self, name, total=None, *, message=None):
        for r in self._children:
            r.start_phase(name, total, message=message)

    def update(self, current, *, message=None):
        for r in self._children:
            r.update(current, message=message)

    def end_phase(self):
        for r in self._children:
            r.end_phase()

asyncio.Queue reporter for WebSockets / SSE

This recipe is for async web frameworks that push progress to a browser via WebSockets or Server-Sent Events. Run the selection, with the reporter attached, in a synchronous worker thread and have the async side drain the queue:

import asyncio


class AsyncQueueProgressReporter:
    """Push every event into an asyncio.Queue.

    The reporter itself is sync (so it can be called from a worker thread
    inside the library), but it pushes thread-safely into the queue using
    ``loop.call_soon_threadsafe``.
    """

    def __init__(self, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop) -> None:
        self._queue = queue
        self._loop = loop

    def start_phase(self, name, total=None, *, message=None):
        self._push({"type": "start_phase", "name": name, "total": total, "message": message})

    def update(self, current, *, message=None):
        self._push({"type": "update", "current": current, "message": message})

    def end_phase(self):
        self._push({"type": "end_phase"})

    def _push(self, event: dict) -> None:
        self._loop.call_soon_threadsafe(self._queue.put_nowait, event)

The async side then awaits queue.get() and forwards each event to the WebSocket / SSE stream.
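A minimal sketch of that async side follows. It repeats a condensed copy of the reporter so that it runs on its own. `fake_selection` stands in for the real `run_stratification` call, and the `_DONE` sentinel, `stream_progress` and the `send` callback are assumptions for illustration; in a real application `send` would write to the WebSocket or SSE response.

```python
import asyncio

_DONE = {"type": "done"}  # hypothetical sentinel, not a library event


class AsyncQueueProgressReporter:
    # Condensed copy of the reporter above so this sketch runs standalone.
    def __init__(self, queue, loop):
        self._queue, self._loop = queue, loop

    def start_phase(self, name, total=None, *, message=None):
        self._push({"type": "start_phase", "name": name, "total": total, "message": message})

    def update(self, current, *, message=None):
        self._push({"type": "update", "current": current, "message": message})

    def end_phase(self):
        self._push({"type": "end_phase"})

    def _push(self, event):
        self._loop.call_soon_threadsafe(self._queue.put_nowait, event)


def fake_selection(reporter):
    # Stand-in for run_stratification(..., progress_reporter=reporter).
    reporter.start_phase("nash_optimization", None, message="Optimising")
    for i in range(1, 4):
        reporter.update(i)
    reporter.end_phase()


async def stream_progress(send):
    """Run the blocking work in a thread and forward each event to `send`."""
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    reporter = AsyncQueueProgressReporter(queue, loop)

    async def worker():
        try:
            await asyncio.to_thread(fake_selection, reporter)
        finally:
            # Runs on the loop thread once the worker thread has finished,
            # so the sentinel is queued after the worker's own events.
            queue.put_nowait(_DONE)

    task = asyncio.create_task(worker())
    while (event := await queue.get()) is not _DONE:
        await send(event)
    await task  # re-raise any exception from the selection itself
```

The sentinel gives the consumer loop a clean way to stop; without it, `queue.get()` would wait forever after the selection finishes.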

Caveats

Exceptions are swallowed but logged

If your reporter raises in any method, the library catches the exception and logs a warning to the sortition_algorithms logger — a buggy reporter must never kill a 10-minute selection. Tracebacks are included via logger.warning(..., exc_info=True).

This means you should still test your reporter, but you don't need to wrap every method in a try/except defensively.
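To make the behaviour concrete, here is a rough sketch of what an exception-swallowing wrapper could look like. It is an illustration only, not the library's actual implementation: `SafeReporter` and `BrokenReporter` are hypothetical names, and the log message text is made up. Only the logger name and the use of `exc_info=True` come from the description above.

```python
import logging

logger = logging.getLogger("sortition_algorithms")


class SafeReporter:
    """Hypothetical sketch of an exception-swallowing wrapper."""

    def __init__(self, inner) -> None:
        self._inner = inner

    def _call(self, method_name: str, *args, **kwargs) -> None:
        try:
            getattr(self._inner, method_name)(*args, **kwargs)
        except Exception:
            # Log with the traceback and carry on: a reporter bug
            # should not abort the selection.
            logger.warning("progress reporter failed in %s", method_name, exc_info=True)

    def start_phase(self, name, total=None, *, message=None):
        self._call("start_phase", name, total, message=message)

    def update(self, current, *, message=None):
        self._call("update", current, message=message)

    def end_phase(self):
        self._call("end_phase")


class BrokenReporter:
    """Deliberately buggy reporter used to exercise the wrapper."""

    def start_phase(self, name, total=None, *, message=None):
        raise RuntimeError("boom")

    def update(self, current, *, message=None):
        raise RuntimeError("boom")

    def end_phase(self):
        raise RuntimeError("boom")
```

Calling `SafeReporter(BrokenReporter()).update(1)` returns normally and leaves a warning with a traceback in the log, which is the place to look when a progress display stays blank.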

Thread safety

The library is single-threaded and only calls a reporter from one thread at a time. If you access a reporter from another thread (for example, a request-handling thread reading its state while the selection runs in a background thread of the same process), you are responsible for any locking. The reporters bundled with the library (NullProgressReporter, ErrorSwallowingReporter, and RichProgressReporter) are safe to read concurrently from a single external thread, but custom reporters should add their own locks if they mutate state.
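As a sketch of the locking a custom reporter might add, the class below guards its state with a `threading.Lock` and hands readers a copy. `LockedStateReporter` and its `snapshot` method are hypothetical names chosen for this example.

```python
import threading


class LockedStateReporter:
    """Hypothetical reporter whose latest state can be read from another thread."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state = {"phase": None, "total": None, "current": 0, "message": ""}

    def start_phase(self, name, total=None, *, message=None):
        with self._lock:
            # Replace the whole state so readers never see a mix of two phases.
            self._state = {"phase": name, "total": total, "current": 0, "message": message or name}

    def update(self, current, *, message=None):
        with self._lock:
            self._state["current"] = current
            if message is not None:
                self._state["message"] = message

    def end_phase(self):
        pass  # keep the last state visible until the next phase starts

    def snapshot(self) -> dict:
        # Return a copy so callers cannot observe or cause later mutation.
        with self._lock:
            return dict(self._state)
```

The reading thread calls `snapshot()` and works only with the returned copy. This protects threads within one process; sharing progress across processes still needs an external store such as a database row.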

Adding new phases is fine; renaming them is not

Phase name values are part of the public API. New phases can land in any release; renaming an existing one would be a breaking change.
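Since new phase names can appear in any release, code that switches on name should have a fallback instead of raising on an unknown value. A minimal sketch, where the `PHASE_ICONS` mapping, the label strings and `icon_for` are made up for illustration:

```python
# Labels for the phase names documented above; the label text is arbitrary.
PHASE_ICONS = {
    "legacy_attempt": "[retry]",
    "multiplicative_weights": "[search]",
    "maximin_optimization": "[optimise]",
    "nash_optimization": "[optimise]",
    "leximin_outer": "[fix]",
    "diversimax": "[solve]",
}
DEFAULT_ICON = "[working]"


def icon_for(phase_name: str) -> str:
    """Look up a label for a phase, tolerating names added in later releases."""
    return PHASE_ICONS.get(phase_name, DEFAULT_ICON)
```

With this shape, a phase the application has never seen still renders with a generic label and its message text.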