Modules

Adapters for loading and saving data.

Initially we support local CSV files and Google Sheets spreadsheets.

CSVFileDataSource

Bases: AbstractDataSource

Source code in src/sortition_algorithms/adapters.py
class CSVFileDataSource(AbstractDataSource):
    def __init__(
        self,
        *,
        features_file: Path,
        people_file: Path,
        already_selected_file: Path | None,
        selected_file: Path,
        remaining_file: Path,
    ) -> None:
        self.features_file = features_file
        self.people_file = people_file
        self.already_selected_file = already_selected_file
        self.selected_file = selected_file
        self.remaining_file = remaining_file

    @property
    def people_data_container(self) -> str:
        return f" CSV file '{self.people_file}'"

    @property
    def already_selected_data_container(self) -> str:
        return f" CSV file '{self.already_selected_file}'"

    @contextmanager
    def read_feature_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        report.add_message("loading_features_from_file", file_path=str(self.features_file))
        with open(self.features_file, newline="") as csv_file:
            feature_reader = csv.DictReader(csv_file, strict=True)
            assert feature_reader.fieldnames is not None
            yield list(feature_reader.fieldnames), feature_reader

    @contextmanager
    def read_people_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        report.add_message("loading_people_from_file", file_path=str(self.people_file))
        with open(self.people_file, newline="") as csv_file:
            people_reader = csv.DictReader(csv_file, strict=True)
            assert people_reader.fieldnames is not None
            yield list(people_reader.fieldnames), people_reader

    @contextmanager
    def read_already_selected_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        if self.already_selected_file is None or not self.already_selected_file.exists():
            report.add_message("no_already_selected_file")
            yield [], []
            return
        report.add_message("loading_already_selected_from_file", file_path=str(self.already_selected_file))
        with open(self.already_selected_file, newline="") as csv_file:
            already_selected_reader = csv.DictReader(csv_file, strict=True)
            assert already_selected_reader.fieldnames is not None
            yield list(already_selected_reader.fieldnames), already_selected_reader

    def write_selected(self, selected: list[list[str]], report: RunReport) -> None:
        report.add_message_and_log("writing_selected_csv", logging.INFO, file_path=self.selected_file)
        with open(self.selected_file, "w", newline="") as csv_file:
            _write_csv_rows(csv_file, selected)

    def write_remaining(self, remaining: list[list[str]], report: RunReport) -> None:
        report.add_message_and_log("writing_remaining_csv", logging.INFO, file_path=self.remaining_file)
        with open(self.remaining_file, "w", newline="") as csv_file:
            _write_csv_rows(csv_file, remaining)

    def highlight_dupes(self, dupes: list[int]) -> None:
        """Cannot highlight a CSV file"""

    def customise_features_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        return SelectionMultilineError([
            f"Parser error(s) while reading features from {self.features_file}",
            *[str(e) for e in error.all_errors],
        ])

    def customise_people_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        return SelectionMultilineError([
            f"Parser error(s) while reading people from {self.people_file}",
            *[str(e) for e in error.all_errors],
        ])

    def customise_already_selected_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        return SelectionMultilineError([
            f"Parser error(s) while reading already selected people from {self.already_selected_file}",
            *[str(e) for e in error.all_errors],
        ])

highlight_dupes(dupes)

Cannot highlight a CSV file

Source code in src/sortition_algorithms/adapters.py
def highlight_dupes(self, dupes: list[int]) -> None:
    """Cannot highlight a CSV file"""

CSVStringDataSource

Bases: AbstractDataSource

Source code in src/sortition_algorithms/adapters.py
class CSVStringDataSource(AbstractDataSource):
    def __init__(self, features_data: str, people_data: str, already_selected_data: str = "") -> None:
        self.features_data = features_data
        self.people_data = people_data
        self.already_selected_data = already_selected_data
        self.selected_file = StringIO()
        self.remaining_file = StringIO()
        self.selected_file_written = False
        self.remaining_file_written = False

    @property
    def people_data_container(self) -> str:
        return "people CSV data"

    @property
    def already_selected_data_container(self) -> str:
        return "already selected CSV data"

    @contextmanager
    def read_feature_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        report.add_message("loading_features_from_string")
        feature_reader = csv.DictReader(StringIO(self.features_data), strict=True)
        assert feature_reader.fieldnames is not None
        yield list(feature_reader.fieldnames), feature_reader

    @contextmanager
    def read_people_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        report.add_message("loading_people_from_string")
        people_reader = csv.DictReader(StringIO(self.people_data), strict=True)
        assert people_reader.fieldnames is not None
        yield list(people_reader.fieldnames), people_reader

    @contextmanager
    def read_already_selected_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        if not self.already_selected_data or not self.already_selected_data.strip():
            report.add_message("no_already_selected_data")
            yield [], []
            return
        report.add_message("loading_already_selected_from_string")
        already_selected_reader = csv.DictReader(StringIO(self.already_selected_data), strict=True)
        assert already_selected_reader.fieldnames is not None
        yield list(already_selected_reader.fieldnames), already_selected_reader

    def write_selected(self, selected: list[list[str]], report: RunReport) -> None:
        _write_csv_rows(self.selected_file, selected)
        self.selected_file_written = True

    def write_remaining(self, remaining: list[list[str]], report: RunReport) -> None:
        _write_csv_rows(self.remaining_file, remaining)
        self.remaining_file_written = True

    def highlight_dupes(self, dupes: list[int]) -> None:
        """Cannot highlight a CSV file"""

    def customise_features_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        # given the info is in strings, we can't usefully add anything
        return error

    def customise_people_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        # given the info is in strings, we can't usefully add anything
        return error

    def customise_already_selected_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        # given the info is in strings, we can't usefully add anything
        return error

highlight_dupes(dupes)

Cannot highlight a CSV file

Source code in src/sortition_algorithms/adapters.py
def highlight_dupes(self, dupes: list[int]) -> None:
    """Cannot highlight a CSV file"""

GSheetDataSource

Bases: AbstractDataSource

Source code in src/sortition_algorithms/adapters.py
class GSheetDataSource(AbstractDataSource):
    scope: ClassVar = [
        "https://spreadsheets.google.com/feeds",
        "https://www.googleapis.com/auth/drive",
    ]
    hl_light_blue: ClassVar = {
        "backgroundColor": {
            "red": 153 / 255,
            "green": 204 / 255,
            "blue": 255 / 255,
        }
    }
    hl_orange: ClassVar = {"backgroundColor": {"red": 5, "green": 2.5, "blue": 0}}

    def __init__(
        self, *, feature_tab_name: str, people_tab_name: str, already_selected_tab_name: str = "", auth_json_path: Path
    ) -> None:
        self.feature_tab_name = feature_tab_name
        self.people_tab_name = people_tab_name
        self.already_selected_tab_name = already_selected_tab_name
        self.auth_json_path = auth_json_path
        self._client: gspread.client.Client | None = None
        self._spreadsheet: gspread.Spreadsheet | None = None
        self.new_tab_default_size_rows = 2
        self.new_tab_default_size_cols = 40
        self._g_sheet_name = ""
        self._open_g_sheet_name = ""
        self.selected_tab_name = ""
        self.remaining_tab_name = ""
        self.tab_namer = GSheetTabNamer()
        self._report = RunReport()

    @property
    def people_data_container(self) -> str:
        return f"'{self.people_tab_name}' tab"

    @property
    def already_selected_data_container(self) -> str:
        return f"'{self.already_selected_tab_name}' tab"

    @property
    def client(self) -> gspread.client.Client:
        if self._client is None:
            creds = ServiceAccountCredentials.from_json_keyfile_name(str(self.auth_json_path), self.scope)
            # if we're getting rate limited, go slower!
            # by using the BackOffHTTPClient, that will sleep and retry
            # if it gets an error related to API usage rate limits.
            self._client = gspread.authorize(creds, http_client=gspread.BackOffHTTPClient)
        return self._client

    @property
    def spreadsheet(self) -> gspread.Spreadsheet:
        if self._open_g_sheet_name != self._g_sheet_name:
            # reset the spreadsheet if the name changed
            self._spreadsheet = None
            self.tab_namer.reset()
        if self._spreadsheet is None:
            if self._g_sheet_name.startswith("https://"):
                self._spreadsheet = self.client.open_by_url(self._g_sheet_name)
            else:
                self._spreadsheet = self.client.open(self._g_sheet_name)
            self._open_g_sheet_name = self._g_sheet_name
            self._report.add_message_and_log("opened_gsheet", logging.INFO, title=self._spreadsheet.title)
        return self._spreadsheet

    def _get_tab(self, tab_name: str) -> gspread.Worksheet | None:
        if not self._g_sheet_name:
            return None
        tab_list = self.spreadsheet.worksheets()
        try:
            return next(tab for tab in tab_list if tab.title == tab_name)
        except StopIteration:
            return None

    def _tab_exists(self, tab_name: str) -> bool:
        return bool(self._get_tab(tab_name))

    def _get_tab_titles(self) -> list[str]:
        if not self._g_sheet_name:
            return []
        return [tab.title for tab in self.spreadsheet.worksheets()]

    def _create_tab(self, tab_name: str) -> gspread.Worksheet:
        return self.spreadsheet.add_worksheet(
            title=tab_name,
            rows=self.new_tab_default_size_rows,
            cols=self.new_tab_default_size_cols,
        )

    def set_g_sheet_name(self, g_sheet_name: str) -> None:
        # if we're changing spreadsheet, reset the spreadsheet object
        if self._g_sheet_name != g_sheet_name:
            self._spreadsheet = None
            self._g_sheet_name = g_sheet_name
            self.tab_namer.reset()

    def get_title(self) -> str:
        try:
            return self.spreadsheet.title
        except gspread.SpreadsheetNotFound as err:
            msg = f"Google spreadsheet not found: {self._g_sheet_name}."
            raise SelectionError(
                message=msg, error_code="spreadsheet_not_found", error_params={"spreadsheet_name": self._g_sheet_name}
            ) from err

    @contextmanager
    def read_feature_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        self._report = report
        try:
            if not self._tab_exists(self.feature_tab_name):
                msg = (
                    f"Error in Google sheet: no tab called '{self.feature_tab_name}' "
                    f"found in spreadsheet '{self.spreadsheet.title}'."
                )
                raise SelectionError(
                    message=msg,
                    error_code="tab_not_found",
                    error_params={"tab_name": self.feature_tab_name, "spreadsheet_title": self.spreadsheet.title},
                )
        except gspread.SpreadsheetNotFound as err:
            msg = f"Google spreadsheet not found: {self._g_sheet_name}."
            raise SelectionError(
                message=msg, error_code="spreadsheet_not_found", error_params={"spreadsheet_name": self._g_sheet_name}
            ) from err
        tab_features = self.spreadsheet.worksheet(self.feature_tab_name)
        feature_head = tab_features.row_values(1)
        feature_body = _stringify_records(tab_features.get_all_records(expected_headers=[]))
        yield feature_head, feature_body

    @contextmanager
    def read_people_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        self._report = report
        try:
            if not self._tab_exists(self.people_tab_name):
                msg = (
                    f"Error in Google sheet: no tab called '{self.people_tab_name}' "
                    f"found in spreadsheet '{self.spreadsheet.title}'."
                )
                raise SelectionError(
                    message=msg,
                    error_code="tab_not_found",
                    error_params={"tab_name": self.people_tab_name, "spreadsheet_title": self.spreadsheet.title},
                )
        except gspread.SpreadsheetNotFound as err:
            msg = f"Google spreadsheet not found: {self._g_sheet_name}. "
            raise SelectionError(
                message=msg, error_code="spreadsheet_not_found", error_params={"spreadsheet_name": self._g_sheet_name}
            ) from err

        tab_people = self.spreadsheet.worksheet(self.people_tab_name)
        # if we don't read this in here we can't check if there are 2 columns with the same name
        people_head = tab_people.row_values(1)
        # the numericise_ignore doesn't convert the phone numbers to ints...
        # 1 Oct 2024: the final argument with expected_headers is to deal with the fact that
        # updated versions of gspread can't cope with duplicate headers
        people_body = _stringify_records(
            tab_people.get_all_records(
                numericise_ignore=["all"],
                expected_headers=[],
            )
        )
        self._report.add_message("reading_gsheet_tab", tab_name=self.people_tab_name)
        yield people_head, people_body

    def _find_header_row(
        self, all_values: gspread.ValueRange | list[list[Any]], min_headers: int = 3
    ) -> tuple[int, list[str]]:
        """
        Find the first row that looks like a header row in the worksheet.

        A row is considered a header row if it has at least min_headers non-empty cells.
        This helps skip over title rows, empty rows, etc.

        Args:
            worksheet: The worksheet to search
            min_headers: Minimum number of non-empty cells to consider a row as headers (default: 3)

        Returns:
            Tuple of (row_number, header_values) where row_number is 1-indexed
        """
        # Find the first row with enough non-empty cells to be a header row
        for row_idx, row in enumerate(all_values, start=1):
            # Count non-empty cells in this row
            non_empty_cells = [cell.strip() for cell in row if cell and cell.strip()]
            if len(non_empty_cells) >= min_headers:
                return row_idx, list(row)

        # If no suitable header row found, return empty
        return 1, []

    @contextmanager
    def read_already_selected_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        self._report = report
        if not self.already_selected_tab_name:
            # If no tab name provided, return empty data
            self._report.add_message("no_already_selected_tab")
            yield [], []
            return

        try:
            if not self._tab_exists(self.already_selected_tab_name):
                msg = (
                    f"Error in Google sheet: no tab called '{self.already_selected_tab_name}' "
                    f"found in spreadsheet '{self.spreadsheet.title}'."
                )
                raise SelectionError(
                    message=msg,
                    error_code="tab_not_found",
                    error_params={
                        "tab_name": self.already_selected_tab_name,
                        "spreadsheet_title": self.spreadsheet.title,
                    },
                )
        except gspread.SpreadsheetNotFound as err:
            msg = f"Google spreadsheet not found: {self._g_sheet_name}. "
            raise SelectionError(
                message=msg, error_code="spreadsheet_not_found", error_params={"spreadsheet_name": self._g_sheet_name}
            ) from err

        tab_already_selected = self.spreadsheet.worksheet(self.already_selected_tab_name)

        # Get all values from the sheet
        all_values = tab_already_selected.get_all_values(value_render_option=ValueRenderOption.unformatted)

        # Find which row contains the headers
        header_row_num, already_selected_head = self._find_header_row(all_values)

        # If no header row found or it's empty, return empty data
        if not already_selected_head or not any(cell.strip() for cell in already_selected_head):
            self._report.add_line(
                f"Tab '{self.already_selected_tab_name}' is empty or has no valid header row, using empty data."
            )
            yield [], []
            return

        # the numericise_ignore doesn't convert the phone numbers to ints...
        # 1 Oct 2024: the final argument with expected_headers is to deal with the fact that
        # updated versions of gspread can't cope with duplicate headers
        already_selected_body = _stringify_records(
            tab_already_selected.get_all_records(
                head=header_row_num,
                numericise_ignore=["all"],
                expected_headers=[],
            )
        )
        self._report.add_message(
            "reading_already_selected_tab", tab_name=self.already_selected_tab_name, header_row=header_row_num
        )
        yield already_selected_head, already_selected_body

    def write_selected(self, selected: list[list[str]], report: RunReport) -> None:
        self.tab_namer.find_unused_tab_suffix(self._get_tab_titles())
        tab_selected = self._create_tab(self.tab_namer.selected_tab_name())
        report.add_message_and_log("writing_selected_tab", logging.INFO, tab_name=tab_selected.title)
        self.selected_tab_name = tab_selected.title
        tab_selected.update(selected)
        tab_selected.format("A1:U1", self.hl_light_blue)
        user_logger.info(f"Selected people written to {tab_selected.title} tab")

    def write_remaining(self, remaining: list[list[str]], report: RunReport) -> None:
        # the number is selected during write_selected(), so we reuse it here
        tab_remaining = self._create_tab(self.tab_namer.remaining_tab_name())
        report.add_message_and_log("writing_remaining_tab", logging.INFO, tab_name=tab_remaining.title)
        self.remaining_tab_name = tab_remaining.title
        tab_remaining.update(remaining)
        tab_remaining.format("A1:U1", self.hl_light_blue)

    def highlight_dupes(self, dupes: list[int]) -> None:
        if not dupes:
            return
        tab_remaining = self._get_tab(self.tab_namer.remaining_tab_name())
        assert tab_remaining is not None, "highlight_dupes() has been called without first calling write_remaining()"
        # note that the indexes we have produced start at 0, but the row indexes start at 1
        # so we need to add 1 to the indexes.
        row_strings = [f"A{index + 1}:U{index + 1}" for index in dupes]
        tab_remaining.format(row_strings, self.hl_orange)

    def delete_old_output_tabs(self, dry_run: bool = False) -> list[str]:
        """
        Find and delete all tabs with names starting with the tab stubs for selected or remaining

        Args:
            dry_run: If True, report what would be deleted without actually deleting.

        Returns:
            List of tab names that were deleted (or would be deleted in dry_run mode).
        """
        if not self._g_sheet_name:
            return []

        all_tabs = self.spreadsheet.worksheets()
        tabs_to_delete: list[gspread.Worksheet] = []

        for tab in all_tabs:
            if self.tab_namer.matches_stubs(tab.title):
                tabs_to_delete.append(tab)

        deleted_names: list[str] = []
        for tab in tabs_to_delete:
            deleted_names.append(tab.title)
            if not dry_run:
                self.spreadsheet.del_worksheet(tab)

        return deleted_names

    def _annotate_parse_errors_with_cell_names(self, error: ParseTableMultiError, headers: Sequence[str]) -> list[str]:
        msgs: list[str] = []
        for sub_error in error.all_errors:
            if isinstance(sub_error, ParseTableErrorMsg):
                cell_name = get_cell_name(sub_error.row, sub_error.key, headers)
                msgs.append(f"{sub_error.msg} - see cell {cell_name}")
            else:
                cell_names = [get_cell_name(sub_error.row, key, headers) for key in sub_error.keys]
                msgs.append(f"{sub_error.msg} - see cells {' '.join(cell_names)}")
        return msgs

    def customise_features_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        return SelectionMultilineError([
            f"Parser error(s) while reading features from '{self.feature_tab_name}' worksheet",
            *self._annotate_parse_errors_with_cell_names(error, headers),
        ])

    def customise_people_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        return SelectionMultilineError([
            f"Parser error(s) while reading people from '{self.people_tab_name}' worksheet",
            *self._annotate_parse_errors_with_cell_names(error, headers),
        ])

    def customise_already_selected_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        return SelectionMultilineError([
            f"Parser error(s) while reading people from '{self.already_selected_tab_name}' worksheet",
            *self._annotate_parse_errors_with_cell_names(error, headers),
        ])

delete_old_output_tabs(dry_run=False)

Find and delete all tabs with names starting with the tab stubs for selected or remaining

Parameters:

    dry_run (bool): If True, report what would be deleted without actually deleting. Default: False

Returns:

    list[str]: List of tab names that were deleted (or would be deleted in dry_run mode).

Source code in src/sortition_algorithms/adapters.py
def delete_old_output_tabs(self, dry_run: bool = False) -> list[str]:
    """
    Find and delete all tabs with names starting with the tab stubs for selected or remaining

    Args:
        dry_run: If True, report what would be deleted without actually deleting.

    Returns:
        List of tab names that were deleted (or would be deleted in dry_run mode).
    """
    if not self._g_sheet_name:
        return []

    all_tabs = self.spreadsheet.worksheets()
    tabs_to_delete: list[gspread.Worksheet] = []

    for tab in all_tabs:
        if self.tab_namer.matches_stubs(tab.title):
            tabs_to_delete.append(tab)

    deleted_names: list[str] = []
    for tab in tabs_to_delete:
        deleted_names.append(tab.title)
        if not dry_run:
            self.spreadsheet.del_worksheet(tab)

    return deleted_names
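
A construction sketch (tab names and file paths are hypothetical; the constructor arguments are keyword-only):

from pathlib import Path

from sortition_algorithms.adapters import GSheetDataSource

source = GSheetDataSource(
    feature_tab_name="Categories",
    people_tab_name="Respondents",
    auth_json_path=Path("service-account.json"),
)
# Accepts either a spreadsheet title or a full https:// URL:
source.set_g_sheet_name("My Selection Spreadsheet")
# Preview which old output tabs would be removed, without deleting anything:
would_delete = source.delete_old_output_tabs(dry_run=True)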

generate_dupes(people_remaining_rows, people_selected_rows, settings, already_selected=None)

Generate a list of indexes of people who share an address with someone else in this set of rows.

Note that the first row of people_remaining_rows is the column headers. The indexes generated are for the rows in this table, so the index takes account of the first row being the header.

So if we had people_remaining_rows:

id,name,address_line_1,postcode
1,Alice,33 Acacia Avenue,W1A 1AA
1,Bob,31 Acacia Avenue,W1A 1AA
1,Charlotte,33 Acacia Avenue,W1A 1AA
1,David,33 Acacia Avenue,W1B 1BB

And settings with check_same_address_columns = ["address_line_1", "postcode"]

Then we should return [1, 3]

Source code in src/sortition_algorithms/adapters.py
def generate_dupes(  # noqa: C901
    people_remaining_rows: list[list[str]],
    people_selected_rows: list[list[str]],
    settings: Settings,
    already_selected: People | None = None,
) -> list[int]:
    """
    Generate a list of indexes of people who share an address with someone else in this set of rows.

    Note that the first row of people_remaining_rows is the column headers.  The indexes generated
    are for the rows in this table, so the index takes account of the first row being the header.

    So if we had people_remaining_rows:

    id,name,address_line_1,postcode
    1,Alice,33 Acacia Avenue,W1A 1AA
    1,Bob,31 Acacia Avenue,W1A 1AA
    1,Charlotte,33 Acacia Avenue,W1A 1AA
    1,David,33 Acacia Avenue,W1B 1BB

    And settings with `check_same_address_columns = ["address_line_1", "postcode"]`

    Then we should return [1, 3]
    """
    if not settings.check_same_address:
        return []

    table_col_names = people_remaining_rows[0]
    address_col_indexes: list[int] = [
        index for index, col in enumerate(table_col_names) if col in settings.check_same_address_columns
    ]

    def _address_from_row(person: list[str]) -> tuple[str, ...]:
        return tuple(col.lower() for col_index, col in enumerate(person) if col_index in address_col_indexes)

    # first, we assemble a dict with the key being the address, the value being the list of
    # indexes of people at that address
    address_remaining_index: dict[tuple[str, ...], list[int]] = defaultdict(list)
    for person_index, person in enumerate(people_remaining_rows[1:], start=1):  # skip the header row
        address_remaining_index[_address_from_row(person)].append(person_index)

    # now extract all those people where the number of people at their address is more than one
    dupes: set[int] = set()
    for persons_at_address in address_remaining_index.values():
        if len(persons_at_address) > 1:
            dupes.update(persons_at_address)

    # Now we assemble the list of all selected addresses.
    already_selected_addresses: set[tuple[str, ...]] = set()
    # First those selected in this round of selection
    for person in people_selected_rows[1:]:  # skip the header row
        already_selected_addresses.add(_address_from_row(person))
    # Then those previously selected (if supplied)
    if already_selected:
        for selected_key in already_selected:
            already_selected_addresses.add(
                already_selected.get_address(selected_key, settings.check_same_address_columns)
            )

    # and check if any of the remaining people has an address matching
    # those selected in this round or previous rounds
    for person_index, person in enumerate(people_remaining_rows[1:], start=1):  # skip the header row
        if _address_from_row(person) in already_selected_addresses:
            dupes.add(person_index)

    return sorted(dupes)
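
A worked example using the rows from the docstring. The real Settings class is not constructed here; a stand-in object carrying only the two attributes generate_dupes reads (check_same_address and check_same_address_columns) is enough for a sketch:

from types import SimpleNamespace

from sortition_algorithms.adapters import generate_dupes

rows = [
    ["id", "name", "address_line_1", "postcode"],
    ["1", "Alice", "33 Acacia Avenue", "W1A 1AA"],
    ["1", "Bob", "31 Acacia Avenue", "W1A 1AA"],
    ["1", "Charlotte", "33 Acacia Avenue", "W1A 1AA"],
    ["1", "David", "33 Acacia Avenue", "W1B 1BB"],
]
settings = SimpleNamespace(
    check_same_address=True,
    check_same_address_columns=["address_line_1", "postcode"],
)
# No one selected yet, so only the header row is passed for the selected table.
# Alice and Charlotte share an address, so their row indexes are returned:
assert generate_dupes(rows, [rows[0]], settings) == [1, 3]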

find_any_committee(features, people, number_people_wanted, check_same_address_columns)

Find any single feasible committee that satisfies the quotas.

Parameters:

    features (FeatureCollection): FeatureCollection with min/max quotas (required)
    people (People): People object with pool members (required)
    number_people_wanted (int): desired size of the panel (required)
    check_same_address_columns (list[str]): columns to check for same address, or empty list if not checking addresses (required)

Returns:

    tuple[list[frozenset[str]], RunReport]: tuple of (list containing one committee as frozenset of person_ids, empty report)

Raises:

    InfeasibleQuotasError: If quotas are infeasible
    SelectionError: If solver fails for other reasons

Source code in src/sortition_algorithms/committee_generation/__init__.py
def find_any_committee(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
) -> tuple[list[frozenset[str]], RunReport]:
    """Find any single feasible committee that satisfies the quotas.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: columns to check for same address, or empty list if
                                    not checking addresses.

    Returns:
        tuple of (list containing one committee as frozenset of person_ids, empty report)

    Raises:
        InfeasibleQuotasError: If quotas are infeasible
        SelectionError: If solver fails for other reasons
    """
    _, agent_vars = setup_committee_generation(features, people, number_people_wanted, check_same_address_columns)
    committee = ilp_results_to_committee(agent_vars)
    return [committee], RunReport()

find_distribution_leximin(features, people, number_people_wanted, check_same_address_columns)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected (just like maximin), but breaks ties to maximize the second-lowest probability, breaks further ties to maximize the third-lowest probability and so forth.

Parameters:

    features (FeatureCollection): FeatureCollection with min/max quotas (required)
    people (People): People object with pool members (required)
    number_people_wanted (int): desired size of the panel (required)
    check_same_address_columns (list[str]): Address columns for household identification, or empty if no address checking to be done (required)

Returns:

    tuple[list[frozenset[str]], list[float], RunReport]: tuple of (committees, probabilities, report)
    - committees: list of feasible committees (frozenset of agent IDs)
    - probabilities: list of probabilities for each committee
    - report: run report with log messages

Raises:

    RuntimeError: If Gurobi is not available

Source code in src/sortition_algorithms/committee_generation/leximin.py
def find_distribution_leximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected
    (just like maximin), but breaks ties to maximize the second-lowest probability, breaks further ties to maximize the
    third-lowest probability and so forth.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.

    Returns:
        tuple of (committees, probabilities, report)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - report: RunReport with log messages

    Raises:
        RuntimeError: If Gurobi is not available
    """
    if not GUROBI_AVAILABLE:
        msg = "Leximin algorithm requires Gurobi solver which is not available"
        raise RuntimeError(msg, "gurobi_not_available", {})

    report = RunReport()
    report.add_message_and_log("using_leximin_algorithm", logging.INFO)
    grb.setParam("OutputFlag", 0)

    # Set up an ILP that can be used for discovering new feasible committees
    new_committee_model, agent_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns
    )

    # Find initial committees that cover every possible agent
    committees, covered_agents, initial_report = generate_initial_committees(
        new_committee_model, agent_vars, 3 * people.count
    )
    report.add_report(initial_report)

    # Run the main leximin optimization loop to fix agent probabilities
    fixed_probabilities = _run_leximin_main_loop(new_committee_model, agent_vars, committees, people, report)

    # Convert fixed agent probabilities to committee probabilities
    probabilities_normalised = _solve_leximin_primal_for_final_probabilities(committees, fixed_probabilities)

    return list(committees), probabilities_normalised, report

find_distribution_maximin(features, people, number_people_wanted, check_same_address_columns)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected.

Parameters:

    features (FeatureCollection): FeatureCollection with min/max quotas (required)
    people (People): People object with pool members (required)
    number_people_wanted (int): desired size of the panel (required)
    check_same_address_columns (list[str]): Address columns for household identification, or empty if no address checking to be done (required)

Returns:

    tuple[list[frozenset[str]], list[float], RunReport]: tuple of (committees, probabilities, report)
    - committees: list of feasible committees (frozenset of agent IDs)
    - probabilities: list of probabilities for each committee
    - report: run report with log messages

Source code in src/sortition_algorithms/committee_generation/maximin.py
def find_distribution_maximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.

    Returns:
        tuple of (committees, probabilities, report)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - report: RunReport with log messages
    """
    report = RunReport()
    report.add_message_and_log("using_maximin_algorithm", logging.INFO)

    # Set up an ILP that can be used for discovering new feasible committees
    new_committee_model, agent_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns
    )

    # Find initial committees that cover every possible agent
    committees, covered_agents, init_report = generate_initial_committees(new_committee_model, agent_vars, people.count)
    report.add_report(init_report)

    # Set up the incremental LP model for column generation
    incremental_model, incr_agent_vars, upper_bound_var = _setup_maximin_incremental_model(committees, covered_agents)

    # Run the main optimization loop
    return _run_maximin_optimization_loop(
        new_committee_model,
        agent_vars,
        incremental_model,
        incr_agent_vars,
        upper_bound_var,
        committees,
        covered_agents,
        report,
    )
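
The returned distribution is over whole committees; to run an actual lottery you draw one committee with probability proportional to its weight. A small sketch using only the standard library (this helper is illustrative, not part of the package):

import random

def draw_panel(committees: list[frozenset[str]], probabilities: list[float]) -> frozenset[str]:
    """Draw one committee according to the computed distribution."""
    return random.choices(committees, weights=probabilities, k=1)[0]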

find_distribution_nash(features, people, number_people_wanted, check_same_address_columns)

Find a distribution over feasible committees that maximizes the Nash welfare, i.e., the product of selection probabilities over all persons.

Parameters:

    features (FeatureCollection): FeatureCollection with min/max quotas (required)
    people (People): People object with pool members (required)
    number_people_wanted (int): desired size of the panel (required)
    check_same_address_columns (list[str]): Address columns for household identification, or empty if no address checking to be done (required)

Returns:

    tuple[list[frozenset[str]], list[float], RunReport]: tuple of (committees, probabilities, report)
    - committees: list of feasible committees (frozenset of agent IDs)
    - probabilities: list of probabilities for each committee
    - report: run report with log messages

The algorithm maximizes the product of selection probabilities Πᵢ pᵢ by equivalently maximizing log(Πᵢ pᵢ) = Σᵢ log(pᵢ). If some person i is not included in any feasible committee, their pᵢ is 0, and this sum is -∞. We maximize Σᵢ log(pᵢ) where i is restricted to range over persons that can possibly be included.

Source code in src/sortition_algorithms/committee_generation/nash.py
def find_distribution_nash(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the Nash welfare, i.e., the product of
    selection probabilities over all persons.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.

    Returns:
        tuple of (committees, probabilities, report)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - report: RunReport with log messages

    The algorithm maximizes the product of selection probabilities Πᵢ pᵢ by equivalently maximizing
    log(Πᵢ pᵢ) = Σᵢ log(pᵢ). If some person i is not included in any feasible committee, their pᵢ is 0, and
    this sum is -∞. We maximize Σᵢ log(pᵢ) where i is restricted to range over persons that can possibly be included.
    """
    report = RunReport()
    report.add_message_and_log("using_nash_algorithm", logging.INFO)

    # Set up an ILP used for discovering new feasible committees
    new_committee_model, agent_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns
    )

    # Find initial committees that include every possible agent
    committee_set, covered_agents, initial_report = generate_initial_committees(
        new_committee_model, agent_vars, 2 * people.count
    )
    committees = list(committee_set)
    report.add_report(initial_report)

    # Map the covered agents to indices in a list for easier matrix representation
    entitlements, contributes_to_entitlement = _define_entitlements(covered_agents)

    # Run the main Nash welfare optimization loop
    return _run_nash_optimization_loop(
        new_committee_model,
        agent_vars,
        committees,
        entitlements,
        contributes_to_entitlement,
        covered_agents,
        number_people_wanted,
        report,
    )
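
Each person's overall selection probability is the sum of the probabilities of the committees containing them. A sketch of that aggregation (an illustrative helper, not part of the package):

from collections import defaultdict

def selection_probabilities(
    committees: list[frozenset[str]],
    probabilities: list[float],
) -> dict[str, float]:
    """Aggregate committee probabilities into per-person selection probabilities."""
    per_person: dict[str, float] = defaultdict(float)
    for committee, prob in zip(committees, probabilities, strict=True):
        for person_id in committee:
            per_person[person_id] += prob
    return dict(per_person)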

find_random_sample_legacy(people, features, number_people_wanted, check_same_address_columns=None)

Legacy stratified random selection algorithm.

Implements the original algorithm that uses greedy selection based on priority ratios. Always selects from the most urgently needed category first (highest ratio of (min-selected)/remaining), then randomly picks within that category.

Parameters:

    people (People): People collection (required)
    features (FeatureCollection): Feature definitions with min/max targets (required)
    number_people_wanted (int): Number of people to select (required)
    check_same_address_columns (list[str] | None): Address columns for household identification, or empty if no address checking to be done. Default: None

Returns:

    tuple[list[frozenset[str]], RunReport]: Tuple of (selected_committees, report) where:
    - selected_committees: List containing one frozenset of selected person IDs
    - report: report containing log messages about the selection process

Raises:

    SelectionError: If selection becomes impossible (not enough people, etc.)

Source code in src/sortition_algorithms/committee_generation/legacy.py
def find_random_sample_legacy(
    people: People,
    features: FeatureCollection,
    number_people_wanted: int,
    check_same_address_columns: list[str] | None = None,
) -> tuple[list[frozenset[str]], RunReport]:
    """
    Legacy stratified random selection algorithm.

    Implements the original algorithm that uses greedy selection based on priority ratios.
    Always selects from the most urgently needed category first (highest ratio of
    (min-selected)/remaining), then randomly picks within that category.

    Args:
        people: People collection
        features: Feature definitions with min/max targets
        number_people_wanted: Number of people to select
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.

    Returns:
        Tuple of (selected_committees, report) where:
        - selected_committees: List containing one frozenset of selected person IDs
        - report: report containing log messages about the selection process

    Raises:
        SelectionError: If selection becomes impossible (not enough people, etc.)
    """
    report = RunReport()
    report.add_message("using_legacy_algorithm")
    people_selected: set[str] = set()

    # Create PeopleFeatures and initialize
    people_features = PeopleFeatures(people, features, check_same_address_columns or [])
    people_features.update_all_features_remaining()
    people_features.prune_for_feature_max_0()

    # Main selection loop
    for count in range(number_people_wanted):
        # Find the category with highest priority ratio
        try:
            ratio_result = people_features.find_max_ratio_category()
        except errors.SelectionError as e:
            msg = f"Selection failed on iteration {count + 1}: {e}"
            raise errors.RetryableSelectionError(msg) from e

        # Find the randomly selected person within that category
        target_feature = ratio_result.feature_name
        target_value = ratio_result.feature_value
        random_position = ratio_result.random_person_index

        selected_person_key = people_features.people.find_person_by_position_in_category(
            target_feature, target_value, random_position
        )

        # Should never select the same person twice
        assert selected_person_key not in people_selected, f"Person {selected_person_key} was already selected"

        # Select the person (this also removes household members if configured)
        people_selected.add(selected_person_key)
        selected_person_data = people_features.people.get_person_dict(selected_person_key)
        household_members_removed = people_features.select_person(selected_person_key)

        # Add output messages about household member removal
        if household_members_removed:
            report.add_line(
                f"Selected {selected_person_key}, also removed household members: "
                f"{', '.join(household_members_removed)}"
            )

        # Handle any categories that are now full after this selection
        try:
            category_report = people_features.handle_category_full_deletions(selected_person_data)
            report.add_report(category_report)
        except errors.SelectionError as e:
            msg = f"Selection failed after selecting {selected_person_key}: {e}"
            raise errors.RetryableSelectionError(msg) from e

        # Check if we're about to run out of people (but not on the last iteration)
        if count < (number_people_wanted - 1) and people_features.people.count == 0:
            msg = "Selection failed: Ran out of people before completing selection"
            raise errors.RetryableSelectionError(msg)

    # Return in legacy format: list containing single frozenset
    return [frozenset(people_selected)], report
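
The "priority ratio" driving the greedy choice can be illustrated with a toy computation (this helper is purely illustrative and not part of the package):

def priority_ratio(min_quota: int, selected: int, remaining: int) -> float:
    """(min - selected) / remaining: higher means the category is more urgent."""
    return (min_quota - selected) / remaining

# A category still needing 3 people with only 4 candidates left is more urgent
# than one needing 2 people with 10 candidates left:
assert priority_ratio(4, 1, 4) > priority_ratio(3, 1, 10)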

standardize_distribution(committees, probabilities)

Remove committees with zero probability and renormalize.

Parameters:

    committees (list[frozenset[str]]): list of committees (required)
    probabilities (list[float]): corresponding probabilities (required)

Returns:

    tuple[list[frozenset[str]], list[float]]: tuple of (filtered_committees, normalized_probabilities)

Source code in src/sortition_algorithms/committee_generation/__init__.py
def standardize_distribution(
    committees: list[frozenset[str]],
    probabilities: list[float],
) -> tuple[list[frozenset[str]], list[float]]:
    """Remove committees with zero probability and renormalize.

    Args:
        committees: list of committees
        probabilities: corresponding probabilities

    Returns:
        tuple of (filtered_committees, normalized_probabilities)
    """
    assert len(committees) == len(probabilities)
    new_committees = []
    new_probabilities = []
    for committee, prob in zip(committees, probabilities, strict=False):
        if prob >= EPS2:
            new_committees.append(committee)
            new_probabilities.append(prob)
    prob_sum = sum(new_probabilities)
    new_probabilities = [prob / prob_sum for prob in new_probabilities]
    return new_committees, new_probabilities
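
A worked example, assuming EPS2 is a small positive threshold (well below the probabilities used here):

from sortition_algorithms.committee_generation import standardize_distribution

committees = [frozenset({"a", "b"}), frozenset({"c", "d"}), frozenset({"e", "f"})]
probabilities = [0.5, 0.0, 0.25]
kept, normalised = standardize_distribution(committees, probabilities)
# The zero-probability committee is dropped and the rest renormalised:
# kept == [frozenset({"a", "b"}), frozenset({"e", "f"})]
# normalised == [2/3, 1/3]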

generate_initial_committees(new_committee_model, agent_vars, multiplicative_weights_rounds)

To speed up the main iteration of the maximin and Nash algorithms, start from a diverse set of feasible committees. In particular, each agent that can be included in any committee will be included in at least one of these committees.

Parameters:

    new_committee_model (Model): MIP model for finding committees (required)
    agent_vars (dict[str, Var]): dict mapping agent_id to binary MIP variables (required)
    multiplicative_weights_rounds (int): number of rounds for the multiplicative weights phase (required)

Returns:

    tuple[set[frozenset[str]], frozenset[str], RunReport]: tuple of (committees, covered_agents, report)
    - committees: set of feasible committees discovered
    - covered_agents: frozenset of all agents included in some committee
    - report: run report with debug messages

Source code in src/sortition_algorithms/committee_generation/common.py
def generate_initial_committees(
    new_committee_model: mip.model.Model,
    agent_vars: dict[str, mip.entities.Var],
    multiplicative_weights_rounds: int,
) -> tuple[set[frozenset[str]], frozenset[str], RunReport]:
    """To speed up the main iteration of the maximin and Nash algorithms, start from a diverse set of feasible
    committees. In particular, each agent that can be included in any committee will be included in at least one of
    these committees.

    Args:
        new_committee_model: MIP model for finding committees
        agent_vars: dict mapping agent_id to binary MIP variables
        multiplicative_weights_rounds: number of rounds for the multiplicative weights phase

    Returns:
        tuple of (committees, covered_agents, report)
        - committees: set of feasible committees discovered
        - covered_agents: frozenset of all agents included in some committee
        - report: run report with debug messages
    """
    report = RunReport()

    # Phase 1: Use multiplicative weights algorithm to find diverse committees
    committees, covered_agents = _run_multiplicative_weights_phase(
        new_committee_model, agent_vars, multiplicative_weights_rounds
    )

    # Phase 2: Find committees for any agents not yet covered
    additional_committees, covered_agents, coverage_report = _find_committees_for_uncovered_agents(
        new_committee_model, agent_vars, covered_agents
    )
    committees.update(additional_committees)
    report.add_report(coverage_report)

    # Validation and final output
    assert len(committees) >= 1  # We assume quotas are feasible at this stage

    if len(covered_agents) == len(agent_vars):
        report.add_message_and_log("all_agents_in_feasible_committees", logging.INFO)

    return committees, frozenset(covered_agents), report

ilp_results_to_committee(variables)

Extract the selected committee from ILP solver variables.

Parameters:

    variables (dict[str, Var]): dict mapping person_id to binary MIP variables (required)

Returns:

    frozenset[str]: frozenset of person_ids who are selected (have variable value > 0.5)

Raises:

    ValueError: If variables don't have values (solver failed)

Source code in src/sortition_algorithms/committee_generation/common.py
def ilp_results_to_committee(variables: dict[str, mip.entities.Var]) -> frozenset[str]:
    """Extract the selected committee from ILP solver variables.

    Args:
        variables: dict mapping person_id to binary MIP variables

    Returns:
        frozenset of person_ids who are selected (have variable value > 0.5)

    Raises:
        ValueError: If variables don't have values (solver failed)
    """
    try:
        committee = frozenset(person_id for person_id in variables if variables[person_id].x > 0.5)
    # unfortunately, MIP sometimes throws generic Exceptions rather than a subclass
    except Exception as error:
        msg = f"It seems like some variables do not have a value. Original exception: {error}."
        raise ValueError(msg, "variables_without_value", {"error": str(error)}) from error

    return committee
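
After a successful solve, each binary variable's .x attribute holds its value. A duck-typed sketch (stand-in objects instead of real mip variables):

from types import SimpleNamespace

from sortition_algorithms.committee_generation.common import ilp_results_to_committee

variables = {
    "p1": SimpleNamespace(x=1.0),
    "p2": SimpleNamespace(x=0.0),
    "p3": SimpleNamespace(x=1.0),
}
assert ilp_results_to_committee(variables) == frozenset({"p1", "p3"})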

setup_committee_generation(features, people, number_people_wanted, check_same_address_columns)

Set up the integer linear program for committee generation.

Parameters:

    features (FeatureCollection): FeatureCollection with min/max quotas (required)
    people (People): People object with pool members (required)
    number_people_wanted (int): desired size of the panel (required)
    check_same_address_columns (list[str]): columns to check for same address, or empty list if not checking addresses (required)

Returns:

    tuple[Model, dict[str, Var]]: tuple of (MIP model, dict mapping person_id to binary variables)

Raises:

    InfeasibleQuotasError: If quotas are infeasible, includes suggested relaxations
    SelectionError: If solver fails for other reasons

Source code in src/sortition_algorithms/committee_generation/common.py
def setup_committee_generation(
    features: FeatureCollection, people: People, number_people_wanted: int, check_same_address_columns: list[str]
) -> tuple[mip.model.Model, dict[str, mip.entities.Var]]:
    """Set up the integer linear program for committee generation.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: columns to check for same address, or empty list if
                                    not checking addresses.

    Returns:
        tuple of (MIP model, dict mapping person_id to binary variables)

    Raises:
        InfeasibleQuotasError: If quotas are infeasible, includes suggested relaxations
        SelectionError: If solver fails for other reasons
    """
    model = mip.Model(sense=mip.MAXIMIZE)
    model.verbose = 0  # TODO: get debug level from settings

    # Binary variable for each person (selected/not selected)
    agent_vars = {person_id: model.add_var(var_type=mip.BINARY) for person_id in people}

    # Must select exactly the desired number of people
    model.add_constr(mip.xsum(agent_vars.values()) == number_people_wanted)

    # Respect min/max quotas for each feature value
    for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(features):
        # Count people with this feature-value who are selected
        number_feature_value_agents = mip.xsum(
            agent_vars[person_id]
            for person_id, person_data in people.items()
            if person_data[feature_name].lower() == fvalue_name.lower()
        )

        # Add min/max constraints
        model.add_constr(number_feature_value_agents >= fv_minmax.min)
        model.add_constr(number_feature_value_agents <= fv_minmax.max)

    # Household constraints: at most 1 person per household
    if check_same_address_columns:
        for housemates in people.households(check_same_address_columns).values():
            if len(housemates) > 1:
                model.add_constr(mip.xsum(agent_vars[member_id] for member_id in housemates) <= 1)

    # Test feasibility by optimizing once
    status = model.optimize()
    if status == mip.OptimizationStatus.INFEASIBLE:
        relaxed_features, output_lines = _relax_infeasible_quotas(
            features, people, number_people_wanted, check_same_address_columns
        )
        raise errors.InfeasibleQuotasError(relaxed_features, output_lines)
    if status != mip.OptimizationStatus.OPTIMAL:
        msg = (
            f"No feasible committees found, solver returns code {status} (see "
            "https://docs.python-mip.com/en/latest/classes.html#optimizationstatus)."
        )
        raise errors.SelectionError(msg)

    return model, agent_vars

Selection algorithms for stratified sampling.

find_random_sample_legacy(people, features, number_people_wanted, check_same_address_columns=None)

Legacy stratified random selection algorithm.

Implements the original algorithm that uses greedy selection based on priority ratios. Always selects from the most urgently needed category first (highest ratio of (min-selected)/remaining), then randomly picks within that category.

Parameters:

Name Type Description Default
people People

People collection

required
features FeatureCollection

Feature definitions with min/max targets

required
number_people_wanted int

Number of people to select

required
check_same_address_columns list[str] | None

Address columns for household identification, or empty if no address checking to be done.

None

Returns:

Type Description
list[frozenset[str]]

Tuple of (selected_committees, output_messages) where:

RunReport
  • selected_committees: List containing one frozenset of selected person IDs
tuple[list[frozenset[str]], RunReport]
  • report: report containing log messages about the selection process

Raises:

Type Description
SelectionError

If selection becomes impossible (not enough people, etc.)

Source code in src/sortition_algorithms/committee_generation/legacy.py
def find_random_sample_legacy(
    people: People,
    features: FeatureCollection,
    number_people_wanted: int,
    check_same_address_columns: list[str] | None = None,
) -> tuple[list[frozenset[str]], RunReport]:
    """
    Legacy stratified random selection algorithm.

    Implements the original algorithm that uses greedy selection based on priority ratios.
    Always selects from the most urgently needed category first (highest ratio of
    (min-selected)/remaining), then randomly picks within that category.

    Args:
        people: People collection
        features: Feature definitions with min/max targets
        number_people_wanted: Number of people to select
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.

    Returns:
        Tuple of (selected_committees, report) where:
        - selected_committees: List containing one frozenset of selected person IDs
        - report: report containing log messages about the selection process

    Raises:
        SelectionError: If selection becomes impossible (not enough people, etc.)
    """
    report = RunReport()
    report.add_message("using_legacy_algorithm")
    people_selected: set[str] = set()

    # Create PeopleFeatures and initialize
    people_features = PeopleFeatures(people, features, check_same_address_columns or [])
    people_features.update_all_features_remaining()
    people_features.prune_for_feature_max_0()

    # Main selection loop
    for count in range(number_people_wanted):
        # Find the category with highest priority ratio
        try:
            ratio_result = people_features.find_max_ratio_category()
        except errors.SelectionError as e:
            msg = f"Selection failed on iteration {count + 1}: {e}"
            raise errors.RetryableSelectionError(msg) from e

        # Find the randomly selected person within that category
        target_feature = ratio_result.feature_name
        target_value = ratio_result.feature_value
        random_position = ratio_result.random_person_index

        selected_person_key = people_features.people.find_person_by_position_in_category(
            target_feature, target_value, random_position
        )

        # Should never select the same person twice
        assert selected_person_key not in people_selected, f"Person {selected_person_key} was already selected"

        # Select the person (this also removes household members if configured)
        people_selected.add(selected_person_key)
        selected_person_data = people_features.people.get_person_dict(selected_person_key)
        household_members_removed = people_features.select_person(selected_person_key)

        # Add output messages about household member removal
        if household_members_removed:
            report.add_line(
                f"Selected {selected_person_key}, also removed household members: "
                f"{', '.join(household_members_removed)}"
            )

        # Handle any categories that are now full after this selection
        try:
            category_report = people_features.handle_category_full_deletions(selected_person_data)
            report.add_report(category_report)
        except errors.SelectionError as e:
            msg = f"Selection failed after selecting {selected_person_key}: {e}"
            raise errors.RetryableSelectionError(msg) from e

        # Check if we're about to run out of people (but not on the last iteration)
        if count < (number_people_wanted - 1) and people_features.people.count == 0:
            msg = "Selection failed: Ran out of people before completing selection"
            raise errors.RetryableSelectionError(msg)

    # Return in legacy format: list containing single frozenset
    return [frozenset(people_selected)], report
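
A minimal sketch (assumptions: `people` and `features` are already loaded; the import path mirrors the "Source code in ..." line above):

from sortition_algorithms import errors
from sortition_algorithms.committee_generation.legacy import find_random_sample_legacy

try:
    committees, report = find_random_sample_legacy(people, features, number_people_wanted=24)
    panel = committees[0]  # the single frozenset of selected person IDs
except errors.RetryableSelectionError:
    pass  # worth retrying, as run_stratification does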

find_distribution_leximin(features, people, number_people_wanted, check_same_address_columns)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected (just like maximin), but breaks ties to maximize the second-lowest probability, breaks further ties to maximize the third-lowest probability and so forth.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas
- people (People, required): People object with pool members
- number_people_wanted (int, required): desired size of the panel
- check_same_address_columns (list[str], required): Address columns for household identification, or empty if no address checking is to be done.

Returns:

tuple[list[frozenset[str]], list[float], RunReport]: tuple of (committees, probabilities, report) where:

- committees: list of feasible committees (each a frozenset of agent IDs)
- probabilities: list of probabilities for each committee
- report: RunReport with debug messages

Raises:

- RuntimeError: If Gurobi is not available

Source code in src/sortition_algorithms/committee_generation/leximin.py
def find_distribution_leximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected
    (just like maximin), but breaks ties to maximize the second-lowest probability, breaks further ties to maximize the
    third-lowest probability and so forth.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.

    Returns:
        tuple of (committees, probabilities, report)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - report: RunReport with debug messages

    Raises:
        RuntimeError: If Gurobi is not available
    """
    if not GUROBI_AVAILABLE:
        msg = "Leximin algorithm requires Gurobi solver which is not available"
        raise RuntimeError(msg, "gurobi_not_available", {})

    report = RunReport()
    report.add_message_and_log("using_leximin_algorithm", logging.INFO)
    grb.setParam("OutputFlag", 0)

    # Set up an ILP that can be used for discovering new feasible committees
    new_committee_model, agent_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns
    )

    # Find initial committees that cover every possible agent
    committees, covered_agents, initial_report = generate_initial_committees(
        new_committee_model, agent_vars, 3 * people.count
    )
    report.add_report(initial_report)

    # Run the main leximin optimization loop to fix agent probabilities
    fixed_probabilities = _run_leximin_main_loop(new_committee_model, agent_vars, committees, people, report)

    # Convert fixed agent probabilities to committee probabilities
    probabilities_normalised = _solve_leximin_primal_for_final_probabilities(committees, fixed_probabilities)

    return list(committees), probabilities_normalised, report
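
A minimal sketch (requires Gurobi; the import path is assumed from the source path above, and `features`/`people` are assumed to be loaded elsewhere):

from sortition_algorithms.committee_generation.leximin import find_distribution_leximin

committees, probabilities, report = find_distribution_leximin(
    features, people, number_people_wanted=24, check_same_address_columns=[]
)
# the result is a probability distribution over committees
assert abs(sum(probabilities) - 1.0) < 1e-6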

find_distribution_maximin(features, people, number_people_wanted, check_same_address_columns)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas
- people (People, required): People object with pool members
- number_people_wanted (int, required): desired size of the panel
- check_same_address_columns (list[str], required): Address columns for household identification, or empty if no address checking is to be done.

Returns:

tuple[list[frozenset[str]], list[float], RunReport]: tuple of (committees, probabilities, report) where:

- committees: list of feasible committees (each a frozenset of agent IDs)
- probabilities: list of probabilities for each committee
- report: RunReport with debug messages

Source code in src/sortition_algorithms/committee_generation/maximin.py
def find_distribution_maximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.

    Returns:
        tuple of (committees, probabilities, report)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - report: RunReport with debug messages
    """
    report = RunReport()
    report.add_message_and_log("using_maximin_algorithm", logging.INFO)

    # Set up an ILP that can be used for discovering new feasible committees
    new_committee_model, agent_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns
    )

    # Find initial committees that cover every possible agent
    committees, covered_agents, init_report = generate_initial_committees(new_committee_model, agent_vars, people.count)
    report.add_report(init_report)

    # Set up the incremental LP model for column generation
    incremental_model, incr_agent_vars, upper_bound_var = _setup_maximin_incremental_model(committees, covered_agents)

    # Run the main optimization loop
    return _run_maximin_optimization_loop(
        new_committee_model,
        agent_vars,
        incremental_model,
        incr_agent_vars,
        upper_bound_var,
        committees,
        covered_agents,
        report,
    )
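
A minimal sketch showing how the returned distribution might be inspected. The helper below is hypothetical, and `features`/`people` are assumed to be loaded elsewhere:

from sortition_algorithms.committee_generation.maximin import find_distribution_maximin

committees, probabilities, report = find_distribution_maximin(
    features, people, number_people_wanted=24, check_same_address_columns=[]
)

def selection_probability(person_id: str) -> float:
    """Probability that person_id ends up on the drawn panel."""
    return sum(p for c, p in zip(committees, probabilities) if person_id in c)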

find_distribution_nash(features, people, number_people_wanted, check_same_address_columns)

Find a distribution over feasible committees that maximizes the Nash welfare, i.e., the product of selection probabilities over all persons.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas
- people (People, required): People object with pool members
- number_people_wanted (int, required): desired size of the panel
- check_same_address_columns (list[str], required): Address columns for household identification, or empty if no address checking is to be done.

Returns:

tuple[list[frozenset[str]], list[float], RunReport]: tuple of (committees, probabilities, report) where:

- committees: list of feasible committees (each a frozenset of agent IDs)
- probabilities: list of probabilities for each committee
- report: RunReport with debug messages

The algorithm maximizes the product of selection probabilities Πᵢ pᵢ by equivalently maximizing log(Πᵢ pᵢ) = Σᵢ log(pᵢ). If some person i is not included in any feasible committee, their pᵢ is 0, and this sum is -∞. We maximize Σᵢ log(pᵢ) where i is restricted to range over persons that can possibly be included.

Source code in src/sortition_algorithms/committee_generation/nash.py
def find_distribution_nash(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the Nash welfare, i.e., the product of
    selection probabilities over all persons.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.

    Returns:
        tuple of (committees, probabilities, report)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - report: RunReport with debug messages

    The algorithm maximizes the product of selection probabilities Πᵢ pᵢ by equivalently maximizing
    log(Πᵢ pᵢ) = Σᵢ log(pᵢ). If some person i is not included in any feasible committee, their pᵢ is 0, and
    this sum is -∞. We maximize Σᵢ log(pᵢ) where i is restricted to range over persons that can possibly be included.
    """
    report = RunReport()
    report.add_message_and_log("using_nash_algorithm", logging.INFO)

    # Set up an ILP used for discovering new feasible committees
    new_committee_model, agent_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns
    )

    # Find initial committees that include every possible agent
    committee_set, covered_agents, initial_report = generate_initial_committees(
        new_committee_model, agent_vars, 2 * people.count
    )
    committees = list(committee_set)
    report.add_report(initial_report)

    # Map the covered agents to indices in a list for easier matrix representation
    entitlements, contributes_to_entitlement = _define_entitlements(covered_agents)

    # Run the main Nash welfare optimization loop
    return _run_nash_optimization_loop(
        new_committee_model,
        agent_vars,
        committees,
        entitlements,
        contributes_to_entitlement,
        covered_agents,
        number_people_wanted,
        report,
    )
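
A sketch illustrating the objective described above: the Nash welfare is the product of per-person selection probabilities, optimised via its logarithm. The helper and variables are hypothetical, with `committees` and `probabilities` as returned by find_distribution_nash:

import math

def selection_probability(person_id, committees, probabilities):
    return sum(p for c, p in zip(committees, probabilities) if person_id in c)

covered = set().union(*committees)  # people in at least one feasible committee
log_welfare = sum(
    math.log(selection_probability(pid, committees, probabilities)) for pid in covered
)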

find_random_sample(features, people, number_people_wanted, check_same_address_columns, selection_algorithm='maximin', test_selection=False, number_selections=1)

Main algorithm to find one or multiple random committees.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas
- people (People, required): People object with pool members
- number_people_wanted (int, required): desired size of the panel
- check_same_address_columns (list[str], required): columns for the address to check, or an empty list if no check is required
- selection_algorithm (str, default 'maximin'): one of "legacy", "maximin", "leximin", or "nash"
- test_selection (bool, default False): if set, do not do a random selection, but just return some valid panel. Useful for quickly testing whether quotas are satisfiable, but should always be False for actual selection!
- number_selections (int, default 1): how many panels to return. Most of the time this should be 1, meaning a single panel is chosen. For a value n ≥ 2, the function returns a list of length n containing multiple panels (some panels might be repeated in the list); the eventual panel should then be drawn uniformly at random from the returned list.

Returns:

tuple[list[frozenset[str]], RunReport]: tuple of (committee_lottery, report) where:

- committee_lottery: list of committees, where each committee is a frozenset of pool member IDs
- report: RunReport with debug messages

Raises:

- InfeasibleQuotasError: if the quotas cannot be satisfied; the error includes a suggestion for how to modify them
- SelectionError: in multiple other failure cases
- ValueError: for invalid parameters
- RuntimeError: if the required solver is not available

Source code in src/sortition_algorithms/core.py
def find_random_sample(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    selection_algorithm: str = "maximin",
    test_selection: bool = False,
    number_selections: int = 1,
) -> tuple[list[frozenset[str]], RunReport]:
    """Main algorithm to find one or multiple random committees.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: columns for the address to check, or empty list if no check required
        selection_algorithm: one of "legacy", "maximin", "leximin", or "nash"
        test_selection: if set, do not do a random selection, but just return some valid panel.
            Useful for quickly testing whether quotas are satisfiable, but should always be false for actual selection!
        number_selections: how many panels to return. Most of the time, this should be set to 1, which means that
            a single panel is chosen. When specifying a value n ≥ 2, the function will return a list of length n,
            containing multiple panels (some panels might be repeated in the list). In this case the eventual panel
            should be drawn uniformly at random from the returned list.

    Returns:
        tuple of (committee_lottery, report)
        - committee_lottery: list of committees, where each committee is a frozen set of pool member ids
        - report: report with debug strings

    Raises:
        InfeasibleQuotasError: if the quotas cannot be satisfied, which includes a suggestion for how to modify them
        SelectionError: in multiple other failure cases
        ValueError: for invalid parameters
        RuntimeError: if required solver is not available
    """
    # Input validation
    if test_selection and number_selections != 1:
        msg = (
            "Running the test selection does not support generating a transparent lottery, so, if "
            "`test_selection` is true, `number_selections` must be 1."
        )
        raise ValueError(msg, "test_selection_multiple_selections", {})

    if selection_algorithm == "legacy" and number_selections != 1:
        msg = (
            "Currently, the legacy algorithm does not support generating a transparent lottery, "
            "so `number_selections` must be set to 1."
        )
        raise ValueError(msg, "legacy_multiple_selections", {})

    # Quick test selection using find_any_committee
    if test_selection:
        logger.info("Running test selection.")
        return find_any_committee(features, people, number_people_wanted, check_same_address_columns)

    report = RunReport()

    # Check if Gurobi is available for leximin
    if selection_algorithm == "leximin" and not GUROBI_AVAILABLE:
        report.add_message("gurobi_unavailable_switching")
        selection_algorithm = "maximin"

    # Route to appropriate algorithm
    if selection_algorithm == "legacy":
        return find_random_sample_legacy(
            people,
            features,
            number_people_wanted,
            check_same_address_columns,
        )
    elif selection_algorithm == "leximin":
        committees, probabilities, new_report = find_distribution_leximin(
            features, people, number_people_wanted, check_same_address_columns
        )
    elif selection_algorithm == "maximin":
        committees, probabilities, new_report = find_distribution_maximin(
            features, people, number_people_wanted, check_same_address_columns
        )
    elif selection_algorithm == "nash":
        committees, probabilities, new_report = find_distribution_nash(
            features, people, number_people_wanted, check_same_address_columns
        )
    else:
        msg = (
            f"Unknown selection algorithm {selection_algorithm!r}, must be either 'legacy', 'leximin', "
            f"'maximin', or 'nash'."
        )
        raise ValueError(msg, "unknown_selection_algorithm", {"algorithm": selection_algorithm})

    report.add_report(new_report)

    # Post-process the distribution
    committees, probabilities = standardize_distribution(committees, probabilities)
    if len(committees) > people.count:
        report.add_message_and_log(
            "basic_solution_warning",
            logging.WARNING,
            algorithm=selection_algorithm,
            num_panels=len(committees),
            num_agents=people.count,
            min_probs=min(probabilities),
        )

    assert len(set(committees)) == len(committees)

    stats_report = _distribution_stats(people, committees, probabilities)
    report.add_report(stats_report)

    # Convert to lottery
    committee_lottery = lottery_rounding(committees, probabilities, number_selections)

    return committee_lottery, report
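
A minimal end-to-end sketch of the main entry point. The column names are hypothetical, and `features`/`people` are assumed to have been loaded elsewhere (e.g. via read_in_features and a people loader):

from sortition_algorithms import errors
from sortition_algorithms.core import find_random_sample

try:
    lottery, report = find_random_sample(
        features,
        people,
        number_people_wanted=24,
        check_same_address_columns=["address1", "postcode"],  # hypothetical columns
        selection_algorithm="maximin",
    )
    panel = lottery[0]  # with number_selections=1 the lottery holds a single panel
except errors.InfeasibleQuotasError as err:
    print(err)  # includes suggested quota relaxations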

lottery_rounding(committees, probabilities, number_selections)

Convert probability distribution over committees to a discrete lottery.

Parameters:

- committees (list[frozenset[str]], required): list of committees
- probabilities (list[float], required): corresponding probabilities (must sum to 1)
- number_selections (int, required): number of committees to return

Returns:

list[frozenset[str]]: list of committees (may contain duplicates) of length number_selections

Source code in src/sortition_algorithms/core.py
def lottery_rounding(
    committees: list[frozenset[str]],
    probabilities: list[float],
    number_selections: int,
) -> list[frozenset[str]]:
    """Convert probability distribution over committees to a discrete lottery.

    Args:
        committees: list of committees
        probabilities: corresponding probabilities (must sum to 1)
        number_selections: number of committees to return

    Returns:
        list of committees (may contain duplicates) of length number_selections
    """
    assert len(committees) == len(probabilities)
    assert number_selections >= 1

    num_copies: list[int] = []
    residuals: list[float] = []
    for _, prob in zip(committees, probabilities, strict=False):
        scaled_prob = prob * number_selections
        num_copies.append(int(scaled_prob))  # give lower quotas
        residuals.append(scaled_prob - int(scaled_prob))

    rounded_up_indices = pipage_rounding(list(enumerate(residuals)))
    for committee_index in rounded_up_indices:
        num_copies[committee_index] += 1

    committee_lottery: list[frozenset[str]] = []
    for committee, committee_copies in zip(committees, num_copies, strict=False):
        committee_lottery += [committee for _ in range(committee_copies)]

    return committee_lottery
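
A worked sketch with hypothetical committees: with number_selections=4, a committee of probability 0.3 is scaled to 1.2, so it gets one guaranteed copy plus a 0.2 residual chance of a second copy via pipage rounding; expected copies equal 4 × 0.3.

a, b, c = frozenset({"p1"}), frozenset({"p2"}), frozenset({"p3"})
lottery = lottery_rounding([a, b, c], [0.3, 0.5, 0.2], number_selections=4)
assert len(lottery) == 4  # may contain duplicate committees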

pipage_rounding(marginals)

Pipage rounding algorithm for converting fractional solutions to integer solutions.

Takes a list of (object, probability) pairs and randomly rounds them to a set of objects such that the expected number of times each object appears equals its probability.

Parameters:

- marginals (list[tuple[int, float]], required): list of (object, probability) pairs where the probabilities sum to an integer

Returns:

list[int]: list of objects that were selected

Source code in src/sortition_algorithms/core.py
def pipage_rounding(marginals: list[tuple[int, float]]) -> list[int]:
    """Pipage rounding algorithm for converting fractional solutions to integer solutions.

    Takes a list of (object, probability) pairs and randomly rounds them to a set of objects
    such that the expected number of times each object appears equals its probability.

    Args:
        marginals: list of (object, probability) pairs where probabilities sum to an integer

    Returns:
        list of objects that were selected
    """
    assert all(0.0 <= p <= 1.0 for _, p in marginals)

    outcomes: list[int] = []
    while True:
        if len(marginals) == 0:
            return outcomes
        if len(marginals) == 1:
            obj, prob = marginals[0]
            if random_provider().uniform(0.0, 1.0) < prob:
                outcomes.append(obj)
            marginals = []
        else:
            obj0, prob0 = marginals[0]
            if prob0 > 1.0 - EPS2:
                outcomes.append(obj0)
                marginals = marginals[1:]
                continue
            if prob0 < EPS2:
                marginals = marginals[1:]
                continue

            obj1, prob1 = marginals[1]
            if prob1 > 1.0 - EPS2:
                outcomes.append(obj1)
                marginals = [marginals[0]] + marginals[2:]
                continue
            if prob1 < EPS2:
                marginals = [marginals[0]] + marginals[2:]
                continue

            inc0_dec1_amount = min(
                1.0 - prob0, prob1
            )  # maximal amount that prob0 can be increased and prob1 can be decreased
            dec0_inc1_amount = min(prob0, 1.0 - prob1)
            choice_probability = dec0_inc1_amount / (inc0_dec1_amount + dec0_inc1_amount)

            if random_provider().uniform(0.0, 1.0) < choice_probability:  # increase prob0 and decrease prob1
                prob0 += inc0_dec1_amount
                prob1 -= inc0_dec1_amount
            else:
                prob0 -= dec0_inc1_amount
                prob1 += dec0_inc1_amount
            marginals = [(obj0, prob0), (obj1, prob1)] + marginals[2:]
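
A worked sketch: the probabilities below sum to 2, so exactly two of the four objects come back, and object 0 is always among them because its probability is 1.0.

chosen = pipage_rounding([(0, 1.0), (1, 0.5), (2, 0.25), (3, 0.25)])
assert len(chosen) == 2 and 0 in chosen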

run_stratification(features, people, number_people_wanted, settings, *, test_selection=False, number_selections=1, already_selected=None)

Run stratified random selection with retry logic.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas for each feature value
- people (People, required): People object containing the pool of candidates
- number_people_wanted (int, required): Desired size of the panel
- settings (Settings, required): Settings object containing configuration
- test_selection (bool, default False): If True, don't randomize (for testing only)
- number_selections (int, default 1): Number of panels to return
- already_selected (People | None, default None): People who have already been selected

Returns:

tuple[bool, list[frozenset[str]], RunReport]: Tuple of (success, selected_committees, report) where:

- success: Whether selection succeeded within max attempts
- selected_committees: List of committees (frozensets of person IDs)
- report: Contains debug and status messages

Raises:

- Exception: If number_people_wanted is outside valid range for any feature
- ValueError: For invalid parameters
- RuntimeError: If required solver is not available
- InfeasibleQuotasError: If quotas cannot be satisfied

Source code in src/sortition_algorithms/core.py
def run_stratification(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    settings: Settings,
    *,
    test_selection: bool = False,
    number_selections: int = 1,
    already_selected: People | None = None,
) -> tuple[bool, list[frozenset[str]], RunReport]:
    """Run stratified random selection with retry logic.

    Args:
        features: FeatureCollection with min/max quotas for each feature value
        people: People object containing the pool of candidates
        number_people_wanted: Desired size of the panel
        settings: Settings object containing configuration
        test_selection: If True, don't randomize (for testing only)
        number_selections: Number of panels to return (default: 1)
        already_selected: People who have already been selected (optional)

    Returns:
        Tuple of (success, selected_committees, report)
        - success: Whether selection succeeded within max attempts
        - selected_committees: List of committees (frozensets of person IDs)
        - report: contains debug and status messages

    Raises:
        Exception: If number_people_wanted is outside valid range for any feature
        ValueError: For invalid parameters
        RuntimeError: If required solver is not available
        InfeasibleQuotasError: If quotas cannot be satisfied
    """
    success = False
    report = RunReport()
    people_selected: list[frozenset[str]] = []

    try:
        working_people = exclude_matching_selected_addresses(people, already_selected, settings)
        dropped_count = people.count - working_people.count
        if dropped_count:
            report.add_line_and_log(
                f"Dropped {dropped_count} people who have an address matching a selected person.", logging.INFO
            )
        # Check if desired number is within feature constraints
        check_desired(features, number_people_wanted)
        check_enough_people_for_every_feature_value(features, working_people)
    except errors.SelectionError as error:
        report.add_error(error)
        return False, people_selected, report

    # Set random seed if specified
    # If the seed is zero or None, we use the secrets module, as it is better
    # from a security point of view
    set_random_provider(settings.random_number_seed)

    if test_selection:
        report.add_message("test_selection_warning", ReportLevel.CRITICAL)

    report.add_message("initial_state", ReportLevel.IMPORTANT)
    report.add_report(_initial_category_info_table(features, working_people))

    tries = 0
    for tries in range(settings.max_attempts):
        people_selected = []

        report.add_message_and_log("trial_number", logging.WARNING, trial=tries + 1)

        try:
            people_selected, new_report = find_random_sample(
                features,
                working_people,
                number_people_wanted,
                settings.normalised_address_columns,
                settings.selection_algorithm,
                test_selection,
                number_selections,
            )
            report.add_report(new_report)

            # Check if targets were met (only works for number_selections = 1)
            # This raises an error if we did not select properly
            _check_category_selected(features, working_people, people_selected, number_selections)

            report.add_message("selection_success", ReportLevel.IMPORTANT)
            report.add_report(_category_info_table(features, working_people, people_selected, number_people_wanted))
            success = True
            break

        except errors.SelectionError as serr:
            if serr.is_retryable:
                report.add_error(serr, is_fatal=False)
                report.add_message("retry_after_error", error=str(serr))
                # we do not break here, we try again.
            else:
                report.add_error(serr)
                break
        # these are all fatal errors
        except (ValueError, RuntimeError, errors.InfeasibleQuotasCantRelaxError) as err:
            report.add_error(err)
            break

    if not success:
        report.add_message("selection_failed", ReportLevel.IMPORTANT, attempts=tries + 1)

    return success, people_selected, report
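
A minimal sketch of this retrying entry point, assuming `features`, `people` and a configured Settings instance named `settings` exist:

from sortition_algorithms.core import run_stratification

success, committees, report = run_stratification(
    features, people, number_people_wanted=24, settings=settings
)
if success:
    panel = committees[0]
# on failure, `report` holds the accumulated error and status messages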

selected_remaining_tables(full_people, people_selected, features, settings, already_selected=None, exclude_matching_addresses=True)

Build the tables of selected and remaining people after a selection run. Returns rows for the selected people, rows for everyone left in the pool (with the selected people removed and, optionally, anyone sharing an address with a selected person), and a list of output lines.

people_selected is a single frozenset[str] - it must be unwrapped before being passed to this function.

Source code in src/sortition_algorithms/core.py
def selected_remaining_tables(
    full_people: People,
    people_selected: frozenset[str],
    features: FeatureCollection,
    settings: Settings,
    already_selected: People | None = None,
    exclude_matching_addresses: bool = True,
) -> tuple[list[list[str]], list[list[str]], list[str]]:
    """
    Build the tables of selected and remaining people after a selection run.

    people_selected is a single frozenset[str] - it must be unwrapped before being passed
    to this function.
    """
    people_working = deepcopy(full_people)
    output_lines: list[str] = []

    # this function only reads from people, so pass in full_people
    people_selected_rows = person_list_to_table(people_selected, full_people, features, settings)

    # now delete the selected people
    for pkey in people_selected:
        people_working.remove(pkey)

    # now delete the people at the same address as a selected person
    # TODO: consider retiring this code once we use already_selected everywhere
    # and then also drop exclude_matching_addresses variable
    num_same_address_deleted = 0
    if (
        exclude_matching_addresses
        and settings.check_same_address
        and (already_selected is None or not already_selected.count)
    ):
        for pkey in people_selected:
            pkey_to_delete = list(full_people.matching_address(pkey, settings.check_same_address_columns))
            num_same_address_deleted += len(pkey_to_delete)
            # then delete this/these people at the same address from the reserve/remaining pool
            people_working.remove_many(pkey_to_delete)

    # add the columns to keep into remaining people
    # as above all these values are all in people_working but this is tidier...
    # this function only reads from people, so pass in full_people
    people_remaining_rows = person_list_to_table(people_working, full_people, features, settings)
    return people_selected_rows, people_remaining_rows, output_lines
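
A sketch of the unwrapping the docstring asks for, assuming `committees` came from run_stratification with number_selections=1 and that `people`, `features` and `settings` exist:

selected_rows, remaining_rows, lines = selected_remaining_tables(
    full_people=people,
    people_selected=committees[0],  # unwrap the single frozenset
    features=features,
    settings=settings,
)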

BadDataError

Bases: SortitionBaseError

Error for when bad data is found while reading things in

Source code in src/sortition_algorithms/errors.py
class BadDataError(SortitionBaseError):
    """Error for when bad data is found while reading things in"""

InfeasibleQuotasCantRelaxError

Bases: SortitionBaseError

The quotas can't be met, and no feasible relaxation was found

Source code in src/sortition_algorithms/errors.py
class InfeasibleQuotasCantRelaxError(SortitionBaseError):
    """The quotas can't be met, and no feasible relaxation was found"""

InfeasibleQuotasError

Bases: SelectionMultilineError

The quotas can't be met, and a feasible relaxation was found.

The details of what relaxations are recommended are included in the error.

Source code in src/sortition_algorithms/errors.py
class InfeasibleQuotasError(SelectionMultilineError):
    """
    The quotas can't be met, and a feasible relaxation was found.

    The details of what relaxations are recommended are included in the error.
    """

    def __init__(self, features: "FeatureCollection", output: list[str]) -> None:
        self.features = features
        super().__init__(lines=["The quotas are infeasible:", *output])

ParseErrorsCollector

Class that we can add errors to, but errors with empty messages will be dropped

Source code in src/sortition_algorithms/errors.py
class ParseErrorsCollector:
    """Class that we can add errors to, but errors with empty messages will be dropped"""

    def __init__(self) -> None:
        self.errors: list[ParseTableErrorMsg | ParseTableMultiValueErrorMsg] = []

    def __len__(self) -> int:
        """This means that we will be falsy if len is 0, so is effectively a __bool__ as well"""
        return len(self.errors)

    def add(
        self,
        msg: str,
        key: str,
        value: str,
        row: int,
        row_name: str,
        error_code: str = "",
        error_params: dict[str, str] | dict[str, str | int] | None = None,
    ) -> None:
        if msg:
            self.errors.append(
                ParseTableErrorMsg(
                    row=row,
                    row_name=row_name,
                    key=key,
                    value=value,
                    msg=msg,
                    error_code=error_code,
                    error_params=error_params or {},
                )
            )

    def add_multi_value(
        self,
        msg: str,
        keys: list[str],
        values: list[str],
        row: int,
        row_name: str,
        error_code: str = "",
        error_params: dict[str, str | int] | None = None,
    ) -> None:
        if msg:
            self.errors.append(
                ParseTableMultiValueErrorMsg(
                    row=row,
                    row_name=row_name,
                    keys=keys,
                    values=values,
                    msg=msg,
                    error_code=error_code,
                    error_params=error_params or {},
                )
            )

    def to_error(self) -> ParseTableMultiError:
        return ParseTableMultiError(self.errors)

__len__()

This means that we will be falsy if len is 0, so is effectively a __bool__ as well

Source code in src/sortition_algorithms/errors.py
def __len__(self) -> int:
    """This means that we will be falsy if len is 0, so is effectively a __bool__ as well"""
    return len(self.errors)

ParseTableMultiError

Bases: SelectionMultilineError

Specifically for collecting errors from parsing a table

This has information that can be collected at a low level. Then higher level code can read the errors and make a SelectionMultilineError instance with strings with more context, relating to a CSV file, Spreadsheet etc.

Source code in src/sortition_algorithms/errors.py
class ParseTableMultiError(SelectionMultilineError):
    """
    Specifically for collecting errors from parsing a table

    This has information that can be collected at a low level. Then higher level code can read
    the errors and make a SelectionMultilineError instance with strings with more context,
    relating to a CSV file, Spreadsheet etc.
    """

    def __init__(self, errors: list[ParseTableErrorMsg | ParseTableMultiValueErrorMsg] | None = None) -> None:
        self.all_errors: list[ParseTableErrorMsg | ParseTableMultiValueErrorMsg] = errors or []

    def __len__(self) -> int:
        """This means that we will be falsy if len is 0, so is effectively a __bool__ as well"""
        return len(self.all_errors)

    def lines(self) -> list[str]:
        return [str(e) for e in self.all_errors]

    def combine(self, other: SelectionMultilineError) -> None:
        """Add all the lines from the other error to this one."""
        assert isinstance(other, ParseTableMultiError)
        self.all_errors += other.all_errors

__len__()

This means that we will be falsy if len is 0, so is effectively a __bool__ as well

Source code in src/sortition_algorithms/errors.py
def __len__(self) -> int:
    """This means that we will be falsy if len is 0, so is effectively a __bool__ as well"""
    return len(self.all_errors)

combine(other)

Add all the lines from the other error to this one.

Source code in src/sortition_algorithms/errors.py
def combine(self, other: SelectionMultilineError) -> None:
    """Add all the lines from the other error to this one."""
    assert isinstance(other, ParseTableMultiError)
    self.all_errors += other.all_errors

RetryableSelectionError

Bases: SelectionError

For errors where the selection should be retried.

The main case is when the legacy selection algorithm fails, it can be worth retrying as it might find something the next time around.

Source code in src/sortition_algorithms/errors.py
class RetryableSelectionError(SelectionError):
    """
    For errors where the selection should be retried.

    The main case is when the legacy selection algorithm fails, it can be worth
    retrying as it might find something the next time around.
    """

    is_retryable: bool = True

SelectionError

Bases: SortitionBaseError

Generic error for things that happen in selection

Source code in src/sortition_algorithms/errors.py
class SelectionError(SortitionBaseError):
    """Generic error for things that happen in selection"""

SelectionMultilineError

Bases: SelectionError

Generic error for things that happen in selection - multiline

Source code in src/sortition_algorithms/errors.py
class SelectionMultilineError(SelectionError):
    """Generic error for things that happen in selection - multiline"""

    def __init__(
        self,
        lines: list[str],
        is_retryable: bool = False,
        error_code: str = "",
        error_params: dict[str, str | int] | None = None,
    ) -> None:
        message = "\n".join(lines)
        super().__init__(message=message, error_code=error_code, error_params=error_params)
        self.all_lines = lines
        self.is_retryable = is_retryable

    def __str__(self) -> str:
        return "\n".join(self.lines())

    def to_html(self) -> str:
        return "<br />".join(html.escape(line) for line in self.lines())

    def lines(self) -> list[str]:
        return self.all_lines

    def combine(self, other: "SelectionMultilineError") -> None:
        """Add all the lines from the other error to this one."""
        self.all_lines += other.lines()

combine(other)

Add all the lines from the other error to this one.

Source code in src/sortition_algorithms/errors.py
def combine(self, other: "SelectionMultilineError") -> None:
    """Add all the lines from the other error to this one."""
    self.all_lines += other.lines()

SortitionBaseError

Bases: Exception

A base class that allows all errors to be caught easily.

Source code in src/sortition_algorithms/errors.py
class SortitionBaseError(Exception):
    """A base class that allows all errors to be caught easily."""

    is_retryable: bool = False

    def __init__(
        self, message: str = "", error_code: str = "", error_params: dict[str, str | int] | None = None
    ) -> None:
        super().__init__(message)
        self.error_code = error_code
        self.error_params = error_params or {}
        self.message = message

    def to_html(self) -> str:
        return html.escape(str(self))
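
A sketch of the catch-all pattern this base class enables; the is_retryable flag distinguishes errors worth another attempt:

from sortition_algorithms import errors

try:
    ...  # any selection or parsing call
except errors.SortitionBaseError as err:
    if err.is_retryable:
        ...  # try again, as run_stratification does internally
    else:
        raise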

check_desired(fc, desired_number)

Check if the desired number of people is within the min/max of every feature.

Source code in src/sortition_algorithms/features.py
def check_desired(fc: FeatureCollection, desired_number: int) -> None:
    """
    Check if the desired number of people is within the min/max of every feature.
    """
    errors: list[str] = []
    for feature_name, fvalues in fc.items():
        if desired_number < _fv_minimum_selection(fvalues) or desired_number > _fv_maximum_selection(fvalues):
            errors.append(
                f"The number of people to select ({desired_number}) is out of the range of "
                f"the numbers of people in the {feature_name} feature. It should be within "
                f"[{_fv_minimum_selection(fvalues)}, {_fv_maximum_selection(fvalues)}]."
            )
    if errors:
        raise SelectionMultilineError(errors)

check_min_max(fc, number_to_select=0, feature_column_name='feature')

If the min is bigger than the max we're in trouble i.e. there's an input error

Source code in src/sortition_algorithms/features.py
def check_min_max(fc: FeatureCollection, number_to_select: int = 0, feature_column_name: str = "feature") -> None:
    """
    If the min is bigger than the max we're in trouble i.e. there's an input error
    """
    errors: list[str] = []
    if minimum_selection(fc) > maximum_selection(fc):
        errors += report_min_max_error_details(fc, feature_column_name)
    if number_to_select:
        errors += report_min_max_against_number_to_select(fc, number_to_select, feature_column_name)
    if errors:
        raise SelectionMultilineError(errors)

iterate_feature_collection(features)

Helper function to iterate over feature collection.

Source code in src/sortition_algorithms/features.py
def iterate_feature_collection(features: FeatureCollection) -> Generator[tuple[str, str, FeatureValueMinMax]]:
    """Helper function to iterate over feature collection."""
    for feature_name, feature_values in features.items():
        for value_name, fv_minmax in feature_values.items():
            yield feature_name, value_name, fv_minmax

maximum_selection(fc)

The maximum selection for this set of features is the smallest maximum selection of any individual feature.

Source code in src/sortition_algorithms/features.py
def maximum_selection(fc: FeatureCollection) -> int:
    """
    The maximum selection for this set of features is the smallest maximum selection
    of any individual feature.
    """
    if not fc:
        return 0

    return min(_fv_maximum_selection(fv) for fv in fc.values())

minimum_selection(fc)

The minimum selection for this set of features is the largest minimum selection of any individual feature.

Source code in src/sortition_algorithms/features.py
def minimum_selection(fc: FeatureCollection) -> int:
    """
    The minimum selection for this set of features is the largest minimum selection
    of any individual feature.
    """
    if not fc:
        return 0

    return max(_fv_minimum_selection(fv) for fv in fc.values())

read_in_features(features_head, features_body, number_to_select=0)

Read in stratified selection features and values

Note we do want features_head to ensure we don't have multiple columns with the same name

Source code in src/sortition_algorithms/features.py
def read_in_features(
    features_head: Iterable[str], features_body: Iterable[dict[str, str]], number_to_select: int = 0
) -> tuple[FeatureCollection, str, str]:
    """
    Read in stratified selection features and values

    Note we do want features_head to ensure we don't have multiple columns with the same name
    """
    features: FeatureCollection = CaseInsensitiveDict()
    features_flex, filtered_headers = _feature_headers_flex(list(features_head))
    combined_error = ParseTableMultiError()
    feature_column_name = "feature"
    feature_value_column_name = "value"
    # row 1 is the header, so the body starts on row 2
    for row_number, row in enumerate(features_body, start=2):
        if row_number == 2:
            _, feature_column_name = _get_feature_from_row(row)
            _, feature_value_column_name = _get_feature_value_from_row(row)
        # check the set of keys in the row are the same as the headers
        assert set(filtered_headers) <= set(row.keys())
        stripped_row = utils.normalise_dict(row)
        fname, _ = _get_feature_from_row(row)
        if not fname:
            continue
        try:
            fname, fvalue, fv_minmax = _clean_row(stripped_row, features_flex, row_number)
        except ParseTableMultiError as error:
            # add all the lines into one large error, so we report all the errors in one go
            combined_error.combine(error)
        else:
            if fname not in features:
                features[fname] = CaseInsensitiveDict()
            features[fname][fvalue] = fv_minmax

    # if we got any errors in the above loop, raise the combined error.
    if combined_error:
        raise combined_error

    check_min_max(features, number_to_select=number_to_select, feature_column_name=feature_column_name)
    # check feature_flex to see if we need to set the max here
    # this only changes the max_flex value if these (optional) flex values are NOT set already
    set_default_max_flex(features)
    return CaseInsensitiveDict(features), feature_column_name, feature_value_column_name
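
A minimal sketch feeding CSV rows into read_in_features; the file name and its contents are hypothetical:

import csv

with open("features.csv", newline="") as f:
    reader = csv.DictReader(f, strict=True)
    assert reader.fieldnames is not None
    features, feature_col, value_col = read_in_features(
        list(reader.fieldnames), reader, number_to_select=24
    )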

report_min_max_against_number_to_select(fc, number_to_select, feature_column_name)

If any combined minimum is > number_to_select we have a problem. If any combined maximum is < number_to_select we have a problem.

Source code in src/sortition_algorithms/features.py
def report_min_max_against_number_to_select(
    fc: FeatureCollection, number_to_select: int, feature_column_name: str
) -> list[str]:
    """
    If any combined minimum is > number_to_select we have a problem.
    If any combined maximum is < number_to_select we have a problem.
    """
    if not fc:
        return []
    errors: list[str] = []
    for key, fv in fc.items():
        feature_minimum = _fv_minimum_selection(fv)
        feature_maximum = _fv_maximum_selection(fv)
        if feature_minimum > number_to_select:
            errors.append(
                f"Minimum for {feature_column_name} {key} ({feature_minimum}) "
                f"is more than number to select ({number_to_select})"
            )
        if feature_maximum < number_to_select:
            errors.append(
                f"Maximum for {feature_column_name} {key} ({feature_maximum}) "
                f"is less than number to select ({number_to_select})"
            )
    return errors

report_min_max_error_details(fc, feature_column_name='feature')

Return a list of problems in detail, so that the user can debug the errors in detail

Source code in src/sortition_algorithms/features.py
def report_min_max_error_details(fc: FeatureCollection, feature_column_name: str = "feature") -> list[str]:
    """
    Return a list of problems in detail, so that the user can debug the errors in detail
    """
    if not fc:
        return []

    max_feature, max_val = min(((key, _fv_maximum_selection(fv)) for key, fv in fc.items()), key=lambda x: x[1])
    min_feature, min_val = max(((key, _fv_minimum_selection(fv)) for key, fv in fc.items()), key=lambda x: x[1])
    return [
        f"Inconsistent numbers in min and max in the {feature_column_name} input:",
        f"The smallest maximum is {max_val} for {feature_column_name} '{max_feature}'",
        f"The largest minimum is {min_val} for {feature_column_name} '{min_feature}'",
        f"You need to reduce the minimums for {min_feature} or increase the maximums for {max_feature}.",
    ]

set_default_max_flex(fc)

Note this only sets it if left at the default value

Source code in src/sortition_algorithms/features.py
def set_default_max_flex(fc: FeatureCollection) -> None:
    """Note this only sets it if left at the default value"""
    max_flex = _safe_max_flex_val(fc)
    for feature_values in fc.values():
        for fv_minmax in feature_values.values():
            fv_minmax.set_default_max_flex(max_flex)

MaxRatioResult

Result from finding the category with maximum selection ratio.

Source code in src/sortition_algorithms/people_features.py
@define(kw_only=True, slots=True)
class MaxRatioResult:
    """Result from finding the category with maximum selection ratio."""

    feature_name: str
    feature_value: str
    random_person_index: int

PeopleFeatures

This class manipulates people and features together, making a deepcopy on init.

It is only used by the legacy method.

Source code in src/sortition_algorithms/people_features.py
class PeopleFeatures:
    """
    This class manipulates people and features together, making a deepcopy on init.

    It is only used by the legacy method.
    """

    # TODO: consider naming: maybe SelectionState
    # TODO: consider moving into committee_generation/legacy.py

    def __init__(
        self,
        people: People,
        features: FeatureCollection,
        check_same_address_columns: list[str] | None = None,
    ) -> None:
        self.people = deepcopy(people)
        self.features = features
        self.select_collection = select_from_feature_collection(self.features)
        self.check_same_address_columns = check_same_address_columns or []

    def update_features_remaining(self, person_key: str) -> None:
        # this will blow up if the person does not exist
        person = self.people.get_person_dict(person_key)
        for feature_name in self.features:
            feature_value = person[feature_name]
            self.select_collection[feature_name][feature_value].add_remaining()

    def update_all_features_remaining(self) -> None:
        for person_key in self.people:
            self.update_features_remaining(person_key)

    def delete_all_with_feature_value(self, feature_name: str, feature_value: str) -> tuple[int, int]:
        """
        When a feature/value is "full" we delete everyone else in it.
        "Full" means that the number selected equals the "max" amount - that
        is detected elsewhere and then this method is called.
        Returns count of those deleted, and count of those left
        """
        # when a category is full we want to delete everyone in it
        people_to_delete: list[str] = []
        for pkey, person in self.people.items():
            if person[feature_name].lower() == feature_value.lower():
                people_to_delete.append(pkey)
                for feature in self.features:
                    current_feature_value = person[feature]
                    try:
                        self.select_collection[feature][current_feature_value].remove_remaining()
                    except errors.SelectionError as e:
                        msg = (
                            f"SELECTION IMPOSSIBLE: FAIL in delete_all_in_feature_value() "
                            f"as after previous deletion no one/not enough left in {feature} "
                            f"{person[feature]}. Tried to delete: {len(people_to_delete)}"
                        )
                        raise errors.RetryableSelectionError(msg) from e

        self.people.remove_many(people_to_delete)
        # return the number of people deleted and the number of people left
        return len(people_to_delete), self.people.count

    def prune_for_feature_max_0(self) -> list[str]:
        """
        Check if any feature_value.max is set to zero. if so delete everyone with that feature value
        NOT DONE: could then check if anyone is left.
        """
        msg: list[str] = []
        msg.append(f"Number of people: {self.people.count}.")
        total_num_deleted = 0
        for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(self.features):
            if fv_minmax.max == 0:  # we don't want any of these people
                # pass the message in as deleting them might throw an exception
                msg.append(f"Feature/value {feature_name}/{fvalue_name} full - deleting people...")
                num_deleted, num_left = self.delete_all_with_feature_value(feature_name, fvalue_name)
                # if no exception was thrown above add this bit to the end of the previous message
                msg[-1] += f" Deleted {num_deleted}, {num_left} left."
                total_num_deleted += num_deleted
        # if the total number of people deleted is lots then we're probably doing a replacement selection, which means
        # the 'remaining' file will be useless - remind the user of this!
        if total_num_deleted >= self.people.count / 2:
            msg.append(
                ">>> WARNING <<< That deleted MANY PEOPLE - are you doing a "
                "replacement? If so your REMAINING FILE WILL BE USELESS!!!"
            )
        return msg

    def select_person(self, person_key: str) -> list[str]:
        """
        Selecting a person means:
        - remove the person from our copy of People
        - update the `selected` and `remaining` counts of the FeatureCollection
        - if check_same_address_columns has columns, also remove household members (without adding to selected)

        Returns:
            List of additional people removed due to same address (empty if check_same_address_columns is empty)
        """
        # First, find household members if address checking is enabled (before removing the person)
        household_members_removed = []
        if self.check_same_address_columns:
            household_members_removed = list(self.people.matching_address(person_key, self.check_same_address_columns))

        # Handle the main person selection
        person = self.people.get_person_dict(person_key)
        for feature_name in self.features:
            feature_value = person[feature_name]
            self.select_collection[feature_name][feature_value].remove_remaining()
            self.select_collection[feature_name][feature_value].add_selected()
        self.people.remove(person_key)

        # Then remove household members if any were found
        for household_member_key in household_members_removed:
            household_member = self.people.get_person_dict(household_member_key)
            for feature_name in self.features:
                feature_value = household_member[feature_name]
                self.select_collection[feature_name][feature_value].remove_remaining()
                # Note: we don't call add_selected() for household members
            self.people.remove(household_member_key)

        return household_members_removed

    def find_max_ratio_category(self) -> MaxRatioResult:
        """
        Find the feature/value combination with the highest selection ratio.

        The ratio is calculated as: (min - selected) / remaining
        This represents how urgently we need people from this category.
        Higher ratio = more urgent need (fewer people available relative to what we still need).

        Returns:
            MaxRatioResult containing the feature name, value, and a random person index

        Raises:
            SelectionError: If insufficient people remain to meet minimum requirements
        """
        max_ratio = -100.0
        result_feature_name = ""
        result_feature_value = ""
        random_person_index = -1

        for feature_name, fvalue_name, select_counts in iterate_select_collection(self.select_collection):
            # Check if we have insufficient people to meet minimum requirements
            if not select_counts.sufficient_people():
                msg = (
                    f"SELECTION IMPOSSIBLE: Not enough people remaining in {feature_name}/{fvalue_name}. "
                    f"Need {select_counts.people_still_needed} more, but only {select_counts.remaining} remaining."
                )
                raise errors.SelectionError(msg)

            # Skip categories with no remaining people or max = 0
            if select_counts.remaining == 0 or select_counts.min_max.max == 0:
                continue

            # Calculate the priority ratio
            ratio = select_counts.people_still_needed / float(select_counts.remaining)

            # Track the highest ratio category
            if ratio > max_ratio:
                max_ratio = ratio
                result_feature_name = feature_name
                result_feature_value = fvalue_name
                # from 1 to remaining
                random_person_index = random_provider().randbelow(select_counts.remaining) + 1

        # If no valid category found, all categories must be at their max or have max=0
        if not result_feature_name:
            msg = "No valid categories found - all may be at maximum or have max=0"
            raise errors.SelectionError(msg)

        return MaxRatioResult(
            feature_name=result_feature_name,
            feature_value=result_feature_value,
            random_person_index=random_person_index,
        )

    def handle_category_full_deletions(self, selected_person_data: MutableMapping[str, str]) -> RunReport:
        """
        Check if any categories are now full after a selection and delete remaining people.

        When a person is selected, some categories may reach their maximum quota.
        This method identifies such categories and removes all remaining people from them.

        Args:
            selected_person_data: Dictionary of the selected person's feature values

        Returns:
            RunReport containing messages about categories that became full and people deleted

        Raises:
            SelectionError: If deletions would violate minimum constraints
        """
        report = RunReport()

        for feature_name, fvalue_name, select_counts in iterate_select_collection(self.select_collection):
            if (
                fvalue_name.lower() == selected_person_data[feature_name].lower()
                and select_counts.selected == select_counts.min_max.max
            ):
                num_deleted, num_left = self.delete_all_with_feature_value(feature_name, fvalue_name)
                if num_deleted > 0:
                    report.add_line(
                        f"Category {feature_name}/{fvalue_name} full - deleted {num_deleted} people, {num_left} left."
                    )

        return report

delete_all_with_feature_value(feature_name, feature_value)

When a feature/value is "full" we delete everyone else in it. "Full" means that the number selected equals the "max" amount - that is detected elsewhere, and then this method is called. Returns the count of those deleted and the count of those left.
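
A minimal usage sketch (assuming an instance of the owning class named people_features; the feature name and value are hypothetical):

num_deleted, num_left = people_features.delete_all_with_feature_value("gender", "male")
print(f"Deleted {num_deleted}, {num_left} left.")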

Source code in src/sortition_algorithms/people_features.py
def delete_all_with_feature_value(self, feature_name: str, feature_value: str) -> tuple[int, int]:
    """
    When a feature/value is "full" we delete everyone else in it.
    "Full" means that the number selected equals the "max" amount - that
    is detected elsewhere and then this method is called.
    Returns count of those deleted, and count of those left
    """
    # when a category is full we want to delete everyone in it
    people_to_delete: list[str] = []
    for pkey, person in self.people.items():
        if person[feature_name].lower() == feature_value.lower():
            people_to_delete.append(pkey)
            for feature in self.features:
                current_feature_value = person[feature]
                try:
                    self.select_collection[feature][current_feature_value].remove_remaining()
                except errors.SelectionError as e:
                    msg = (
                        f"SELECTION IMPOSSIBLE: FAIL in delete_all_in_feature_value() "
                        f"as after previous deletion no one/not enough left in {feature} "
                        f"{person[feature]}. Tried to delete: {len(people_to_delete)}"
                    )
                    raise errors.RetryableSelectionError(msg) from e

    self.people.remove_many(people_to_delete)
    # return the number of people deleted and the number of people left
    return len(people_to_delete), self.people.count

find_max_ratio_category()

Find the feature/value combination with the highest selection ratio.

The ratio is calculated as (min - selected) / remaining. This represents how urgently we need people from this category: a higher ratio means a more urgent need (fewer people available relative to what we still need).

Returns:

    MaxRatioResult - containing the feature name, value, and a random person index

Raises:

    SelectionError - if insufficient people remain to meet minimum requirements
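
A worked example of the ratio, using hypothetical counts for one feature/value:

minimum, selected, remaining = 8, 3, 10
people_still_needed = max(minimum - selected, 0)   # 5
ratio = people_still_needed / float(remaining)     # 0.5 - higher means more urgent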

Source code in src/sortition_algorithms/people_features.py
def find_max_ratio_category(self) -> MaxRatioResult:
    """
    Find the feature/value combination with the highest selection ratio.

    The ratio is calculated as: (min - selected) / remaining
    This represents how urgently we need people from this category.
    Higher ratio = more urgent need (fewer people available relative to what we still need).

    Returns:
        MaxRatioResult containing the feature name, value, and a random person index

    Raises:
        SelectionError: If insufficient people remain to meet minimum requirements
    """
    max_ratio = -100.0
    result_feature_name = ""
    result_feature_value = ""
    random_person_index = -1

    for feature_name, fvalue_name, select_counts in iterate_select_collection(self.select_collection):
        # Check if we have insufficient people to meet minimum requirements
        if not select_counts.sufficient_people():
            msg = (
                f"SELECTION IMPOSSIBLE: Not enough people remaining in {feature_name}/{fvalue_name}. "
                f"Need {select_counts.people_still_needed} more, but only {select_counts.remaining} remaining."
            )
            raise errors.SelectionError(msg)

        # Skip categories with no remaining people or max = 0
        if select_counts.remaining == 0 or select_counts.min_max.max == 0:
            continue

        # Calculate the priority ratio
        ratio = select_counts.people_still_needed / float(select_counts.remaining)

        # Track the highest ratio category
        if ratio > max_ratio:
            max_ratio = ratio
            result_feature_name = feature_name
            result_feature_value = fvalue_name
            # from 1 to remaining
            random_person_index = random_provider().randbelow(select_counts.remaining) + 1

    # If no valid category found, all categories must be at their max or have max=0
    if not result_feature_name:
        msg = "No valid categories found - all may be at maximum or have max=0"
        raise errors.SelectionError(msg)

    return MaxRatioResult(
        feature_name=result_feature_name,
        feature_value=result_feature_value,
        random_person_index=random_person_index,
    )

handle_category_full_deletions(selected_person_data)

Check if any categories are now full after a selection and delete remaining people.

When a person is selected, some categories may reach their maximum quota. This method identifies such categories and removes all remaining people from them.

Parameters:

    selected_person_data (MutableMapping[str, str], required) - dictionary of the selected person's feature values

Returns:

    RunReport - containing messages about categories that became full and people deleted

Raises:

    SelectionError - if deletions would violate minimum constraints
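
A sketch of how these pieces might fit together in a single selection step (the instance name people_features and the overall flow are illustrative, not taken from the library):

result = people_features.find_max_ratio_category()
person_key = people_features.people.find_person_by_position_in_category(
    result.feature_name, result.feature_value, result.random_person_index
)
# grab the person's data before select_person() removes them
person_data = people_features.people.get_person_dict(person_key)
people_features.select_person(person_key)
report = people_features.handle_category_full_deletions(person_data)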

Source code in src/sortition_algorithms/people_features.py
def handle_category_full_deletions(self, selected_person_data: MutableMapping[str, str]) -> RunReport:
    """
    Check if any categories are now full after a selection and delete remaining people.

    When a person is selected, some categories may reach their maximum quota.
    This method identifies such categories and removes all remaining people from them.

    Args:
        selected_person_data: Dictionary of the selected person's feature values

    Returns:
        RunReport containing messages about categories that became full and people deleted

    Raises:
        SelectionError: If deletions would violate minimum constraints
    """
    report = RunReport()

    for feature_name, fvalue_name, select_counts in iterate_select_collection(self.select_collection):
        if (
            fvalue_name.lower() == selected_person_data[feature_name].lower()
            and select_counts.selected == select_counts.min_max.max
        ):
            num_deleted, num_left = self.delete_all_with_feature_value(feature_name, fvalue_name)
            if num_deleted > 0:
                report.add_line(
                    f"Category {feature_name}/{fvalue_name} full - deleted {num_deleted} people, {num_left} left."
                )

    return report

prune_for_feature_max_0()

Check if any feature_value.max is set to zero; if so, delete everyone with that feature value. NOT DONE: could then check if anyone is left.

Source code in src/sortition_algorithms/people_features.py
def prune_for_feature_max_0(self) -> list[str]:
    """
    Check if any feature_value.max is set to zero; if so, delete everyone with that feature value.
    NOT DONE: could then check if anyone is left.
    """
    msg: list[str] = []
    msg.append(f"Number of people: {self.people.count}.")
    total_num_deleted = 0
    for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(self.features):
        if fv_minmax.max == 0:  # we don't want any of these people
            # pass the message in as deleting them might throw an exception
            msg.append(f"Feature/value {feature_name}/{fvalue_name} full - deleting people...")
            num_deleted, num_left = self.delete_all_with_feature_value(feature_name, fvalue_name)
            # if no exception was thrown above add this bit to the end of the previous message
            msg[-1] += f" Deleted {num_deleted}, {num_left} left."
            total_num_deleted += num_deleted
    # if the total number of people deleted is large then we're probably doing a replacement selection, which means
    # the 'remaining' file will be useless - remind the user of this!
    if total_num_deleted >= self.people.count / 2:
        msg.append(
            ">>> WARNING <<< That deleted MANY PEOPLE - are you doing a "
            "replacement? If so your REMAINING FILE WILL BE USELESS!!!"
        )
    return msg

select_person(person_key)

Selecting a person means:

- remove the person from our copy of People
- update the selected and remaining counts of the FeatureCollection
- if check_same_address_columns has columns, also remove household members (without adding to selected)

Returns:

    list[str] - list of additional people removed due to same address (empty if check_same_address_columns is empty)
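
For example (the instance name and person key are hypothetical):

removed = people_features.select_person("person_42")
if removed:
    print(f"Also removed {len(removed)} household member(s): {removed}")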

Source code in src/sortition_algorithms/people_features.py
def select_person(self, person_key: str) -> list[str]:
    """
    Selecting a person means:
    - remove the person from our copy of People
    - update the `selected` and `remaining` counts of the FeatureCollection
    - if check_same_address_columns has columns, also remove household members (without adding to selected)

    Returns:
        List of additional people removed due to same address (empty if check_same_address_columns is empty)
    """
    # First, find household members if address checking is enabled (before removing the person)
    household_members_removed = []
    if self.check_same_address_columns:
        household_members_removed = list(self.people.matching_address(person_key, self.check_same_address_columns))

    # Handle the main person selection
    person = self.people.get_person_dict(person_key)
    for feature_name in self.features:
        feature_value = person[feature_name]
        self.select_collection[feature_name][feature_value].remove_remaining()
        self.select_collection[feature_name][feature_value].add_selected()
    self.people.remove(person_key)

    # Then remove household members if any were found
    for household_member_key in household_members_removed:
        household_member = self.people.get_person_dict(household_member_key)
        for feature_name in self.features:
            feature_value = household_member[feature_name]
            self.select_collection[feature_name][feature_value].remove_remaining()
            # Note: we don't call add_selected() for household members
        self.people.remove(household_member_key)

    return household_members_removed

SelectCounts

Source code in src/sortition_algorithms/people_features.py
@define(kw_only=True, slots=True)
class SelectCounts:
    min_max: FeatureValueMinMax
    selected: int = 0
    remaining: int = 0

    def add_remaining(self) -> None:
        self.remaining += 1

    def add_selected(self) -> None:
        self.selected += 1

    def remove_remaining(self) -> None:
        self.remaining -= 1
        if self.remaining == 0 and self.selected < self.min_max.min:
            msg = "SELECTION IMPOSSIBLE: FAIL - no one/not enough left after deletion."
            raise errors.RetryableSelectionError(msg)

    @property
    def hit_target(self) -> bool:
        """Return true if selected is between min and max (inclusive)"""
        return self.selected >= self.min_max.min and self.selected <= self.min_max.max

    def percent_selected(self, number_people_wanted: int) -> float:
        return self.selected * 100 / float(number_people_wanted)

    @property
    def people_still_needed(self) -> int:
        """The number of extra people to select to get to the minimum - it should never be negative"""
        return max(self.min_max.min - self.selected, 0)

    def sufficient_people(self) -> bool:
        """
        Return true if we can still make the minimum. So either:
        - we have already selected at least the minimum, or
        - the remaining number is at least as big as the number still required
        """
        return self.selected >= self.min_max.min or self.remaining >= self.people_still_needed

hit_target property

Return true if selected is between min and max (inclusive)

people_still_needed property

The number of extra people to select to get to the minimum - it should never be negative

sufficient_people()

Return true if we can still make the minimum. So either:

- we have already selected at least the minimum, or
- the remaining number is at least as big as the number still required
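
A small numeric illustration of the check (values are hypothetical):

minimum, selected, remaining = 5, 3, 2
still_needed = max(minimum - selected, 0)               # 2
ok = selected >= minimum or remaining >= still_needed   # True - we can just make it
# with remaining = 1 the same check gives False: the minimum is now unreachable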

Source code in src/sortition_algorithms/people_features.py
def sufficient_people(self) -> bool:
    """
    Return true if we can still make the minimum. So either:
    - we have already selected at least the minimum, or
    - the remaining number is at least as big as the number still required
    """
    return self.selected >= self.min_max.min or self.remaining >= self.people_still_needed

WeightedSample

Source code in src/sortition_algorithms/people_features.py
class WeightedSample:
    def __init__(self, features: FeatureCollection) -> None:
        """
        This produces a set of lists of feature values for each feature.  Each value
        is in the list `fv_minmax.max` times - so random choices are weighted by the max.

        So if we had feature "ethnicity", value "white" with max 4, "asian" with max 3 and
        "black" with max 2 we'd get:

        ["white", "white", "white", "white", "asian", "asian", "asian", "black", "black"]

        Then making random choices from that list produces a weighted sample.
        """
        self.weighted: dict[str, list[str]] = defaultdict(list)
        for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(features):
            self.weighted[feature_name] += [fvalue_name] * fv_minmax.max

    def value_for(self, feature_name: str) -> str:
        # S311 is random numbers for crypto - but this is just for a sample file
        return random_provider().choice(self.weighted[feature_name])

__init__(features)

This produces a set of lists of feature values for each feature. Each value is in the list fv_minmax.max times - so random choices are weighted by the max.

So if we had feature "ethnicity", value "white" with max 4, "asian" with max 3 and "black" with max 2 we'd get:

["white", "white", "white", "white", "asian", "asian", "asian", "black", "black"]

Then making random choices from that list produces a weighted sample.
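
The example above as a runnable sketch, using the standard library's random.choice rather than the library's random_provider():

import random

weighted = ["white"] * 4 + ["asian"] * 3 + ["black"] * 2
# each draw picks "white" with probability 4/9, "asian" 3/9 and "black" 2/9
value = random.choice(weighted)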

Source code in src/sortition_algorithms/people_features.py
def __init__(self, features: FeatureCollection) -> None:
    """
    This produces a set of lists of feature values for each feature.  Each value
    is in the list `fv_minmax.max` times - so random choices are weighted by the max.

    So if we had feature "ethnicity", value "white" with max 4, "asian" with max 3 and
    "black" with max 2 we'd get:

    ["white", "white", "white", "white", "asian", "asian", "asian", "black", "black"]

    Then making random choices from that list produces a weighted sample.
    """
    self.weighted: dict[str, list[str]] = defaultdict(list)
    for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(features):
        self.weighted[feature_name] += [fvalue_name] * fv_minmax.max

iterate_select_collection(select_collection)

Helper function to iterate over select_collection.

Source code in src/sortition_algorithms/people_features.py
def iterate_select_collection(select_collection: SelectCollection) -> Generator[tuple[str, str, SelectCounts]]:
    """Helper function to iterate over select_collection."""
    for feature_name, feature_values in select_collection.items():
        for value_name, fv_counts in feature_values.items():
            yield feature_name, value_name, fv_counts

simple_add_selected(person_keys, people, features)

Just add the person to the selected counts for the feature values for that person. Don't do the more complex handling of the full PeopleFeatures.add_selected()

Source code in src/sortition_algorithms/people_features.py
def simple_add_selected(person_keys: Iterable[str], people: People, features: SelectCollection) -> None:
    """
    Just add the person to the selected counts for the feature values for that person.
    Don't do the more complex handling of the full PeopleFeatures.add_selected()
    """
    for person_key in person_keys:
        person = people.get_person_dict(person_key)
        for feature_name in features:
            feature_value = person[feature_name]
            features[feature_name][feature_value].add_selected()

People

Source code in src/sortition_algorithms/people.py
class People:
    def __init__(self, columns_to_keep: list[str]) -> None:
        self._columns_to_keep = columns_to_keep
        self._full_data: dict[str, MutableMapping[str, str]] = {}

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, self.__class__):
            return False
        return self._full_data == other._full_data and self._columns_to_keep == other._columns_to_keep

    @property
    def count(self) -> int:
        return len(self._full_data)

    def __iter__(self) -> Iterator[str]:
        return iter(self._full_data)

    def items(self) -> ItemsView[str, MutableMapping[str, str]]:
        return self._full_data.items()

    def add(
        self,
        person_key: str,
        data: MutableMapping[str, str],
        features: FeatureCollection,
        row_number: int,
        feature_column_name: str = "feature",
    ) -> None:
        person_full_data: MutableMapping[str, str] = CaseInsensitiveDict()
        errors = ParseErrorsCollector()
        # get the feature values: these are the most important and we must check them
        for feature_name, feature_values in features.items():
            # check for input errors here - if it's not in the list of feature values...
            # allow for some unclean data - at least strip empty space, but only if a str!
            # (some values can be numbers)
            p_value = data[feature_name]
            if p_value in feature_values:
                person_full_data[feature_name] = p_value
            else:
                if p_value:
                    msg = f"Value '{p_value}' not in {feature_column_name} {feature_name}"
                    error_code = "value_not_in_feature"
                    error_params = {
                        "value": p_value,
                        "feature_column_name": feature_column_name,
                        "feature_name": feature_name,
                    }
                else:
                    msg = f"Empty value in {feature_column_name} {feature_name}"
                    error_code = "empty_value_in_feature"
                    error_params = {"feature_column_name": feature_column_name, "feature_name": feature_name}
                errors.add(
                    msg=msg,
                    key=feature_name,
                    value=p_value,
                    row=row_number,
                    row_name=person_key,
                    error_code=error_code,
                    error_params=error_params,
                )
        if errors:
            raise errors.to_error()
        # then get the other column values we need
        # this is address, name etc that we need to keep for output file
        # we don't check anything here - it's just for user convenience
        for col in self._columns_to_keep:
            person_full_data[col] = data[col]

        # add all the data to our people object
        self._full_data[person_key] = person_full_data

    def remove(self, person_key: str) -> None:
        del self._full_data[person_key]

    def remove_many(self, person_keys: Iterable[str]) -> None:
        for key in person_keys:
            self.remove(key)

    def get_person_dict(self, person_key: str) -> MutableMapping[str, str]:
        return self._full_data[person_key]

    @staticmethod
    def _address_tuple(person_dict: MutableMapping[str, str], address_columns: Iterable[str]) -> tuple[str, ...]:
        return tuple(person_dict[col].lower() for col in address_columns)

    def get_address(self, person_key: str, address_columns: Iterable[str]) -> tuple[str, ...]:
        return self._address_tuple(self._full_data[person_key], address_columns)

    def households(self, address_columns: list[str]) -> dict[tuple[str, ...], list[str]]:
        """
        Generates a dict with:
        - keys: a tuple containing the address strings
        - values: a list of person_key for each person at that address
        """
        households = defaultdict(list)
        for person_key, person in self._full_data.items():
            households[self._address_tuple(person, address_columns)].append(person_key)
        return households

    def matching_address(self, person_key: str, address_columns: list[str]) -> Iterable[str]:
        """
        Returns a list of person keys for all people who have an address matching
        the address of the person passed in.
        """
        person = self._full_data[person_key]
        person_address = self._address_tuple(person, address_columns)
        for loop_key, loop_person in self._full_data.items():
            if loop_key == person_key:
                continue  # skip the person we've been given
            if person_address == self._address_tuple(loop_person, address_columns):
                yield loop_key

    def _iter_matching(self, feature_name: str, feature_value: str) -> Generator[str]:
        for person_key, person_dict in self._full_data.items():
            if person_dict[feature_name].lower() == feature_value.lower():
                yield person_key

    def count_feature_value(self, feature_name: str, feature_value: str) -> int:
        return len(list(self._iter_matching(feature_name, feature_value)))

    def find_person_by_position_in_category(self, feature_name: str, feature_value: str, position: int) -> str:
        """
        Find the nth person (1-indexed) in a specific feature category.

        Args:
            feature_name: Name of the feature (e.g., "gender")
            feature_value: Value of the feature (e.g., "male")
            position: 1-indexed position within the category

        Returns:
            Person key of the person at the specified position

        Raises:
            SelectionError: If no person is found at the specified position
        """
        people_in_category = list(self._iter_matching(feature_name, feature_value))
        try:
            return people_in_category[position - 1]
        except IndexError:
            # Should always find someone if position is valid
            # If we hit this line it is a bug
            msg = f"Failed to find person at position {position} in {feature_name}/{feature_value}"
            raise SelectionError(
                message=msg,
                error_code="person_not_found",
                error_params={"position": position, "feature_name": feature_name, "feature_value": feature_value},
            ) from None

find_person_by_position_in_category(feature_name, feature_value, position)

Find the nth person (1-indexed) in a specific feature category.

Parameters:

    feature_name (str, required) - name of the feature (e.g., "gender")
    feature_value (str, required) - value of the feature (e.g., "male")
    position (int, required) - 1-indexed position within the category

Returns:

    str - person key of the person at the specified position

Raises:

    SelectionError - if no person is found at the specified position
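
A usage sketch, assuming a People instance named people; the position argument is 1-indexed, matching the random_person_index produced by find_max_ratio_category():

# pick the 3rd person (1-indexed) whose "gender" value is "female"
person_key = people.find_person_by_position_in_category("gender", "female", 3)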

Source code in src/sortition_algorithms/people.py
def find_person_by_position_in_category(self, feature_name: str, feature_value: str, position: int) -> str:
    """
    Find the nth person (1-indexed) in a specific feature category.

    Args:
        feature_name: Name of the feature (e.g., "gender")
        feature_value: Value of the feature (e.g., "male")
        position: 1-indexed position within the category

    Returns:
        Person key of the person at the specified position

    Raises:
        SelectionError: If no person is found at the specified position
    """
    people_in_category = list(self._iter_matching(feature_name, feature_value))
    try:
        return people_in_category[position - 1]
    except IndexError:
        # Should always find someone if position is valid
        # If we hit this line it is a bug
        msg = f"Failed to find person at position {position} in {feature_name}/{feature_value}"
        raise SelectionError(
            message=msg,
            error_code="person_not_found",
            error_params={"position": position, "feature_name": feature_name, "feature_value": feature_value},
        ) from None

households(address_columns)

Generates a dict with:

- keys: a tuple containing the address strings
- values: a list of person_key for each person at that address
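
For example, to find multi-person households (assuming a People instance named people; the column names are hypothetical):

households = people.households(["address1", "postcode"])
multi = {addr: keys for addr, keys in households.items() if len(keys) > 1}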

Source code in src/sortition_algorithms/people.py
def households(self, address_columns: list[str]) -> dict[tuple[str, ...], list[str]]:
    """
    Generates a dict with:
    - keys: a tuple containing the address strings
    - values: a list of person_key for each person at that address
    """
    households = defaultdict(list)
    for person_key, person in self._full_data.items():
        households[self._address_tuple(person, address_columns)].append(person_key)
    return households

matching_address(person_key, address_columns)

Returns a list of person keys for all people who have an address matching the address of the person passed in.

Source code in src/sortition_algorithms/people.py
def matching_address(self, person_key: str, address_columns: list[str]) -> Iterable[str]:
    """
    Returns a list of person keys for all people who have an address matching
    the address of the person passed in.
    """
    person = self._full_data[person_key]
    person_address = self._address_tuple(person, address_columns)
    for loop_key, loop_person in self._full_data.items():
        if loop_key == person_key:
            continue  # skip the person we've been given
        if person_address == self._address_tuple(loop_person, address_columns):
            yield loop_key

check_enough_people_for_every_feature_value(features, people)

For each feature/value, if the min>0, check there are enough people who have that feature/value

Source code in src/sortition_algorithms/people.py
def check_enough_people_for_every_feature_value(features: FeatureCollection, people: People) -> None:
    """For each feature/value, if the min>0, check there are enough people who have that feature/value"""
    error_list: list[str] = []
    for fname, fvalue, fv_minmax in iterate_feature_collection(features):
        matching_count = people.count_feature_value(fname, fvalue)
        if matching_count < fv_minmax.min:
            error_list.append(
                f"Not enough people with the value '{fvalue}' in category '{fname}' - "
                f"the minimum is {fv_minmax.min} but we only have {matching_count}"
            )
    if error_list:
        raise SelectionMultilineError(error_list)

check_for_duplicate_people(people_body, settings)

If we have rows with duplicate IDs, things are going to go bad. First check for any duplicate IDs. If we find any, check if the duplicates are identical.

Returns:

    RunReport - containing warnings about duplicate people

Raises:

    SelectionMultilineError - if duplicate IDs have different data
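
The first step is a straightforward Counter pass; a standalone sketch of the same idea (the rows are made up):

from collections import Counter

rows = [{"id": "1", "name": "a"}, {"id": "2", "name": "b"}, {"id": "1", "name": "a"}]
id_counter = Counter(row["id"] for row in rows)
duplicate_ids = {k for k, v in id_counter.items() if v > 1}   # {"1"}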

Source code in src/sortition_algorithms/people.py
def check_for_duplicate_people(people_body: Iterable[MutableMapping[str, str]], settings: Settings) -> RunReport:
    """
    If we have rows with duplicate IDs things are going to go bad.
    First check for any duplicate IDs. If we find any, check if the duplicates are identical.

    Returns:
        RunReport containing warnings about duplicate people

    Raises:
        SelectionMultilineError: If duplicate IDs have different data
    """
    report = RunReport()

    # first check for any duplicate_ids
    id_counter = Counter(row[settings.id_column] for row in people_body)
    duplicate_ids = {k for k, v in id_counter.items() if v > 1}
    if not duplicate_ids:
        return report

    # find the duplicate rows
    duplicate_rows: dict[str, list[MutableMapping[str, str]]] = defaultdict(list)
    for row in people_body:
        pkey = row[settings.id_column]
        if pkey in duplicate_ids:
            duplicate_rows[pkey].append(row)

    report.add_message("duplicate_ids_found", count=len(duplicate_rows))
    report.add_message("duplicate_ids_list", ids=" ".join(duplicate_rows))

    # find rows where everything is not equal
    duplicate_differing_rows: dict[str, list[MutableMapping[str, str]]] = {}
    for key, value in duplicate_rows.items():
        if not _all_in_list_equal(value):
            duplicate_differing_rows[key] = value
    if not duplicate_differing_rows:
        report.add_message("duplicate_rows_identical")
        return report

    # Build error message with full context
    output: list[str] = []
    output.append(f"Found {len(duplicate_rows)} IDs that have more than one row")
    output.append(f"Duplicated IDs are: {' '.join(duplicate_rows)}")
    output.append(f"Found {len(duplicate_differing_rows)} IDs that have more than one row with different data")
    for key, value in duplicate_differing_rows.items():
        for row in value:
            output.append(f"For ID '{key}' one row of data is: {row}")
    raise SelectionMultilineError(output)

exclude_matching_selected_addresses(people, already_selected, settings)

If we are checking the same addresses, then we should start by excluding people who have the same address as someone who is already selected.
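
Typical use before starting a selection run (a sketch, assuming people and already_selected are People instances loaded elsewhere):

people = exclude_matching_selected_addresses(people, already_selected, settings)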

Source code in src/sortition_algorithms/people.py
def exclude_matching_selected_addresses(people: People, already_selected: People | None, settings: Settings) -> People:
    """
    If we are checking the same addresses, then we should start by excluding people
    who have the same address as someone who is already selected.
    """
    if already_selected is None or not already_selected.count or not settings.check_same_address:
        return people
    selected_addresses = {
        already_selected.get_address(pkey, settings.check_same_address_columns) for pkey in already_selected
    }
    new_people = deepcopy(people)
    for person_key in people:
        if people.get_address(person_key, settings.check_same_address_columns) in selected_addresses:
            new_people.remove(person_key)
    return new_people

Settings

Settings to use when selecting committees. Note that only the first two are required. A minimal example would be:

Settings(id_column="my_id", columns_to_keep=["name", "email"])
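
A fuller example that also enables address checking (the field values are illustrative):

Settings(
    id_column="my_id",
    columns_to_keep=["name", "email"],
    check_same_address=True,
    check_same_address_columns=["address1", "postcode"],
)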

Source code in src/sortition_algorithms/settings.py
@define
class Settings:
    """
    Settings to use when selecting committees. Note that only the first two are required.
    A minimal example would be:

    Settings(id_column="my_id", columns_to_keep=["name", "email"])
    """

    # required
    id_column: str = field(validator=validators.instance_of(str))
    columns_to_keep: list[str] = field()

    # fields with defaults
    check_same_address: bool = field(validator=validators.instance_of(bool), default=False)
    check_same_address_columns: list[str] = field(validator=check_columns_for_same_address, factory=list)
    max_attempts: int = field(validator=validators.instance_of(int), default=100)
    selection_algorithm: str = field(default="maximin")
    random_number_seed: int = field(validator=validators.instance_of(int), default=0)

    @columns_to_keep.validator
    def check_columns_to_keep(self, attribute: Any, value: Any) -> None:
        if not isinstance(value, list):
            raise TypeError("columns_to_keep must be a LIST of strings", "columns_to_keep_not_list", {})
        if not all(isinstance(element, str) for element in value):
            raise TypeError("columns_to_keep must be a list of STRINGS", "columns_to_keep_not_strings", {})

    @selection_algorithm.validator
    def check_selection_algorithm(self, attribute: Any, value: str) -> None:
        if value not in SELECTION_ALGORITHMS:
            algorithms_str = ", ".join(SELECTION_ALGORITHMS)
            raise ValueError(
                f"selection_algorithm {value} is not one of: {algorithms_str}",
                "invalid_selection_algorithm",
                {"algorithm": value, "valid_algorithms": algorithms_str},
            )

    @property
    def normalised_address_columns(self) -> list[str]:
        """
        Returns an empty list if address columns should not be checked (or if the columns
        specified were an empty list). Otherwise returns the columns. That way other code can
        just check if the columns are empty rather than checking the bool separately.
        """
        return self.check_same_address_columns if self.check_same_address else []

    @property
    def full_columns_to_keep(self) -> list[str]:
        """
        We always need to keep the address columns, so in case they are not listed in
        self.columns_to_keep, this property provides the combined list.
        """
        extra_address_columns = [col for col in self.check_same_address_columns if col not in self.columns_to_keep]
        return [*self.columns_to_keep, *extra_address_columns]

    def as_dict(self) -> dict[str, Any]:
        dict_repr = unstructure(self)
        assert isinstance(dict_repr, dict)
        assert all(isinstance(key, str) for key in dict_repr)
        return dict_repr

    @classmethod
    def load_from_file(
        cls,
        settings_file_path: Path,
    ) -> tuple["Settings", RunReport]:
        report = RunReport()
        if not settings_file_path.is_file():
            with open(settings_file_path, "w", encoding="utf-8") as settings_file:
                settings_file.write(DEFAULT_SETTINGS)
            report.add_message("wrote_default_settings", file_path=str(settings_file_path.absolute()))
        with open(settings_file_path, "rb") as settings_file:
            settings = tomllib.load(settings_file)
        # you can't check an address if there is no info about which columns to check...
        if settings["check_same_address"] is False:
            report.add_message("address_checking_disabled_warning", level=ReportLevel.IMPORTANT)
            settings["check_same_address_columns"] = []
        return structure(settings, cls), report

full_columns_to_keep property

We always need to keep the address columns, so in case they are not listed in self.columns_to_keep, this property provides the combined list.

normalised_address_columns property

Returns an empty list if address columns should not be checked (or if the columns specified were an empty list). Otherwise returns the columns. That way other code can just check if the columns are empty rather than checking the bool separately.

RandomProvider

Bases: ABC

This is something of a hack. Mostly we want to use the secrets module. But for repeatable testing we might want to set the random.seed sometimes.

So we have a global _random_provider which can be switched between an instance of this class that uses the secrets module and an instance that uses random with a seed. The switch is done by the set_random_provider() function.

Then every time we want some randomness, we call random_provider() to get the current version of the global.
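
A simplified sketch of the pattern (not the library's actual implementation; the names are illustrative):

import random
import secrets

_provider = secrets.SystemRandom()   # default: cryptographically strong randomness

def set_seeded_provider(seed: int) -> None:
    """Switch to repeatable randomness, e.g. for tests."""
    global _provider
    _provider = random.Random(seed)

def random_provider():
    return _provider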

Source code in src/sortition_algorithms/utils.py
class RandomProvider(ABC):
    """
    This is something of a hack. Mostly we want to use the `secrets` module.
    But for repeatable testing we might want to set the random.seed sometimes.

    So we have a global `_random_provider` which can be switched between an
    instance of this class that uses the `secrets` module and an instance that
    uses `random` with a seed. The switch is done by the `set_random_provider()`
    function.

    Then every time we want some randomness, we call `random_provider()` to get
    the current version of the global.
    """

    @classmethod
    @abstractmethod
    def uniform(cls, lower: float, upper: float) -> float: ...

    @classmethod
    @abstractmethod
    def randbelow(cls, upper: int) -> int: ...

    @classmethod
    @abstractmethod
    def choice(cls, seq: "SupportsLenAndGetItem[str]") -> str: ...

RunReport

A class to hold a report to show to the user at the end

Source code in src/sortition_algorithms/utils.py
@define(eq=True)
class RunReport:
    """A class to hold a report to show to the user at the end"""

    _data: list[RunLineLevel | RunTable | RunError] = field(factory=list)

    def __bool__(self) -> bool:
        """
        Basically, False if the report is empty, or True if there is some content. So you can do
        things like

        ```
        if run_report:
            print(f"Run Report\n\n{run_report.as_text()}")
        ```
        """
        return self.has_content()

    def serialize(self) -> dict[str, Any]:
        return _converter.unstructure(self)  # type: ignore[no-any-return]

    @classmethod
    def deserialize(cls, serialized_data: dict[str, Any]) -> "RunReport":
        return _converter.structure(serialized_data, cls)

    def has_content(self) -> bool:
        """
        False if the report is empty, or True if there is some content. So you can do
        things like

        ```
        if run_report.has_content():
            print(f"Run Report\n\n{run_report.as_text()}")
        ```
        """
        return bool(self._data)

    def add_line(
        self,
        line: str,
        level: ReportLevel = ReportLevel.NORMAL,
        message_code: str | None = None,
        message_params: dict[str, Any] | None = None,
    ) -> None:
        """
        Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

        Args:
            line: The English message text (for backward compatibility and standalone use)
            level: Importance level of the message
            message_code: Optional translation key for i18n (e.g., "loading_features_from_file")
            message_params: Optional parameters for message translation (e.g., {"file_path": "features.csv"})
        """
        self._data.append(RunLineLevel(line, level, message_code=message_code, message_params=message_params or {}))

    def add_line_and_log(
        self,
        line: str,
        log_level: int,
        message_code: str | None = None,
        message_params: dict[str, Any] | None = None,
    ) -> None:
        """
        Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

        This method will also log the message to the `user_logger`. This message can be shown to the user as
        the run is happening, so the user has feedback on what is going on while the run is in progress.

        When generating the report we can skip those messages, to avoid duplication. But if the user_logger
        has not been set up to be shown to the user during the run, we do want those messages to be in the
        final report.

        Args:
            line: The English message text (for backward compatibility and standalone use)
            log_level: Logging level for the message
            message_code: Optional translation key for i18n (e.g., "trial_number")
            message_params: Optional parameters for message translation (e.g., {"trial": 3})
        """
        self._data.append(
            RunLineLevel(
                line, ReportLevel.NORMAL, log_level, message_code=message_code, message_params=message_params or {}
            )
        )
        user_logger.log(level=log_level, msg=line)

    def add_message(self, code: str, level: ReportLevel = ReportLevel.NORMAL, **params: Any) -> None:
        """
        Add a translatable message using a message code and parameters.

        This is a convenience method that combines get_message() and add_line() in one call,
        making it simpler to add messages with translation support.

        Args:
            code: The message code from REPORT_MESSAGES (e.g., "loading_features_from_file")
            level: Importance level of the message
            **params: Parameters to substitute into the message template

        Example:
            >>> report.add_message("features_found", count=5)
            >>> report.add_message("trial_number", ReportLevel.IMPORTANT, trial=3)
        """
        message = get_message(code, **params)
        self.add_line(message, level=level, message_code=code, message_params=params)

    def add_message_and_log(self, code: str, log_level: int, **params: Any) -> None:
        """
        Add a translatable message using a message code and parameters, and log it.

        This is a convenience method that combines get_message() and add_line_and_log() in one call,
        making it simpler to add messages with translation support that are also logged.

        Args:
            code: The message code from REPORT_MESSAGES (e.g., "trial_number")
            log_level: Logging level for the message
            **params: Parameters to substitute into the message template

        Example:
            >>> report.add_message_and_log("trial_number", logging.WARNING, trial=3)
            >>> report.add_message_and_log("basic_solution_warning", logging.WARNING, algorithm="maximin", num_panels=150, num_agents=100, min_probs=0.001)
        """
        message = get_message(code, **params)
        self.add_line_and_log(message, log_level, message_code=code, message_params=params)

    def add_lines(self, lines: Iterable[str], level: ReportLevel = ReportLevel.NORMAL) -> None:
        """
        Add multiple lines of text with the same level.

        .. deprecated:: (next version)
            This method is deprecated. Functions should return RunReport instead of list[str],
            and callers should use add_report() to merge them. This provides better support
            for translation and structured reporting.
        """
        warnings.warn(
            "add_lines() is deprecated. Functions should return RunReport instead of list[str], "
            "and use add_report() to merge them.",
            DeprecationWarning,
            stacklevel=2,
        )
        for line in lines:
            self._data.append(RunLineLevel(line, level))

    def add_table(self, table_headings: list[str], table_data: list[list[str | int | float]]) -> None:
        self._data.append(RunTable(table_headings, table_data))

    def add_error(self, error: Exception, is_fatal: bool = True) -> None:
        self._data.append(RunError(error, is_fatal=is_fatal))

    def add_report(self, other: "RunReport") -> None:
        self._data += other._data

    def _element_to_text(self, element: RunLineLevel | RunTable | RunError, include_logged: bool) -> str | None:
        if isinstance(element, RunLineLevel):
            # we might want to skip lines that were already logged
            if include_logged or element.log_level == logging.NOTSET:
                return element.line
            else:
                # sometimes we want empty strings for blank lines, so here we return None
                # instead so the logged lines can be filtered out
                return None
        elif isinstance(element, RunTable):
            table_text = tabulate(element.data, headers=element.headers, tablefmt="simple")
            # we want a blank line before and after the table.
            return f"\n{table_text}\n"
        else:
            return str(element.error)

    def as_text(self, include_logged: bool = True) -> str:
        parts = [self._element_to_text(element, include_logged) for element in self._data]
        return "\n".join(p for p in parts if p is not None)

    def _line_to_html(self, line_level: RunLineLevel) -> str:
        tags = {
            ReportLevel.NORMAL: ("", ""),
            ReportLevel.IMPORTANT: ("<b>", "</b>"),
            ReportLevel.CRITICAL: ('<b style="color: red">', "</b>"),
        }
        start_tag, end_tag = tags[line_level.level]
        escaped_line = html.escape(line_level.line)
        return f"{start_tag}{escaped_line}{end_tag}"

    def _error_to_html(self, run_error: RunError) -> str:
        start_tag, end_tag = ("<b>", "</b>") if run_error.is_fatal else ("", "")
        if isinstance(run_error.error, errors.SortitionBaseError):
            return f"{start_tag}{run_error.error.to_html()}{end_tag}"
        # default to the string representation
        return f"{start_tag}{run_error.error}{end_tag}"

    def _element_to_html(self, element: RunLineLevel | RunTable | RunError, include_logged: bool) -> str | None:
        if isinstance(element, RunLineLevel):
            if include_logged or element.log_level == logging.NOTSET:
                return self._line_to_html(element)
            else:
                return None
        elif isinstance(element, RunTable):
            return tabulate(element.data, headers=element.headers, tablefmt="html")
        else:
            return self._error_to_html(element)

    def as_html(self, include_logged: bool = True) -> str:
        parts = [self._element_to_html(element, include_logged) for element in self._data]
        return "<br />\n".join(p for p in parts if p is not None)

    def last_error(self) -> Exception | None:
        for element in reversed(self._data):
            if isinstance(element, RunError):
                return element.error
        return None

__bool__()

Basically, False if the report is empty, or True if there is some content. So you can do
things like:

```
if run_report:
    print(f"Run Report\n\n{run_report.as_text()}")
```

Source code in src/sortition_algorithms/utils.py
def __bool__(self) -> bool:
    """
    Basically, False if the report is empty, or True if there is some content. So you can do
    things like

    ```
    if run_report:
        print(f"Run Report\n\n{run_report.as_text()}")
    ```
    """
    return self.has_content()

add_line(line, level=ReportLevel.NORMAL, message_code=None, message_params=None)

Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

Parameters:

    line (str, required) - the English message text (for backward compatibility and standalone use)
    level (ReportLevel, default ReportLevel.NORMAL) - importance level of the message
    message_code (str | None, default None) - optional translation key for i18n (e.g., "loading_features_from_file")
    message_params (dict[str, Any] | None, default None) - optional parameters for message translation (e.g., {"file_path": "features.csv"})
Source code in src/sortition_algorithms/utils.py
def add_line(
    self,
    line: str,
    level: ReportLevel = ReportLevel.NORMAL,
    message_code: str | None = None,
    message_params: dict[str, Any] | None = None,
) -> None:
    """
    Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

    Args:
        line: The English message text (for backward compatibility and standalone use)
        level: Importance level of the message
        message_code: Optional translation key for i18n (e.g., "loading_features_from_file")
        message_params: Optional parameters for message translation (e.g., {"file_path": "features.csv"})
    """
    self._data.append(RunLineLevel(line, level, message_code=message_code, message_params=message_params or {}))

add_line_and_log(line, log_level, message_code=None, message_params=None)

Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

This method will also log the message to the user_logger. This message can be shown to the user as the run is happening, so the user has feedback on what is going on while the run is in progress.

When generating the report we can skip those messages, to avoid duplication. But if the user_logger has not been set up to be shown to the user during the run, we do want those messages to be in the final report.

Parameters:

    line (str, required) - the English message text (for backward compatibility and standalone use)
    log_level (int, required) - logging level for the message
    message_code (str | None, default None) - optional translation key for i18n (e.g., "trial_number")
    message_params (dict[str, Any] | None, default None) - optional parameters for message translation (e.g., {"trial": 3})
Source code in src/sortition_algorithms/utils.py
def add_line_and_log(
    self,
    line: str,
    log_level: int,
    message_code: str | None = None,
    message_params: dict[str, Any] | None = None,
) -> None:
    """
    Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

    This method will also log the message to the `user_logger`. This message can be shown to the user as
    the run is happening, so the user has feedback on what is going on while the run is in progress.

    When generating the report we can skip those messages, to avoid duplication. But if the user_logger
    has not been set up to be shown to the user during the run, we do want those messages to be in the
    final report.

    Args:
        line: The English message text (for backward compatibility and standalone use)
        log_level: Logging level for the message
        message_code: Optional translation key for i18n (e.g., "trial_number")
        message_params: Optional parameters for message translation (e.g., {"trial": 3})
    """
    self._data.append(
        RunLineLevel(
            line, ReportLevel.NORMAL, log_level, message_code=message_code, message_params=message_params or {}
        )
    )
    user_logger.log(level=log_level, msg=line)

add_lines(lines, level=ReportLevel.NORMAL)

Add multiple lines of text with the same level.

Deprecated (next version): this method is deprecated. Functions should return RunReport instead of list[str], and callers should use add_report() to merge them. This provides better support for translation and structured reporting.

Source code in src/sortition_algorithms/utils.py
def add_lines(self, lines: Iterable[str], level: ReportLevel = ReportLevel.NORMAL) -> None:
    """
    Add multiple lines of text with the same level.

    .. deprecated:: (next version)
        This method is deprecated. Functions should return RunReport instead of list[str],
        and callers should use add_report() to merge them. This provides better support
        for translation and structured reporting.
    """
    warnings.warn(
        "add_lines() is deprecated. Functions should return RunReport instead of list[str], "
        "and use add_report() to merge them.",
        DeprecationWarning,
        stacklevel=2,
    )
    for line in lines:
        self._data.append(RunLineLevel(line, level))

add_message(code, level=ReportLevel.NORMAL, **params)

Add a translatable message using a message code and parameters.

This is a convenience method that combines get_message() and add_line() in one call, making it simpler to add messages with translation support.

Parameters:

    code (str, required) - the message code from REPORT_MESSAGES (e.g., "loading_features_from_file")
    level (ReportLevel, default ReportLevel.NORMAL) - importance level of the message
    **params (Any) - parameters to substitute into the message template

Example:

    report.add_message("features_found", count=5)
    report.add_message("trial_number", ReportLevel.IMPORTANT, trial=3)

Source code in src/sortition_algorithms/utils.py
def add_message(self, code: str, level: ReportLevel = ReportLevel.NORMAL, **params: Any) -> None:
    """
    Add a translatable message using a message code and parameters.

    This is a convenience method that combines get_message() and add_line() in one call,
    making it simpler to add messages with translation support.

    Args:
        code: The message code from REPORT_MESSAGES (e.g., "loading_features_from_file")
        level: Importance level of the message
        **params: Parameters to substitute into the message template

    Example:
        >>> report.add_message("features_found", count=5)
        >>> report.add_message("trial_number", ReportLevel.IMPORTANT, trial=3)
    """
    message = get_message(code, **params)
    self.add_line(message, level=level, message_code=code, message_params=params)

add_message_and_log(code, log_level, **params)

Add a translatable message using a message code and parameters, and log it.

This is a convenience method that combines get_message() and add_line_and_log() in one call, making it simpler to add messages with translation support that are also logged.

Parameters:

    code (str, required) - the message code from REPORT_MESSAGES (e.g., "trial_number")
    log_level (int, required) - logging level for the message
    **params (Any) - parameters to substitute into the message template

Example:

    report.add_message_and_log("trial_number", logging.WARNING, trial=3)
    report.add_message_and_log("basic_solution_warning", logging.WARNING, algorithm="maximin", num_panels=150, num_agents=100, min_probs=0.001)

Source code in src/sortition_algorithms/utils.py
def add_message_and_log(self, code: str, log_level: int, **params: Any) -> None:
    """
    Add a translatable message using a message code and parameters, and log it.

    This is a convenience method that combines get_message() and add_line_and_log() in one call,
    making it simpler to add messages with translation support that are also logged.

    Args:
        code: The message code from REPORT_MESSAGES (e.g., "trial_number")
        log_level: Logging level for the message
        **params: Parameters to substitute into the message template

    Example:
        >>> report.add_message_and_log("trial_number", logging.WARNING, trial=3)
        >>> report.add_message_and_log("basic_solution_warning", logging.WARNING, algorithm="maximin", num_panels=150, num_agents=100, min_probs=0.001)
    """
    message = get_message(code, **params)
    self.add_line_and_log(message, log_level, message_code=code, message_params=params)

has_content()

False if the report is empty, or True if there is some content. So you can do
things like:

```
if run_report.has_content():
    print(f"Run Report\n\n{run_report.as_text()}")
```

Source code in src/sortition_algorithms/utils.py
def has_content(self) -> bool:
    """
    False if the report is empty, or True if there is some content. So you can do
    things like

    ```
    if run_report.has_content():
        print(f"Run Report\n\n{run_report.as_text()}")
    ```
    """
    return bool(self._data)

default_logging_setup()

Set both logger and user_logger to send output to stdout

Source code in src/sortition_algorithms/utils.py
def default_logging_setup() -> tuple[logging.Logger, logging.Logger]:
    """Set both logger and user_logger to send output to stdout"""
    # we have two loggers
    # - user_logger is used for messages that any user should see
    # - logger is used for messages that only a developer or admin should need to see
    user_logger = logging.getLogger("sortition_algorithms_user")
    user_logger.setLevel(logging.INFO)
    if not user_logger.handlers:
        # no set up has been done yet - so we do it here
        user_logger.addHandler(logging.StreamHandler(sys.stdout))
    logger = logging.getLogger("sortition_algorithms")
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        # no set up has been done yet - so we do it here
        # this logger just goes straight to stdout - no timestamps or anything
        logger.addHandler(logging.StreamHandler(sys.stdout))
    return user_logger, logger

get_cell_name(row, col_name, headers)

Given the column name, get the spreadsheet cell name, e.g. "A5".
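
A couple of worked examples (the headers are hypothetical):

headers = ["id", "name", "email"]
get_cell_name(5, "email", headers)                 # "C5" - index 2 maps to column C

# columns past "Z" get two letters: index 26 maps to "AA"
get_cell_name(2, "extra", [*["x"] * 26, "extra"])  # "AA2"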

Source code in src/sortition_algorithms/utils.py
def get_cell_name(row: int, col_name: str, headers: Sequence[str]) -> str:
    """Given the column_name, get the spreadsheet cell name, eg "A5" """
    col_index = headers.index(col_name)
    if col_index > 25:
        col1 = ["", *string.ascii_uppercase][col_index // 26]
        col2 = string.ascii_uppercase[col_index % 26]
        col_name = f"{col1}{col2}"
    else:
        col_name = string.ascii_uppercase[col_index]
    return f"{col_name}{row}"

normalise_dict(original)

Returns a new dict where every value from the original has been converted to str and strip()ped of whitespace.
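
For example:

clean = normalise_dict({"Name": "  Alice ", "Age": 42})
clean["name"]   # "Alice" - the result is case-insensitive, values are str-converted and stripped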

Source code in src/sortition_algorithms/utils.py
def normalise_dict(original: Mapping[str, str] | Mapping[str, str | int]) -> MutableMapping[str, str]:
    """
    Returns a new dict where every value from the original has been converted
    to str and strip()ped of whitespace
    """
    new_dict: MutableMapping[str, str] = CaseInsensitiveDict()
    for key, original_value in original.items():
        new_dict[key] = strip_str_int(original_value)
    return new_dict

override_logging_handlers(user_logger_handlers, logger_handlers)

Replace the default handlers with other ones
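
For example, to send user-facing messages to a file while leaving the developer logger's handlers alone (the file name is illustrative):

import logging

override_logging_handlers([logging.FileHandler("run_report.log")], [])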

Source code in src/sortition_algorithms/utils.py
def override_logging_handlers(
    user_logger_handlers: list[logging.Handler], logger_handlers: list[logging.Handler]
) -> None:
    """Replace the default handlers with other ones"""
    if user_logger_handlers:
        _override_handlers_for(logging.getLogger("sortition_algorithms_user"), user_logger_handlers)
    if logger_handlers:
        _override_handlers_for(logging.getLogger("sortition_algorithms"), logger_handlers)