Skip to content

Modules

Adapters for loading and saving data.

Initially we have CSV files locally, and Google Docs Spreadsheets.

CSVFileDataSource

Bases: AbstractDataSource

Source code in src/sortition_algorithms/adapters.py
class CSVFileDataSource(AbstractDataSource):
    """Data source backed by local CSV files.

    Reads features/people/already-selected data from CSV files on disk and
    writes the selected and remaining people back out as CSV files.
    """

    def __init__(
        self,
        *,
        features_file: Path,
        people_file: Path,
        already_selected_file: Path | None,
        selected_file: Path,
        remaining_file: Path,
    ) -> None:
        self.features_file = features_file
        self.people_file = people_file
        self.already_selected_file = already_selected_file
        self.selected_file = selected_file
        self.remaining_file = remaining_file

    @property
    def people_data_container(self) -> str:
        # NB: the leading space looks deliberate - callers appear to concatenate
        # this fragment directly into a sentence.
        return f" CSV file '{self.people_file}'"

    @property
    def already_selected_data_container(self) -> str:
        # see note on people_data_container about the leading space
        return f" CSV file '{self.already_selected_file}'"

    @contextmanager
    def read_feature_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        """Yield (header names, row dicts) read from the features CSV file."""
        report.add_message("loading_features_from_file", file_path=str(self.features_file))
        with open(self.features_file, newline="") as fh:
            reader = csv.DictReader(fh, strict=True)
            assert reader.fieldnames is not None
            yield list(reader.fieldnames), reader

    @contextmanager
    def read_people_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        """Yield (header names, row dicts) read from the people CSV file."""
        report.add_message("loading_people_from_file", file_path=str(self.people_file))
        with open(self.people_file, newline="") as fh:
            reader = csv.DictReader(fh, strict=True)
            assert reader.fieldnames is not None
            yield list(reader.fieldnames), reader

    @contextmanager
    def read_already_selected_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        """Yield (header names, row dicts) for already-selected people.

        Yields empty data when no file was configured or the file is absent.
        """
        missing = self.already_selected_file is None or not self.already_selected_file.exists()
        if missing:
            report.add_message("no_already_selected_file")
            yield [], []
            return
        report.add_message("loading_already_selected_from_file", file_path=str(self.already_selected_file))
        with open(self.already_selected_file, newline="") as fh:
            reader = csv.DictReader(fh, strict=True)
            assert reader.fieldnames is not None
            yield list(reader.fieldnames), reader

    def write_selected(self, selected: list[list[str]], report: RunReport) -> None:
        """Write the selected people rows to the configured CSV file."""
        report.add_message_and_log("writing_selected_csv", logging.INFO, file_path=self.selected_file)
        with open(self.selected_file, "w", newline="") as out_file:
            _write_csv_rows(out_file, selected)

    def write_remaining(self, remaining: list[list[str]], report: RunReport) -> None:
        """Write the remaining (not selected) people rows to the configured CSV file."""
        report.add_message_and_log("writing_remaining_csv", logging.INFO, file_path=self.remaining_file)
        with open(self.remaining_file, "w", newline="") as out_file:
            _write_csv_rows(out_file, remaining)

    def highlight_dupes(self, dupes: list[int]) -> None:
        """Cannot highlight a CSV file"""

    def customise_features_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        """Wrap feature parse errors with the file name for context."""
        lines = [f"Parser error(s) while reading features from {self.features_file}"]
        lines.extend(str(sub) for sub in error.all_errors)
        return SelectionMultilineError(lines)

    def customise_people_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        """Wrap people parse errors with the file name for context."""
        lines = [f"Parser error(s) while reading people from {self.people_file}"]
        lines.extend(str(sub) for sub in error.all_errors)
        return SelectionMultilineError(lines)

    def customise_already_selected_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        """Wrap already-selected parse errors with the file name for context."""
        lines = [f"Parser error(s) while reading already selected people from {self.already_selected_file}"]
        lines.extend(str(sub) for sub in error.all_errors)
        return SelectionMultilineError(lines)

highlight_dupes(dupes)

Highlighting duplicates is a no-op for CSV files (there is no formatting to apply).

Source code in src/sortition_algorithms/adapters.py
def highlight_dupes(self, dupes: list[int]) -> None:
    """Cannot highlight a CSV file"""
CSVStringDataSource

Bases: AbstractDataSource

Source code in src/sortition_algorithms/adapters.py
class CSVStringDataSource(AbstractDataSource):
    """Data source backed entirely by in-memory CSV strings.

    Input data is supplied as CSV-formatted strings; output is accumulated in
    StringIO buffers (``selected_file`` / ``remaining_file``) with flags
    recording whether each has been written.
    """

    def __init__(self, features_data: str, people_data: str, already_selected_data: str = "") -> None:
        self.features_data = features_data
        self.people_data = people_data
        self.already_selected_data = already_selected_data
        # output buffers plus written-flags so callers can tell if output exists
        self.selected_file = StringIO()
        self.remaining_file = StringIO()
        self.selected_file_written = False
        self.remaining_file_written = False

    @property
    def people_data_container(self) -> str:
        return "people CSV data"

    @property
    def already_selected_data_container(self) -> str:
        return "already selected CSV data"

    @contextmanager
    def read_feature_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        """Yield (header names, row dicts) parsed from the features CSV string."""
        report.add_message("loading_features_from_string")
        reader = csv.DictReader(StringIO(self.features_data), strict=True)
        assert reader.fieldnames is not None
        yield list(reader.fieldnames), reader

    @contextmanager
    def read_people_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        """Yield (header names, row dicts) parsed from the people CSV string."""
        report.add_message("loading_people_from_string")
        reader = csv.DictReader(StringIO(self.people_data), strict=True)
        assert reader.fieldnames is not None
        yield list(reader.fieldnames), reader

    @contextmanager
    def read_already_selected_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        """Yield (header names, row dicts) for already-selected people.

        Yields empty data when the string is empty or whitespace-only.
        """
        data = self.already_selected_data
        if not (data and data.strip()):
            report.add_message("no_already_selected_data")
            yield [], []
            return
        report.add_message("loading_already_selected_from_string")
        reader = csv.DictReader(StringIO(data), strict=True)
        assert reader.fieldnames is not None
        yield list(reader.fieldnames), reader

    def write_selected(self, selected: list[list[str]], report: RunReport) -> None:
        """Append the selected rows to the in-memory buffer and mark it written."""
        _write_csv_rows(self.selected_file, selected)
        self.selected_file_written = True

    def write_remaining(self, remaining: list[list[str]], report: RunReport) -> None:
        """Append the remaining rows to the in-memory buffer and mark it written."""
        _write_csv_rows(self.remaining_file, remaining)
        self.remaining_file_written = True

    def highlight_dupes(self, dupes: list[int]) -> None:
        """Cannot highlight a CSV file"""

    def customise_features_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        # with string input there is no file/tab context worth adding
        return error

    def customise_people_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        # with string input there is no file/tab context worth adding
        return error

    def customise_already_selected_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        # with string input there is no file/tab context worth adding
        return error

highlight_dupes(dupes)

Highlighting duplicates is a no-op for in-memory CSV data (there is no formatting to apply).

Source code in src/sortition_algorithms/adapters.py
def highlight_dupes(self, dupes: list[int]) -> None:
    """Cannot highlight a CSV file"""
GSheetDataSource

Bases: AbstractDataSource

Source code in src/sortition_algorithms/adapters.py
(lines 504-894)
class GSheetDataSource(AbstractDataSource):
    """Data source that reads features/people from tabs of a Google Spreadsheet
    (via gspread) and writes selected/remaining people back as new tabs.
    """

    scope: ClassVar = [
        "https://spreadsheets.google.com/feeds",
        "https://www.googleapis.com/auth/drive",
    ]
    # Google Sheets API colour components must be floats in the 0.0-1.0 range.
    hl_light_blue: ClassVar = {
        "backgroundColor": {
            "red": 153 / 255,
            "green": 204 / 255,
            "blue": 255 / 255,
        }
    }
    # Fixed: previous values (red=5, green=2.5) were outside the API's valid
    # 0-1 range; this is orange (255, 127, 0) expressed as fractions.
    hl_orange: ClassVar = {"backgroundColor": {"red": 255 / 255, "green": 127 / 255, "blue": 0 / 255}}

    def __init__(
        self,
        *,
        feature_tab_name: str,
        people_tab_name: str,
        already_selected_tab_name: str = "",
        id_column: str = "not_set",
        auth_json_path: Path,
    ) -> None:
        """
        Args:
        - feature_tab_name - the name of the tab/worksheet containing the features (aka categories)
        - people_tab_name - the name of the tab/worksheet containing the people to select from
        - already_selected_tab_name (optional) - the name of the tab/worksheet containing people who have already been selected
        - id_column (optional) - the name of the column containing the ID. Only required if already_selected_tab_name is set
        - auth_json_path - path to the file containing the google service account details.
        """
        self.feature_tab_name = feature_tab_name
        self.people_tab_name = people_tab_name
        self.already_selected_tab_name = already_selected_tab_name
        self.id_column = id_column
        self.auth_json_path = auth_json_path
        # lazily created gspread handles - see the `client`/`spreadsheet` properties
        self._client: gspread.client.Client | None = None
        self._spreadsheet: gspread.Spreadsheet | None = None
        self.new_tab_default_size_rows = 2
        self.new_tab_default_size_cols = 40
        self._g_sheet_name = ""
        self._open_g_sheet_name = ""
        self.selected_tab_name = ""
        self.remaining_tab_name = ""
        self.tab_namer = GSheetTabNamer()
        self._report = RunReport()

    @property
    def people_data_container(self) -> str:
        return f"'{self.people_tab_name}' tab"

    @property
    def already_selected_data_container(self) -> str:
        return f"'{self.already_selected_tab_name}' tab"

    @property
    def client(self) -> gspread.client.Client:
        """Authorised gspread client, created on first access."""
        if self._client is None:
            creds = ServiceAccountCredentials.from_json_keyfile_name(str(self.auth_json_path), self.scope)
            # if we're getting rate limited, go slower!
            # by using the BackOffHTTPClient, that will sleep and retry
            # if it gets an error related to API usage rate limits.
            self._client = gspread.authorize(creds, http_client=gspread.BackOffHTTPClient)
        return self._client

    @property
    def spreadsheet(self) -> gspread.Spreadsheet:
        """The open spreadsheet, (re)opened whenever the configured name changes."""
        if self._open_g_sheet_name != self._g_sheet_name:
            # reset the spreadsheet if the name changed
            self._spreadsheet = None
            self.tab_namer.reset()
        if self._spreadsheet is None:
            # accept either a full URL or a spreadsheet title
            if self._g_sheet_name.startswith("https://"):
                self._spreadsheet = self.client.open_by_url(self._g_sheet_name)
            else:
                self._spreadsheet = self.client.open(self._g_sheet_name)
            self._open_g_sheet_name = self._g_sheet_name
            self._report.add_message_and_log("opened_gsheet", logging.INFO, title=self._spreadsheet.title)
        return self._spreadsheet

    def _get_tab(self, tab_name: str) -> gspread.Worksheet | None:
        """Return the worksheet with the given title, or None if absent / no sheet set."""
        if not self._g_sheet_name:
            return None
        tab_list = self.spreadsheet.worksheets()
        try:
            return next(tab for tab in tab_list if tab.title == tab_name)
        except StopIteration:
            return None

    def _tab_exists(self, tab_name: str) -> bool:
        return bool(self._get_tab(tab_name))

    def _get_tab_titles(self) -> list[str]:
        if not self._g_sheet_name:
            return []
        return [tab.title for tab in self.spreadsheet.worksheets()]

    def _create_tab(self, tab_name: str) -> gspread.Worksheet:
        return self.spreadsheet.add_worksheet(
            title=tab_name,
            rows=self.new_tab_default_size_rows,
            cols=self.new_tab_default_size_cols,
        )

    def set_g_sheet_name(self, g_sheet_name: str) -> None:
        # if we're changing spreadsheet, reset the spreadsheet object
        if self._g_sheet_name != g_sheet_name:
            self._spreadsheet = None
            self._g_sheet_name = g_sheet_name
            self.tab_namer.reset()

    def get_title(self) -> str:
        """Return the spreadsheet title, converting gspread's not-found error to a SelectionError."""
        try:
            return self.spreadsheet.title
        except gspread.SpreadsheetNotFound as err:
            msg = f"Google spreadsheet not found: {self._g_sheet_name}."
            raise SelectionError(
                message=msg, error_code="spreadsheet_not_found", error_params={"spreadsheet_name": self._g_sheet_name}
            ) from err

    def _require_tab(self, tab_name: str) -> None:
        """Raise SelectionError if the named tab (or the spreadsheet itself) cannot be found.

        Factored out of the three read_* methods, which previously each repeated
        this logic with slightly inconsistent error messages (some had a trailing
        space on the spreadsheet-not-found message).
        """
        try:
            if not self._tab_exists(tab_name):
                msg = (
                    f"Error in Google sheet: no tab called '{tab_name}' "
                    f"found in spreadsheet '{self.spreadsheet.title}'."
                )
                raise SelectionError(
                    message=msg,
                    error_code="tab_not_found",
                    error_params={"tab_name": tab_name, "spreadsheet_title": self.spreadsheet.title},
                )
        except gspread.SpreadsheetNotFound as err:
            msg = f"Google spreadsheet not found: {self._g_sheet_name}."
            raise SelectionError(
                message=msg, error_code="spreadsheet_not_found", error_params={"spreadsheet_name": self._g_sheet_name}
            ) from err

    @contextmanager
    def read_feature_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        """Yield (header row, record dicts) from the features tab."""
        self._report = report
        self._require_tab(self.feature_tab_name)
        tab_features = self.spreadsheet.worksheet(self.feature_tab_name)
        feature_head = tab_features.row_values(1)
        feature_body = _stringify_records(tab_features.get_all_records(expected_headers=[]))
        yield feature_head, feature_body

    @contextmanager
    def read_people_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        """Yield (header row, record dicts) from the people tab."""
        self._report = report
        self._require_tab(self.people_tab_name)
        tab_people = self.spreadsheet.worksheet(self.people_tab_name)
        # if we don't read this in here we can't check if there are 2 columns with the same name
        people_head = tab_people.row_values(1)
        # the numericise_ignore doesn't convert the phone numbers to ints...
        # 1 Oct 2024: the final argument with expected_headers is to deal with the fact that
        # updated versions of gspread can't cope with duplicate headers
        people_body = _stringify_records(
            tab_people.get_all_records(
                numericise_ignore=["all"],
                expected_headers=[],
            )
        )
        self._report.add_message("reading_gsheet_tab", tab_name=self.people_tab_name)
        yield people_head, people_body

    @staticmethod
    def find_header_row(
        all_values: gspread.ValueRange | list[list[Any]], id_column: str, min_headers: int = 5
    ) -> tuple[int, list[str]]:
        """
        Find the first row that looks like a header row in the worksheet.

        A row is considered a header row if it has at least min_headers non-empty cells
        and contains the id_column. This helps skip over title rows, empty rows, etc.

        Args:
            all_values: The values from the worksheet to search
            id_column: The text for the id_column - from settings
            min_headers: Minimum number of non-empty cells to consider a row as headers (default: 5)

        Returns:
            Tuple of (row_number, header_values) where row_number is 1-indexed
        """
        # TODO: some logging of non-empty rows that we skipped
        # Find the first row with enough non-empty cells to be a header row
        for row_idx, row in enumerate(all_values, start=1):
            # Count non-empty cells in this row
            non_empty_cells = _non_empty_cells(row)
            # also check if the id_column is in this row
            if len(non_empty_cells) >= min_headers and id_column in non_empty_cells:
                return row_idx, list(row)

        # If no suitable header row found, return empty
        return 1, []

    @staticmethod
    def get_valid_people_rows(
        entire_sheet: list[list[str]], header_row_num: int, min_values: int = 5
    ) -> Generator[list[str]]:
        """
        Find all rows under the header row that contain more than min_values non-empty cells.

        Sometimes in the spreadsheet there will be a comment in a cell underneath all the valid rows.
        We should ignore such rows rather than attempt to convert them into an element of People.

        NOTE(review): this uses a strict `>` while find_header_row uses `>=` - confirm
        whether rows with exactly min_values non-empty cells should be kept.
        """
        # TODO: some logging of non-empty rows that we skipped
        all_rows_under_header = entire_sheet[header_row_num:]
        for row in all_rows_under_header:
            # Count non-empty cells in this row
            non_empty_cells = _non_empty_cells(row)
            if len(non_empty_cells) > min_values:
                yield row

    @staticmethod
    def check_header_duplicates(already_selected_head: list[str], already_selected_tab_name: str) -> None:
        """Raise SelectionError if the header row contains duplicate (non-empty) names."""
        counts = Counter(already_selected_head)
        # note that we ignore empty strings when checking for duplicates
        header_dupes = [item for item in counts if item and counts[item] > 1]
        if header_dupes:
            msg = f"the header row in the {already_selected_tab_name} tab contains duplicates: {header_dupes}"
            raise SelectionError(
                message=msg,
                error_code="already_selected_duplicate_headers",
                error_params={"tab_name": already_selected_tab_name, "duplicates": ", ".join(header_dupes)},
            )

    @contextmanager
    def read_already_selected_data(
        self, report: RunReport
    ) -> Generator[tuple[Iterable[str], Iterable[dict[str, str]]], None, None]:
        """Yield (header row, record dicts) from the already-selected tab.

        Yields empty data when no tab name is configured or the tab has no
        usable header row.
        """
        self._report = report
        if not self.already_selected_tab_name:
            # If no tab name provided, return empty data
            self._report.add_message("no_already_selected_tab")
            yield [], []
            return

        self._require_tab(self.already_selected_tab_name)
        tab_already_selected = self.spreadsheet.worksheet(self.already_selected_tab_name)

        # Get all values from the sheet
        all_values = tab_already_selected.get(value_render_option=ValueRenderOption.unformatted, pad_values=True)

        # Find which row contains the headers
        header_row_num, already_selected_head = self.find_header_row(all_values, self.id_column)

        # If no header row found or it's empty, return empty data
        # (assumes header cells are strings - the unformatted render option can
        # yield numbers for numeric cells; TODO confirm this cannot crash here)
        if not already_selected_head or not any(cell.strip() for cell in already_selected_head):
            self._report.add_line(
                f"Tab '{self.already_selected_tab_name}' is empty or has no valid header row, using empty data."
            )
            yield [], []
            return

        self.check_header_duplicates(already_selected_head, self.already_selected_tab_name)

        # zip rows against the header; strict=False as trailing cells may be missing
        already_selected_body = _stringify_records([
            dict(zip(already_selected_head, row, strict=False))
            for row in self.get_valid_people_rows(all_values, header_row_num)
        ])
        self._report.add_message(
            "reading_already_selected_tab", tab_name=self.already_selected_tab_name, header_row=header_row_num
        )
        yield already_selected_head, already_selected_body

    def write_selected(self, selected: list[list[str]], report: RunReport) -> None:
        """Create a fresh tab for the selected people and write/format it."""
        self.tab_namer.find_unused_tab_suffix(self._get_tab_titles())
        tab_selected = self._create_tab(self.tab_namer.selected_tab_name())
        report.add_message_and_log("writing_selected_tab", logging.INFO, tab_name=tab_selected.title)
        self.selected_tab_name = tab_selected.title
        tab_selected.update(selected)
        tab_selected.format("A1:U1", self.hl_light_blue)
        user_logger.info(f"Selected people written to {tab_selected.title} tab")

    def write_remaining(self, remaining: list[list[str]], report: RunReport) -> None:
        """Create a fresh tab for the remaining people and write/format it."""
        # the number is selected during write_selected(), so we reuse it here
        tab_remaining = self._create_tab(self.tab_namer.remaining_tab_name())
        report.add_message_and_log("writing_remaining_tab", logging.INFO, tab_name=tab_remaining.title)
        self.remaining_tab_name = tab_remaining.title
        tab_remaining.update(remaining)
        tab_remaining.format("A1:U1", self.hl_light_blue)

    def highlight_dupes(self, dupes: list[int]) -> None:
        """Highlight (in orange) the given 0-indexed rows of the remaining tab."""
        if not dupes:
            return
        tab_remaining = self._get_tab(self.tab_namer.remaining_tab_name())
        assert tab_remaining is not None, "highlight_dupes() has been called without first calling write_remaining()"
        # note that the indexes we have produced start at 0, but the row indexes start at 1
        # so we need to add 1 to the indexes.
        row_strings = [f"A{index + 1}:U{index + 1}" for index in dupes]
        tab_remaining.format(row_strings, self.hl_orange)

    def delete_old_output_tabs(self, dry_run: bool = False) -> list[str]:
        """
        Find and delete all tabs with names starting with the tab stubs for selected or remaining

        Args:
            dry_run: If True, report what would be deleted without actually deleting.

        Returns:
            List of tab names that were deleted (or would be deleted in dry_run mode).
        """
        if not self._g_sheet_name:
            return []

        all_tabs = self.spreadsheet.worksheets()
        tabs_to_delete: list[gspread.Worksheet] = []

        for tab in all_tabs:
            if self.tab_namer.matches_stubs(tab.title):
                tabs_to_delete.append(tab)

        deleted_names: list[str] = []
        for tab in tabs_to_delete:
            deleted_names.append(tab.title)
            if not dry_run:
                self.spreadsheet.del_worksheet(tab)

        return deleted_names

    def _annotate_parse_errors_with_cell_names(self, error: ParseTableMultiError, headers: Sequence[str]) -> list[str]:
        """Render each sub-error with the spreadsheet cell name(s) it refers to."""
        msgs: list[str] = []
        for sub_error in error.all_errors:
            if isinstance(sub_error, ParseTableErrorMsg):
                cell_name = get_cell_name(sub_error.row, sub_error.key, headers)
                msgs.append(f"{sub_error.msg} - see cell {cell_name}")
            else:
                cell_names = [get_cell_name(sub_error.row, key, headers) for key in sub_error.keys]
                msgs.append(f"{sub_error.msg} - see cells {' '.join(cell_names)}")
        return msgs

    def customise_features_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        """Wrap feature parse errors with worksheet context and cell names."""
        return SelectionMultilineError([
            f"Parser error(s) while reading features from '{self.feature_tab_name}' worksheet",
            *self._annotate_parse_errors_with_cell_names(error, headers),
        ])

    def customise_people_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        """Wrap people parse errors with worksheet context and cell names."""
        return SelectionMultilineError([
            f"Parser error(s) while reading people from '{self.people_tab_name}' worksheet",
            *self._annotate_parse_errors_with_cell_names(error, headers),
        ])

    def customise_already_selected_parse_error(
        self, error: ParseTableMultiError, headers: Sequence[str]
    ) -> SelectionMultilineError:
        """Wrap already-selected parse errors with worksheet context and cell names."""
        return SelectionMultilineError([
            f"Parser error(s) while reading people from '{self.already_selected_tab_name}' worksheet",
            *self._annotate_parse_errors_with_cell_names(error, headers),
        ])

__init__(*, feature_tab_name, people_tab_name, already_selected_tab_name='', id_column='not_set', auth_json_path)

Args: - feature_tab_name - the name of the tab/worksheet containing the features (aka categories) - people_tab_name - the name of the tab/worksheet containing the people to select from - already_selected_tab_name (optional) - the name of the tab/worksheet containing people who have already been selected - id_column (optional) - the name of the column containing the ID. Only required if already_selected_tab_name is set - auth_json_path - path to the file containing the google service account details.

Source code in src/sortition_algorithms/adapters.py
def __init__(
    self,
    *,
    feature_tab_name: str,
    people_tab_name: str,
    already_selected_tab_name: str = "",
    id_column: str = "not_set",
    auth_json_path: Path,
) -> None:
    """
    Args:
    - feature_tab_name - the name of the tab/worksheet containing the features (aka categories)
    - people_tab_name - the name of the tab/worksheet containing the people to select from
    - already_selected_tab_name (optional) - the name of the tab/worksheet containing people who have already been selected
    - id_column (optional) - the name of the column containing the ID. Only required if already_selected_tab_name is set
    - auth_json_path - path to the file containing the google service account details.
    """
    self.feature_tab_name = feature_tab_name
    self.people_tab_name = people_tab_name
    self.already_selected_tab_name = already_selected_tab_name
    self.id_column = id_column
    self.auth_json_path = auth_json_path
    self._client: gspread.client.Client | None = None
    self._spreadsheet: gspread.Spreadsheet | None = None
    self.new_tab_default_size_rows = 2
    self.new_tab_default_size_cols = 40
    self._g_sheet_name = ""
    self._open_g_sheet_name = ""
    self.selected_tab_name = ""
    self.remaining_tab_name = ""
    self.tab_namer = GSheetTabNamer()
    self._report = RunReport()

delete_old_output_tabs(dry_run=False)

Find and delete all tabs with names starting with the tab stubs for selected or remaining

Parameters:

Name Type Description Default
dry_run bool

If True, report what would be deleted without actually deleting.

False

Returns:

Type Description
list[str]

List of tab names that were deleted (or would be deleted in dry_run mode).

Source code in src/sortition_algorithms/adapters.py
def delete_old_output_tabs(self, dry_run: bool = False) -> list[str]:
    """
    Find and delete all tabs with names starting with the tab stubs for selected or remaining

    Args:
        dry_run: If True, report what would be deleted without actually deleting.

    Returns:
        List of tab names that were deleted (or would be deleted in dry_run mode).
    """
    if not self._g_sheet_name:
        return []

    all_tabs = self.spreadsheet.worksheets()
    tabs_to_delete: list[gspread.Worksheet] = []

    for tab in all_tabs:
        if self.tab_namer.matches_stubs(tab.title):
            tabs_to_delete.append(tab)

    deleted_names: list[str] = []
    for tab in tabs_to_delete:
        deleted_names.append(tab.title)
        if not dry_run:
            self.spreadsheet.del_worksheet(tab)

    return deleted_names

find_header_row(all_values, id_column, min_headers=5) staticmethod

Find the first row that looks like a header row in the worksheet.

A row is considered a header row if it has at least min_headers non-empty cells. This helps skip over title rows, empty rows, etc.

Parameters:

Name Type Description Default
all_values ValueRange | list[list[Any]]

The values from the worksheet to search

required
id_column str

The text for the id_column - from settings

required
min_headers int

Minimum number of non-empty cells to consider a row as headers (default: 5)

5

Returns:

Type Description
tuple[int, list[str]]

Tuple of (row_number, header_values) where row_number is 1-indexed

Source code in src/sortition_algorithms/adapters.py
@staticmethod
def find_header_row(
    all_values: gspread.ValueRange | list[list[Any]], id_column: str, min_headers: int = 5
) -> tuple[int, list[str]]:
    """
    Find the first row that looks like a header row in the worksheet.

    A row counts as a header row when it has at least min_headers non-empty cells
    AND contains the id column text. This helps skip over title rows, empty rows, etc.

    Args:
        all_values: The values from the worksheet to search
        id_column: The text for the id_column - from settings
        min_headers: Minimum number of non-empty cells to consider a row as headers (default: 5)

    Returns:
        Tuple of (row_number, header_values) where row_number is 1-indexed
    """
    # TODO: some logging of non-empty rows that we skipped
    for row_number, candidate in enumerate(all_values, start=1):
        populated = _non_empty_cells(candidate)
        # A genuine header row must both be wide enough and mention the id column.
        if id_column in populated and len(populated) >= min_headers:
            return row_number, list(candidate)

    # No plausible header row found: fall back to row 1 with no headers.
    return 1, []

get_valid_people_rows(entire_sheet, header_row_num, min_values=5) staticmethod

Find all rows under the header row that contain more than min_values (default 5) non-empty values.

Sometimes in the spreadsheet there will be a comment in a cell underneath all the valid rows. We should ignore such rows rather than attempt to convert them into an element of People.

Source code in src/sortition_algorithms/adapters.py
@staticmethod
def get_valid_people_rows(
    entire_sheet: list[list[str]], header_row_num: int, min_values: int = 5
) -> Generator[list[str]]:
    """
    Yield the rows below the header row that have more than min_values non-empty cells.

    Sometimes in the spreadsheet there will be a comment in a cell underneath all the valid rows.
    We should ignore such rows rather than attempt to convert them into an element of People.
    """
    # TODO: some logging of non-empty rows that we skipped
    for candidate in entire_sheet[header_row_num:]:
        # Skip sparse rows (e.g. stray comments) - only rows with enough
        # populated cells are treated as real people records.
        if len(_non_empty_cells(candidate)) > min_values:
            yield candidate

generate_dupes(people_remaining_rows, people_selected_rows, settings, already_selected=None)

Generate a list of indexes of people who share an address with someone else in this set of rows.

Note that the first row of people_remaining_rows is the column headers. The indexes generated are for the rows in this table, so the index takes account of the first row being the header.

So if we had people_remaining_rows:

id,name,address_line_1,postcode 1,Alice,33 Acacia Avenue,W1A 1AA 1,Bob,31 Acacia Avenue,W1A 1AA 1,Charlotte,33 Acacia Avenue,W1A 1AA 1,David,33 Acacia Avenue,W1B 1BB

And settings with check_same_address_columns = ["address_line_1", "postcode"]

Then we should return [1, 3]

Source code in src/sortition_algorithms/adapters.py
def generate_dupes(
    people_remaining_rows: list[list[str]],
    people_selected_rows: list[list[str]],
    settings: Settings,
    already_selected: People | None = None,
) -> list[int]:
    """
    Generate a list of indexes of people who share an address with someone else in this set of rows.

    Note that the first row of people_remaining_rows is the column headers.  The indexes generated
    are for the rows in this table, so the index takes account of the first row being the header.

    So if we had people_remaining_rows:

    id,name,address_line_1,postcode
    1,Alice,33 Acacia Avenue,W1A 1AA
    1,Bob,31 Acacia Avenue,W1A 1AA
    1,Charlotte,33 Acacia Avenue,W1A 1AA
    1,David,33 Acacia Avenue,W1B 1BB

    And settings with `check_same_address_columns = ["address_line_1", "postcode"]`

    Then we should return [1, 3]
    """
    # Address checking disabled: nobody counts as a duplicate.
    if not settings.check_same_address:
        return []

    header = people_remaining_rows[0]
    address_columns = [idx for idx, name in enumerate(header) if name in settings.check_same_address_columns]

    def _row_address(row: list[str]) -> tuple[str, ...]:
        # Case-insensitive address key built from the configured address columns.
        return tuple(cell.lower() for idx, cell in enumerate(row) if idx in address_columns)

    # Group the remaining people (1-indexed to account for the header row) by address.
    rows_by_address: dict[tuple[str, ...], list[int]] = defaultdict(list)
    for row_num, row in enumerate(people_remaining_rows[1:], start=1):  # skip the header row
        rows_by_address[_row_address(row)].append(row_num)

    # Anyone sharing an address with another remaining person is a dupe.
    duplicate_rows: set[int] = {
        row_num for row_nums in rows_by_address.values() if len(row_nums) > 1 for row_num in row_nums
    }

    # Collect the addresses of everyone already chosen: this round's selection ...
    selected_addresses: set[tuple[str, ...]] = {_row_address(row) for row in people_selected_rows[1:]}
    # ... plus those selected in previous rounds, when supplied.
    if already_selected:
        selected_addresses.update(
            already_selected.get_address(selected_key, settings.check_same_address_columns)
            for selected_key in already_selected
        )

    # Remaining people living at an already-selected address are also dupes.
    for row_num, row in enumerate(people_remaining_rows[1:], start=1):  # skip the header row
        if _row_address(row) in selected_addresses:
            duplicate_rows.add(row_num)

    return sorted(duplicate_rows)

find_any_committee(features, people, number_people_wanted, check_same_address_columns, solver_backend=DEFAULT_BACKEND)

Find any single feasible committee that satisfies the quotas.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

columns to check for same address, or empty list if not checking addresses.

required
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list

DEFAULT_BACKEND

Returns:

Type Description
tuple[list[frozenset[str]], RunReport]

tuple of (list containing one committee as frozenset of person_ids, empty report)

Raises:

Type Description
InfeasibleQuotasError

If quotas are infeasible

SelectionError

If solver fails for other reasons

Source code in src/sortition_algorithms/committee_generation/__init__.py
def find_any_committee(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    solver_backend: str = DEFAULT_BACKEND,
) -> tuple[list[frozenset[str]], RunReport]:
    """Find any single feasible committee that satisfies the quotas.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: columns to check for same address, or empty list if
                                    not checking addresses.
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list

    Returns:
        tuple of (list containing one committee as frozenset of person_ids, empty report)

    Raises:
        InfeasibleQuotasError: If quotas are infeasible
        SelectionError: If solver fails for other reasons
    """
    # Build the ILP once, solve it, and read the selected people off the solution.
    ilp_solver, person_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns, solver_backend
    )
    selected = ilp_results_to_committee(ilp_solver, person_vars)
    return [selected], RunReport()

find_distribution_leximin(features, people, number_people_wanted, check_same_address_columns, solver_backend=DEFAULT_BACKEND)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected (just like maximin), but breaks ties to maximize the second-lowest probability, breaks further ties to maximize the third-lowest probability and so forth.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

Address columns for household identification, or empty if no address checking to be done.

required
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list Note: The dual LP still uses Gurobi.

DEFAULT_BACKEND

Returns:

Type Description
list[frozenset[str]]

tuple of (committees, probabilities, output_lines)

list[float]
  • committees: list of feasible committees (frozenset of agent IDs)
RunReport
  • probabilities: list of probabilities for each committee
tuple[list[frozenset[str]], list[float], RunReport]
  • output_lines: list of debug strings

Raises:

Type Description
RuntimeError

If Gurobi is not available

Source code in src/sortition_algorithms/committee_generation/leximin.py
def find_distribution_leximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    solver_backend: str = DEFAULT_BACKEND,
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected
    (just like maximin), but breaks ties to maximize the second-lowest probability, breaks further ties to maximize the
    third-lowest probability and so forth.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list
                        Note: The dual LP still uses Gurobi.

    Returns:
        tuple of (committees, probabilities, report)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - report: run report of the optimization

    Raises:
        RuntimeError: If Gurobi is not available
    """
    if not GUROBI_AVAILABLE:
        msg = "Leximin algorithm requires Gurobi solver which is not available"
        raise RuntimeError(msg, "gurobi_not_available", {})

    run_report = RunReport()
    run_report.add_message_and_log("using_leximin_algorithm", logging.INFO)
    # Silence Gurobi's console output for the dual LPs.
    grb.setParam("OutputFlag", 0)

    # ILP used throughout to discover new feasible committees (column generation).
    committee_solver, person_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns, solver_backend
    )

    # Seed the pool with a diverse set of committees covering every selectable agent.
    committee_pool, _covered_agents, seed_report = generate_initial_committees(
        committee_solver, person_vars, 3 * people.count
    )
    run_report.add_report(seed_report)

    # Leximin loop: fix agents' selection probabilities from the lowest upwards.
    fixed_probabilities = _run_leximin_main_loop(committee_solver, person_vars, committee_pool, people, run_report)

    # Translate the fixed per-agent probabilities into a distribution over committees.
    committee_probabilities = _solve_leximin_primal_for_final_probabilities(committee_pool, fixed_probabilities)

    return list(committee_pool), committee_probabilities, run_report

find_distribution_maximin(features, people, number_people_wanted, check_same_address_columns, solver_backend=DEFAULT_BACKEND)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

Address columns for household identification, or empty if no address checking to be done.

required
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list

DEFAULT_BACKEND

Returns:

Type Description
list[frozenset[str]]

tuple of (committees, probabilities, output_lines)

list[float]
  • committees: list of feasible committees (frozenset of agent IDs)
RunReport
  • probabilities: list of probabilities for each committee
tuple[list[frozenset[str]], list[float], RunReport]
  • output_lines: list of debug strings
Source code in src/sortition_algorithms/committee_generation/maximin.py
def find_distribution_maximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    solver_backend: str = DEFAULT_BACKEND,
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list

    Returns:
        tuple of (committees, probabilities, report)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - report: run report of the optimization
    """
    run_report = RunReport()
    run_report.add_message_and_log("using_maximin_algorithm", logging.INFO)

    # ILP used to discover new feasible committees during column generation.
    committee_solver, person_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns, solver_backend
    )

    # Seed the pool with committees that together cover every selectable agent.
    committee_pool, covered_agents, seed_report = generate_initial_committees(
        committee_solver, person_vars, people.count
    )
    run_report.add_report(seed_report)

    # LP model that is grown incrementally as new columns (committees) are found.
    lp_solver, lp_agent_vars, lp_upper_bound = _setup_maximin_incremental_model(
        committee_pool, covered_agents, solver_backend
    )

    # Alternate between solving the LP and generating improving committees.
    return _run_maximin_optimization_loop(
        committee_solver,
        person_vars,
        lp_solver,
        lp_agent_vars,
        lp_upper_bound,
        committee_pool,
        covered_agents,
        run_report,
        solver_backend,
    )

find_distribution_nash(features, people, number_people_wanted, check_same_address_columns, solver_backend=DEFAULT_BACKEND)

Find a distribution over feasible committees that maximizes the Nash welfare, i.e., the product of selection probabilities over all persons.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

Address columns for household identification, or empty if no address checking to be done.

required
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list

DEFAULT_BACKEND

Returns:

Type Description
list[frozenset[str]]

tuple of (committees, probabilities, output_lines)

list[float]
  • committees: list of feasible committees (frozenset of agent IDs)
RunReport
  • probabilities: list of probabilities for each committee
tuple[list[frozenset[str]], list[float], RunReport]
  • output_lines: list of debug strings

The algorithm maximizes the product of selection probabilities Πᵢ pᵢ by equivalently maximizing log(Πᵢ pᵢ) = Σᵢ log(pᵢ). If some person i is not included in any feasible committee, their pᵢ is 0, and this sum is -∞. We maximize Σᵢ log(pᵢ) where i is restricted to range over persons that can possibly be included.

Source code in src/sortition_algorithms/committee_generation/nash.py
def find_distribution_nash(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    solver_backend: str = DEFAULT_BACKEND,
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the Nash welfare, i.e., the product of
    selection probabilities over all persons.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list

    Returns:
        tuple of (committees, probabilities, report)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - report: run report of the optimization

    The algorithm maximizes the product of selection probabilities Πᵢ pᵢ by equivalently maximizing
    log(Πᵢ pᵢ) = Σᵢ log(pᵢ). If some person i is not included in any feasible committee, their pᵢ is 0, and
    this sum is -∞. We maximize Σᵢ log(pᵢ) where i is restricted to range over persons that can possibly be included.
    """
    run_report = RunReport()
    run_report.add_message_and_log("using_nash_algorithm", logging.INFO)

    # ILP used to discover new feasible committees during column generation.
    ilp_solver, person_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns, solver_backend
    )

    # Seed with committees that include every agent who can possibly be selected.
    seed_committees, covered_agents, seed_report = generate_initial_committees(
        ilp_solver, person_vars, 2 * people.count
    )
    committee_list = list(seed_committees)
    run_report.add_report(seed_report)

    # Index the covered agents so they can be used in a matrix representation.
    entitlements, contributes_to_entitlement = _define_entitlements(covered_agents)

    # Iteratively maximize Σᵢ log(pᵢ) over the growing committee pool.
    return _run_nash_optimization_loop(
        ilp_solver,
        person_vars,
        committee_list,
        entitlements,
        contributes_to_entitlement,
        covered_agents,
        number_people_wanted,
        run_report,
    )

find_random_sample_legacy(people, features, number_people_wanted, check_same_address_columns=None)

Legacy stratified random selection algorithm.

Implements the original algorithm that uses greedy selection based on priority ratios. Always selects from the most urgently needed category first (highest ratio of (min-selected)/remaining), then randomly picks within that category.

Parameters:

Name Type Description Default
people People

People collection

required
features FeatureCollection

Feature definitions with min/max targets

required
number_people_wanted int

Number of people to select

required
check_same_address_columns list[str] | None

Address columns for household identification, or empty if no address checking to be done.

None

Returns:

Type Description
list[frozenset[str]]

Tuple of (selected_committees, report) where:

RunReport
  • selected_committees: List containing one frozenset of selected person IDs
tuple[list[frozenset[str]], RunReport]
  • report: report containing log messages about the selection process

Raises:

Type Description
SelectionError

If selection becomes impossible (not enough people, etc.)

Source code in src/sortition_algorithms/committee_generation/legacy.py
def find_random_sample_legacy(
    people: People,
    features: FeatureCollection,
    number_people_wanted: int,
    check_same_address_columns: list[str] | None = None,
) -> tuple[list[frozenset[str]], RunReport]:
    """
    Legacy stratified random selection algorithm.

    Implements the original algorithm that uses greedy selection based on priority ratios.
    Always selects from the most urgently needed category first (highest ratio of
    (min-selected)/remaining), then randomly picks within that category.

    Args:
        people: People collection
        features: Feature definitions with min/max targets
        number_people_wanted: Number of people to select
        check_same_address_columns: Address columns for household identification, or empty
                                    if no address checking to be done.

    Returns:
        Tuple of (selected_committees, report) where:
        - selected_committees: List containing one frozenset of selected person IDs
        - report: RunReport containing log messages about the selection process

    Raises:
        RetryableSelectionError: If selection becomes impossible (not enough people,
            a category runs dry, etc.); a fresh randomised run may still succeed.
    """
    report = RunReport()
    report.add_message("using_legacy_algorithm")
    people_selected: set[str] = set()

    # Create PeopleFeatures and initialize
    people_features = PeopleFeatures(people, features, check_same_address_columns or [])
    people_features.update_all_features_remaining()
    people_features.prune_for_feature_max_0()

    # Main selection loop: pick one person per iteration, always from the
    # category whose minimum quota is hardest to satisfy with those remaining.
    for count in range(number_people_wanted):
        # Find the category with highest priority ratio
        try:
            ratio_result = people_features.find_max_ratio_category()
        except errors.SelectionError as e:
            # Wrap in a retryable error: a re-run with different randomness may succeed.
            msg = f"Selection failed on iteration {count + 1}: {e}"
            raise errors.RetryableSelectionError(msg) from e

        # Find the randomly selected person within that category
        target_feature = ratio_result.feature_name
        target_value = ratio_result.feature_value
        random_position = ratio_result.random_person_index

        selected_person_key = people_features.people.find_person_by_position_in_category(
            target_feature, target_value, random_position
        )

        # Should never select the same person twice
        assert selected_person_key not in people_selected, f"Person {selected_person_key} was already selected"

        # Select the person (this also removes household members if configured)
        people_selected.add(selected_person_key)
        selected_person_data = people_features.people.get_person_dict(selected_person_key)
        household_members_removed = people_features.select_person(selected_person_key)

        # Add output messages about household member removal
        if household_members_removed:
            report.add_line(
                f"Selected {selected_person_key}, also removed household members: "
                f"{', '.join(household_members_removed)}"
            )

        # Handle any categories that are now full after this selection
        try:
            category_report = people_features.handle_category_full_deletions(selected_person_data)
            report.add_report(category_report)
        except errors.SelectionError as e:
            msg = f"Selection failed after selecting {selected_person_key}: {e}"
            raise errors.RetryableSelectionError(msg) from e

        # Check if we're about to run out of people (but not on the last iteration)
        if count < (number_people_wanted - 1) and people_features.people.count == 0:
            msg = "Selection failed: Ran out of people before completing selection"
            raise errors.RetryableSelectionError(msg)

    # Return in legacy format: list containing single frozenset
    return [frozenset(people_selected)], report

standardize_distribution(committees, probabilities)

Remove committees with zero probability and renormalize.

Parameters:

Name Type Description Default
committees list[frozenset[str]]

list of committees

required
probabilities list[float]

corresponding probabilities

required

Returns:

Type Description
tuple[list[frozenset[str]], list[float]]

tuple of (filtered_committees, normalized_probabilities)

Source code in src/sortition_algorithms/committee_generation/__init__.py
def standardize_distribution(
    committees: list[frozenset[str]],
    probabilities: list[float],
) -> tuple[list[frozenset[str]], list[float]]:
    """Remove committees with zero probability and renormalize.

    Args:
        committees: list of committees
        probabilities: corresponding probabilities

    Returns:
        tuple of (filtered_committees, normalized_probabilities)
    """
    assert len(committees) == len(probabilities)
    # Keep only the committees whose probability is numerically non-zero.
    kept = [(committee, prob) for committee, prob in zip(committees, probabilities, strict=False) if prob >= EPS2]
    kept_committees = [committee for committee, _ in kept]
    kept_probabilities = [prob for _, prob in kept]
    # Renormalize so the surviving probabilities sum to 1.
    total = sum(kept_probabilities)
    return kept_committees, [prob / total for prob in kept_probabilities]

generate_initial_committees(solver, agent_vars, multiplicative_weights_rounds)

To speed up the main iteration of the maximin and Nash algorithms, start from a diverse set of feasible committees. In particular, each agent that can be included in any committee will be included in at least one of these committees.

Parameters:

Name Type Description Default
solver Solver

Solver for finding committees

required
agent_vars dict[str, Any]

dict mapping agent_id to binary variables

required
multiplicative_weights_rounds int

number of rounds for the multiplicative weights phase

required

Returns:

Type Description
tuple[set[frozenset[str]], frozenset[str], RunReport]

tuple of (committees, covered_agents, report)

set[frozenset[str]]
  • committees: set of feasible committees discovered
frozenset[str]
  • covered_agents: frozenset of all agents included in some committee
RunReport
  • report: run report of the generation process
Source code in src/sortition_algorithms/committee_generation/common.py
def generate_initial_committees(
    solver: Solver,
    agent_vars: dict[str, Any],
    multiplicative_weights_rounds: int,
) -> tuple[set[frozenset[str]], frozenset[str], RunReport]:
    """To speed up the main iteration of the maximin and Nash algorithms, start from a diverse set of feasible
    committees. In particular, each agent that can be included in any committee will be included in at least one of
    these committees.

    Args:
        solver: Solver for finding committees
        agent_vars: dict mapping agent_id to binary variables
        multiplicative_weights_rounds: number of rounds for the multiplicative weights phase

    Returns:
        tuple of (committees, covered_agents, report)
        - committees: set of feasible committees discovered
        - covered_agents: frozenset of all agents included in some committee
        - report: run report of the generation process
    """
    report = RunReport()

    # Phase 1: Use multiplicative weights algorithm to find diverse committees
    committees, covered_agents = _run_multiplicative_weights_phase(solver, agent_vars, multiplicative_weights_rounds)

    # Phase 2: Find committees for any agents not yet covered
    additional_committees, covered_agents, coverage_report = _find_committees_for_uncovered_agents(
        solver, agent_vars, covered_agents
    )
    committees.update(additional_committees)
    report.add_report(coverage_report)

    # Validation and final output
    assert len(committees) >= 1  # We assume quotas are feasible at this stage

    # Log when every agent in the pool appears in at least one committee.
    if len(covered_agents) == len(agent_vars):
        report.add_message_and_log("all_agents_in_feasible_committees", logging.INFO)

    return committees, frozenset(covered_agents), report

ilp_results_to_committee(solver, variables)

Extract the selected committee from ILP solver variables.

Parameters:

Name Type Description Default
solver Solver

Solver with solved model

required
variables dict[str, Any]

dict mapping person_id to binary variables

required

Returns:

Type Description
frozenset[str]

frozenset of person_ids who are selected (have variable value > 0.5)

Raises:

Type Description
SelectionError

If variables don't have values (solver failed)

Source code in src/sortition_algorithms/committee_generation/common.py
def ilp_results_to_committee(solver: Solver, variables: dict[str, Any]) -> frozenset[str]:
    """Extract the selected committee from ILP solver variables.

    Args:
        solver: Solver with solved model
        variables: dict mapping person_id to binary variables

    Returns:
        frozenset of person_ids who are selected (have variable value > 0.5)

    Raises:
        SelectionError: If variables don't have values (solver failed)
    """
    try:
        committee = frozenset(person_id for person_id in variables if solver.get_value(variables[person_id]) > 0.5)
    # unfortunately, solvers sometimes throw generic Exceptions rather than a subclass
    except Exception as error:
        msg = f"It seems like some variables do not have a value. Original exception: {error}."
        raise SelectionError(
            message=msg, error_code="variables_without_value", error_params={"error": str(error)}
        ) from error

    return committee

setup_committee_generation(features, people, number_people_wanted, check_same_address_columns, solver_backend=DEFAULT_BACKEND)

Set up the integer linear program for committee generation.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

columns to check for same address, or empty list if not checking addresses.

required
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list

DEFAULT_BACKEND

Returns:

Type Description
tuple[Solver, dict[str, Any]]

tuple of (Solver, dict mapping person_id to binary variables)

Raises:

Type Description
InfeasibleQuotasError

If quotas are infeasible, includes suggested relaxations

SelectionError

If solver fails for other reasons

Source code in src/sortition_algorithms/committee_generation/common.py
def setup_committee_generation(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    solver_backend: str = DEFAULT_BACKEND,
) -> tuple[Solver, dict[str, Any]]:
    """Set up the integer linear program for committee generation.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: columns to check for same address, or empty list if
                                    not checking addresses.
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list

    Returns:
        tuple of (Solver, dict mapping person_id to binary variables)

    Raises:
        InfeasibleQuotasError: If quotas are infeasible, includes suggested relaxations
        SelectionError: If solver fails for other reasons
    """
    solver = create_solver(backend=solver_backend)

    # One binary decision variable per pool member: 1 = selected, 0 = not selected.
    agent_vars = {person_id: solver.add_binary_var() for person_id in people}

    # The panel must contain exactly the requested number of people.
    solver.add_constr(solver_sum(agent_vars.values()) == number_people_wanted)

    # For every feature value, the count of selected holders must sit inside
    # the configured [min, max] quota band. Comparison is case-insensitive.
    for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(features):
        fvalue_lower = fvalue_name.lower()
        number_feature_value_agents = solver_sum(
            agent_vars[person_id]
            for person_id, person_data in people.items()
            if person_data[feature_name].lower() == fvalue_lower
        )
        solver.add_constr(number_feature_value_agents >= fv_minmax.min)
        solver.add_constr(number_feature_value_agents <= fv_minmax.max)

    # Household constraints: allow at most one selected person per household.
    if check_same_address_columns:
        for housemates in people.households(check_same_address_columns).values():
            if len(housemates) <= 1:
                continue
            solver.add_constr(solver_sum(agent_vars[member_id] for member_id in housemates) <= 1)

    # Solve once purely as a feasibility probe (the objective itself is irrelevant here).
    solver.set_objective(solver_sum(agent_vars.values()), SolverSense.MAXIMIZE)
    status = solver.optimize()
    if status == SolverStatus.INFEASIBLE:
        relaxed_features, output_lines = _relax_infeasible_quotas(
            features, people, number_people_wanted, check_same_address_columns, solver_backend=solver_backend
        )
        raise errors.InfeasibleQuotasError(relaxed_features, output_lines)
    if status != SolverStatus.OPTIMAL:
        msg = f"No feasible committees found, solver returns status {status}."
        raise errors.SelectionError(msg)

    return solver, agent_vars

DiversityOptimizer

Source code in src/sortition_algorithms/committee_generation/diversimax.py
class DiversityOptimizer:
    """Builds and solves a MIP that spreads the selected panel evenly across feature intersections.

    For every combination of feature columns ("dims") the pool is partitioned into
    intersections (e.g. male__18-24__highschool). The optimizer then picks a panel
    of ``panel_size`` people that respects the per-feature min/max quotas (and the
    optional one-per-household constraint) while minimizing how far each
    intersection's selected count is from a perfectly even split.
    """

    def __init__(
        self,
        people: People,
        features: FeatureCollection,
        panel_size: int,
        check_same_address_columns: list[str],
        solver_backend: str = DEFAULT_BACKEND,
    ):
        if not DIVERSIMAX_AVAILABLE:
            msg = "Diversimax algorithm requires the optional 'diversimax' dependencies, which are not available"
            raise RuntimeError(msg, "diversimax_not_available", {})

        self.people = people
        # convert people to dataframe
        people_df = pd.DataFrame.from_dict(dict(people.items()), orient="index")
        people_df = people_df.rename(columns=str.lower)
        people_df = people_df.map(lambda x: x.lower() if isinstance(x, str) else x)  # normalize to lower case

        self.pool_members_df = people_df[(k.lower() for k in features)]  # keep only feature columns
        self.features = features
        self.panel_size = panel_size
        self.check_same_address_columns = check_same_address_columns
        self.solver_backend = solver_backend
        self.intersections_data: AllIntersectionsData = self.prepare_all_data()
        # ohe_dims is populated by create_all_one_hot_encodings(): it records which
        # feature combination produced each matrix in all_ohe, aligned 1:1 with it.
        self.ohe_dims: list[InteractionNamesTuple] = []
        self.all_ohe = self.create_all_one_hot_encodings()

    @staticmethod
    def _intersection_name(category_names: Iterable[str]) -> str:
        """Join category names into a single intersection label, e.g. 'male__18-24'."""
        return "__".join(category_names)

    def _prepare_features_data(self, features_names: InteractionNamesTuple) -> IntersectionData:
        """
        Prepares the data for a given set of features.
        Makes all the possible intersections between the features' category names (e.g. male__18-24__highschool)
        Then for each person, figures out which intersection they belong to.
        """
        # Take all the combinations between the features' category names
        all_combinations = itertools.product(*[self.pool_members_df[d].unique() for d in features_names])
        intersections_names = [self._intersection_name(combination) for combination in all_combinations]
        # make the member value (join with __) for each person
        each_member_value = (
            self.pool_members_df.loc[:, features_names]
            .apply(lambda row: self._intersection_name([row[d] for d in features_names]), axis=1)
            .to_numpy()
        )
        return IntersectionData(intersections_names=intersections_names, intersection_member_values=each_member_value)

    def prepare_all_data(self) -> AllIntersectionsData:
        """
        For each combination of features, prepares the intersection data.
        The data is a dict where key is the combination of category names (e.g. (age, income, education level))
        and value is the IntersectionData for that combination.
        all_dims_combs is a list of all the intersections of all categories of all features.
        (e.g. [(age,), (income,), (education level,), (income, education level), (age, income, education level)])
        """
        all_dims_combs_iterators = [
            itertools.combinations(self.pool_members_df.columns, r=i)
            for i in range(1, self.pool_members_df.shape[1] + 1)
        ]
        all_dims_combs: list[InteractionNamesTuple] = [x for y in all_dims_combs_iterators for x in y]
        # randomize order within each size, so if we cutoff intersections at len(all_ohe) >= 50, we get a random sample
        rng = np.random.default_rng(random_provider().randint())
        rng.shuffle(all_dims_combs)
        all_dims_combs = sorted(all_dims_combs, key=len)
        data: dict[InteractionNamesTuple, IntersectionData] = {}
        for features_intersections in all_dims_combs:
            try:
                data[features_intersections] = self._prepare_features_data(features_intersections)
            except Exception as e:
                raise Exception(f"Error with {features_intersections}") from e
        return AllIntersectionsData(data=data, all_dims_combs=all_dims_combs)

    def create_all_one_hot_encodings(self) -> list[np.ndarray]:
        """
        For every intersection of features - one hot encode who is in which intersection.
        Rows are the different people. Columns are the different possible intersections.
        The values are 0/1 if the person is in that intersection or not.
        i.e. The number of columns is the number of combinations we have between the features: product of their sizes.

        Side effect: fills ``self.ohe_dims`` with the feature combination behind each
        returned matrix. Some combinations are filtered out below, so positions in
        ``all_dims_combs`` do NOT line up with positions in the returned list;
        ``self.ohe_dims`` is the aligned mapping.
        """
        all_ohe: list[np.ndarray] = []
        kept_dims: list[InteractionNamesTuple] = []
        for dims in self.intersections_data.all_dims_combs:
            intersection_data: IntersectionData = self.intersections_data.data[dims]
            possible_profiles = intersection_data.intersections_names
            # When there are too many intersections, each will have 0/1 people, and optimization is pointless + too slow
            if len(possible_profiles) > self.panel_size:
                continue
            # skip combinations so fine-grained that cells average fewer than 2 pool members
            if len(self.pool_members_df) / len(possible_profiles) < 2:
                continue
            if len(all_ohe) >= 50:  # limit to 50 different intersections to avoid too much computation
                break

            ohe = OneHotEncoder(categories=[possible_profiles], sparse_output=False)
            ohe_values = ohe.fit_transform(intersection_data.intersection_member_values.reshape(-1, 1))
            all_ohe.append(ohe_values)
            kept_dims.append(dims)
        self.ohe_dims = kept_dims
        return all_ohe

    def optimize(self, max_seconds: int = 30, accepted_gap: float = 0.1) -> tuple[SolverStatus, frozenset[str], float]:
        """
        Uses solver to optimize based on the categories constraints

        For the optimization goal, for every dims intersection:
        Take the one hot encoded of who is in which intersection for these dims
        Take the binary vector of who is selected and multiply and sum to get the sizes of each intersection of categories
        Figure out the "best" value - if all intersections were of equal size
        Take the abs for each intersection from that value
        Minimize sum of abs

        Args:
            max_seconds: solver time limit, in seconds
            accepted_gap: MIP gap at which the solver may stop early (0.1 = 10%)

        Returns:
            tuple of (status, selected_ids, gap)
        """
        df = self.pool_members_df
        solver = create_solver(backend=self.solver_backend, time_limit=max_seconds, mip_gap=accepted_gap)

        # binary variable for each person - if they are selected or not
        # We store them by name so we can look them up later
        model_variables = {}
        for person_id in df.index:
            model_variables[str(person_id)] = solver.add_binary_var(name=str(person_id))
        model_variables_list = list(model_variables.values())

        # Household constraints: at most 1 person per household
        if self.check_same_address_columns:
            for housemates in self.people.households(self.check_same_address_columns).values():
                if len(housemates) > 1:
                    solver.add_constr(solver_sum(model_variables[str(member_id)] for member_id in housemates) <= 1)

        # the sum of all people in each category must be between the min and max specified
        for feature_name, value_name, fv_minmax in iterate_feature_collection(self.features):
            relevant_member_vars = [
                model_variables[str(person_id)]
                for person_id in df.index
                if df.loc[person_id, feature_name.lower()] == value_name.lower()
            ]
            rel_sum = solver_sum(relevant_member_vars)
            solver.add_constr(rel_sum >= fv_minmax.min)
            solver.add_constr(rel_sum <= fv_minmax.max)
        solver.add_constr(solver_sum(model_variables_list) == self.panel_size)  # cannot exceed panel size

        # define the optimization goal
        all_objectives = []
        for ohe in self.all_ohe:  # for every set of intersections (one hot encoded) of features
            # how many selected to each intersection
            # We need to build expressions for each intersection column
            for col_idx in range(ohe.shape[1]):
                # For this intersection column, sum up the binary vars weighted by ohe membership
                intersection_size = solver_sum(
                    model_variables_list[row_idx] * ohe[row_idx, col_idx]
                    for row_idx in range(len(model_variables_list))
                )
                # the best value is if all intersections were equal size
                best_val = self.panel_size / ohe.shape[1]

                # set support variable that is the abs diff from intersection size to best_val
                abs_diff = solver.add_continuous_var(lb=0.0, ub=float("inf"))
                # constrain this support variable to be the abs diff from the intersection size
                # (abs() is not linear; two >= constraints plus minimization model it)
                solver.add_constr(abs_diff >= (intersection_size - best_val))
                solver.add_constr(abs_diff >= (best_val - intersection_size))

                all_objectives.append(abs_diff)

        obj = solver_sum(all_objectives)
        solver.set_objective(obj, SolverSense.MINIMIZE)

        logger.info(f"Diversimax: {len(model_variables)} binary vars, {len(all_objectives)} abs-diff vars")

        status = solver.optimize()
        gap = solver.get_gap()

        if status in [SolverStatus.OPTIMAL, SolverStatus.FEASIBLE]:
            selected_ids = []
            for person_id in df.index:
                var = solver.get_var_by_name(str(person_id))
                if var is not None and solver.get_value(var) >= 0.99:  # ==1, for numerical stability
                    selected_ids.append(person_id)
            selected_ids_set = frozenset(selected_ids)
            return status, selected_ids_set, gap
        return status, frozenset(), gap

    def log_problem_stats(self) -> None:
        """Log sizing information about the optimization problem."""
        logger.info(f"Features: {len(self.pool_members_df.columns)}")
        logger.info(f"Feature combinations considered: {len(self.intersections_data.all_dims_combs)}")
        logger.info(f"  Number of OHE matrices: {len(self.all_ohe)}")

        total_intersections = 0
        # BUGFIX: previously this zipped the *unfiltered* all_dims_combs against the
        # *filtered* all_ohe list, attributing intersection counts to the wrong
        # feature combinations. self.ohe_dims is aligned 1:1 with self.all_ohe.
        for dims, ohe in zip(self.ohe_dims, self.all_ohe, strict=True):
            n_intersections = ohe.shape[1]
            total_intersections += n_intersections
            logger.info(f"  {dims}: {n_intersections} intersections")

        logger.info(f"TOTAL INTERSECTIONS: {total_intersections}")
        logger.info(f"BINARY VARIABLES: {len(self.pool_members_df)}")

create_all_one_hot_encodings()

For every intersection of features - one hot encode who is in which intersection. Rows are the different people. Columns are the different possible intersections. The values are 0/1 if the person is in that intersection or not. i.e. The number of columns is the number of combinations we have between the features: product of their sizes.

Source code in src/sortition_algorithms/committee_generation/diversimax.py
def create_all_one_hot_encodings(self) -> list[np.ndarray]:
    """
    One-hot encode, for every kept intersection of features, who belongs to which cell.

    Rows are the different people; columns are the possible intersections for that
    feature combination (the product of the features' category counts). Values are
    0/1 depending on whether the person falls in that intersection. Combinations
    that are too fine-grained for the pool/panel are skipped, and at most 50
    matrices are produced.
    """
    encodings: list[np.ndarray] = []
    for dims in self.intersections_data.all_dims_combs:
        data: IntersectionData = self.intersections_data.data[dims]
        profiles = data.intersections_names
        n_profiles = len(profiles)
        # With too many cells most hold 0/1 people: optimizing is pointless and slow.
        too_sparse = n_profiles > self.panel_size or len(self.pool_members_df) / n_profiles < 2
        if too_sparse:
            continue
        # cap the number of matrices to keep computation manageable
        if len(encodings) >= 50:
            break
        encoder = OneHotEncoder(categories=[profiles], sparse_output=False)
        encodings.append(encoder.fit_transform(data.intersection_member_values.reshape(-1, 1)))
    return encodings

optimize(max_seconds=30, accepted_gap=0.1)

Uses solver to optimize based on the categories constraints

For the optimization goal, for every dims intersection: take the one-hot encoding of who is in which intersection for these dims; multiply it by the binary vector of who is selected and sum, giving the size of each intersection of categories; work out the "best" value - the size if all intersections were equal; take the absolute difference of each intersection's size from that value; and minimize the sum of those absolute differences.

Returns:

Type Description
tuple[SolverStatus, frozenset[str], float]

tuple of (status, selected_ids, gap)

Source code in src/sortition_algorithms/committee_generation/diversimax.py
def optimize(self, max_seconds: int = 30, accepted_gap: float = 0.1) -> tuple[SolverStatus, frozenset[str], float]:
    """
    Uses solver to optimize based on the categories constraints

    For the optimization goal, for every dims intersection:
    Take the one hot encoded of who is in which intersection for these dims
    Take the binary vector of who is selected and multiply and sum to get the sizes of each intersection of categories
    Figure out the "best" value - if all intersections were of equal size
    Take the abs for each intersection from that value
    Minimize sum of abs

    Args:
        max_seconds: solver time limit, in seconds
        accepted_gap: MIP gap at which the solver may stop early (0.1 = 10%)

    Returns:
        tuple of (status, selected_ids, gap)
    """
    df = self.pool_members_df
    solver = create_solver(backend=self.solver_backend, time_limit=max_seconds, mip_gap=accepted_gap)

    # binary variable for each person - if they are selected or not
    # We store them by name so we can look them up later
    model_variables = {}
    for person_id in df.index:
        model_variables[str(person_id)] = solver.add_binary_var(name=str(person_id))
    model_variables_list = list(model_variables.values())

    # Household constraints: at most 1 person per household
    if self.check_same_address_columns:
        for housemates in self.people.households(self.check_same_address_columns).values():
            if len(housemates) > 1:
                solver.add_constr(solver_sum(model_variables[str(member_id)] for member_id in housemates) <= 1)

    # the sum of all people in each category must be between the min and max specified
    # (feature names/values were lower-cased when pool_members_df was built, hence .lower())
    for feature_name, value_name, fv_minmax in iterate_feature_collection(self.features):
        relevant_member_vars = [
            model_variables[str(person_id)]
            for person_id in df.index
            if df.loc[person_id, feature_name.lower()] == value_name.lower()
        ]
        rel_sum = solver_sum(relevant_member_vars)
        solver.add_constr(rel_sum >= fv_minmax.min)
        solver.add_constr(rel_sum <= fv_minmax.max)
    solver.add_constr(solver_sum(model_variables_list) == self.panel_size)  # cannot exceed panel size

    # define the optimization goal
    all_objectives = []
    for ohe in self.all_ohe:  # for every set of intersections (one hot encoded) of features
        # how many selected to each intersection
        # We need to build expressions for each intersection column
        for col_idx in range(ohe.shape[1]):
            # For this intersection column, sum up the binary vars weighted by ohe membership
            intersection_size = solver_sum(
                model_variables_list[row_idx] * ohe[row_idx, col_idx]
                for row_idx in range(len(model_variables_list))
            )
            # the best value is if all intersections were equal size
            best_val = self.panel_size / ohe.shape[1]

            # set support variable that is the abs diff from intersection size to best_val
            abs_diff = solver.add_continuous_var(lb=0.0, ub=float("inf"))
            # constrain this support variable to be the abs diff from the intersection size
            # (abs() is not linear; the two >= constraints plus minimization make
            # abs_diff settle on the larger of the two sides, i.e. the abs value)
            solver.add_constr(abs_diff >= (intersection_size - best_val))
            solver.add_constr(abs_diff >= (best_val - intersection_size))

            all_objectives.append(abs_diff)

    obj = solver_sum(all_objectives)
    solver.set_objective(obj, SolverSense.MINIMIZE)

    logger.info(f"Diversimax: {len(model_variables)} binary vars, {len(all_objectives)} abs-diff vars")

    status = solver.optimize()
    gap = solver.get_gap()

    # both OPTIMAL and FEASIBLE carry a usable solution (FEASIBLE = stopped early)
    if status in [SolverStatus.OPTIMAL, SolverStatus.FEASIBLE]:
        selected_ids = []
        for person_id in df.index:
            var = solver.get_var_by_name(str(person_id))
            if var is not None and solver.get_value(var) >= 0.99:  # ==1, for numerical stability
                selected_ids.append(person_id)
        selected_ids_set = frozenset(selected_ids)
        return status, selected_ids_set, gap
    # no usable solution - caller decides how to report the failure
    return status, frozenset(), gap

prepare_all_data()

For each combination of features, prepares the intersection data. The data is a dict where key is the combination of category names (e.g. (age, income, education level)) and value is the IntersectionData for that combination. all_dims_combs is a list of all the intersections of all categories of all features. (e.g. [(age,), (income,), (education level,), (income, education level), (age, income, education level)])

Source code in src/sortition_algorithms/committee_generation/diversimax.py
def prepare_all_data(self) -> AllIntersectionsData:
    """
    Prepare the intersection data for every combination of feature columns.

    Returns an AllIntersectionsData whose ``data`` maps each feature combination
    (e.g. ``(age, income, education level)``) to its IntersectionData, and whose
    ``all_dims_combs`` lists every combination of every size - shuffled within
    each size and then sorted so the smallest combinations come first.
    """
    columns = self.pool_members_df.columns
    all_dims_combs: list[InteractionNamesTuple] = [
        combo
        for size in range(1, self.pool_members_df.shape[1] + 1)
        for combo in itertools.combinations(columns, r=size)
    ]
    # Shuffle before the (stable) sort-by-size: if we later cut off at 50 OHE
    # matrices, the combinations kept within each size are a random sample.
    rng = np.random.default_rng(random_provider().randint())
    rng.shuffle(all_dims_combs)
    all_dims_combs = sorted(all_dims_combs, key=len)

    data: dict[InteractionNamesTuple, IntersectionData] = {}
    for combo in all_dims_combs:
        try:
            data[combo] = self._prepare_features_data(combo)
        except Exception as e:
            raise Exception(f"Error with {combo}") from e
    return AllIntersectionsData(data=data, all_dims_combs=all_dims_combs)

find_distribution_diversimax(features, people, number_people_wanted, check_same_address_columns, max_seconds=30, solver_backend=DEFAULT_BACKEND)

Find a committee using the Diversimax algorithm.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

columns to check for same address, or empty list if not checking addresses.

required
max_seconds int

maximum seconds to spend searching

30
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list

DEFAULT_BACKEND

Returns:

Type Description
tuple[frozenset[str], RunReport]

tuple of (selected_ids, report)

Source code in src/sortition_algorithms/committee_generation/diversimax.py
def find_distribution_diversimax(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    max_seconds: int = 30,
    solver_backend: str = DEFAULT_BACKEND,
) -> tuple[frozenset[str], RunReport]:
    """
    Find a committee using the Diversimax algorithm.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: columns to check for same address, or empty list if
                                    not checking addresses.
        max_seconds: maximum seconds to spend searching
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list

    Returns:
        tuple of (selected_ids, report)

    Raises:
        RuntimeError: if the optional 'diversimax' dependencies are not installed
        InfeasibleQuotasError: if the quotas are infeasible (with suggested relaxations)
        SelectionError: if the solver fails for any other reason
    """
    if not DIVERSIMAX_AVAILABLE:
        msg = "Diversimax algorithm requires the optional 'diversimax' dependencies, which are not available"
        raise RuntimeError(msg, "diversimax_not_available", {})

    report = RunReport()
    report.add_line_and_log("Using Diversimax algorithm.", log_level=logging.INFO)
    optimizer = DiversityOptimizer(people, features, number_people_wanted, check_same_address_columns, solver_backend)
    optimizer.log_problem_stats()

    status, selected_ids, gap = optimizer.optimize(max_seconds=max_seconds)

    # Both OPTIMAL and FEASIBLE count as success - FEASIBLE means the solver
    # stopped early with a solution inside the accepted gap.
    if status in (SolverStatus.OPTIMAL, SolverStatus.FEASIBLE):
        if status == SolverStatus.OPTIMAL:
            success_msg = f"Diversimax optimization successful (optimal). Selected {len(selected_ids)} participants."
        else:
            success_msg = (
                f"Diversimax optimization successful (feasible, gap {gap:.1%}). "
                f"Selected {len(selected_ids)} participants."
            )
        report.add_line_and_log(success_msg, log_level=logging.INFO)
        return selected_ids, report

    if status == SolverStatus.INFEASIBLE:
        relaxed_features, output_lines = _relax_infeasible_quotas(
            features, people, number_people_wanted, check_same_address_columns, solver_backend=solver_backend
        )
        raise errors.InfeasibleQuotasError(relaxed_features, output_lines)

    msg = f"No feasible committees found, solver returns status {status}."
    raise errors.SelectionError(msg)

Selection algorithms for stratified sampling.

MaxRatioResult

Result from finding the category with maximum selection ratio.

Source code in src/sortition_algorithms/committee_generation/legacy.py
@define(kw_only=True, slots=True)
class MaxRatioResult:
    """Result from finding the category with maximum selection ratio."""

    # Feature (column) whose value had the highest selection-urgency ratio.
    feature_name: str
    # The specific value of that feature.
    feature_value: str
    # 1-based index of a randomly chosen person among those remaining in the category.
    random_person_index: int

PeopleFeatures

This class manipulates people and features together, making a deepcopy on init.

It is only used by the legacy algorithm.

Source code in src/sortition_algorithms/committee_generation/legacy.py
class PeopleFeatures:
    """
    This class manipulates people and features together, making a deepcopy on init.

    It is only used by the legacy algorithm.
    """

    # TODO: consider naming: maybe SelectionState

    def __init__(
        self,
        people: People,
        features: FeatureCollection,
        check_same_address_columns: list[str] | None = None,
    ) -> None:
        """
        Args:
            people: candidate pool; deep-copied so the caller's object stays untouched
            features: feature/value definitions with min/max quotas (not copied)
            check_same_address_columns: address columns used to find households,
                or None/empty list to disable household handling
        """
        self.people = deepcopy(people)
        self.features = features
        # per feature/value counters of selected and remaining people
        self.select_collection = select_from_feature_collection(self.features)
        self.check_same_address_columns = check_same_address_columns or []

    def update_features_remaining(self, person_key: str) -> None:
        """Add one person to the 'remaining' count of every feature value they hold."""
        # this will blow up if the person does not exist
        person = self.people.get_person_dict(person_key)
        for feature_name in self.features:
            feature_value = person[feature_name]
            self.select_collection[feature_name][feature_value].add_remaining()

    def update_all_features_remaining(self) -> None:
        """Initialise the 'remaining' counts from the whole pool."""
        for person_key in self.people:
            self.update_features_remaining(person_key)

    def delete_all_with_feature_value(self, feature_name: str, feature_value: str) -> tuple[int, int]:
        """
        When a feature/value is "full" we delete everyone else in it.
        "Full" means that the number selected equals the "max" amount - that
        is detected elsewhere and then this method is called.
        Returns count of those deleted, and count of those left

        Raises:
            RetryableSelectionError: if decrementing a 'remaining' count would
                leave another feature value without enough people
        """
        # when a category is full we want to delete everyone in it
        people_to_delete: list[str] = []
        for pkey, person in self.people.items():
            # feature values are compared case-insensitively
            if person[feature_name].lower() == feature_value.lower():
                people_to_delete.append(pkey)
                # decrement 'remaining' for every feature value this person holds
                for feature in self.features:
                    current_feature_value = person[feature]
                    try:
                        self.select_collection[feature][current_feature_value].remove_remaining()
                    except errors.SelectionError as e:
                        # fixed: message previously named a nonexistent
                        # "delete_all_in_feature_value()" function
                        msg = (
                            f"SELECTION IMPOSSIBLE: FAIL in delete_all_with_feature_value() "
                            f"as after previous deletion no one/not enough left in {feature} "
                            f"{person[feature]}. Tried to delete: {len(people_to_delete)}"
                        )
                        raise errors.RetryableSelectionError(msg) from e

        self.people.remove_many(people_to_delete)
        # return the number of people deleted and the number of people left
        return len(people_to_delete), self.people.count

    def prune_for_feature_max_0(self) -> list[str]:
        """
        Check if any feature_value.max is set to zero. if so delete everyone with that feature value
        NOT DONE: could then check if anyone is left.

        Returns:
            list of progress/warning messages for the caller to report
        """
        msg: list[str] = []
        msg.append(f"Number of people: {self.people.count}.")
        total_num_deleted = 0
        for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(self.features):
            if fv_minmax.max == 0:  # we don't want any of these people
                # pass the message in as deleting them might throw an exception
                msg.append(f"Feature/value {feature_name}/{fvalue_name} full - deleting people...")
                num_deleted, num_left = self.delete_all_with_feature_value(feature_name, fvalue_name)
                # if no exception was thrown above add this bit to the end of the previous message
                msg[-1] += f" Deleted {num_deleted}, {num_left} left."
                total_num_deleted += num_deleted
        # if the total number of people deleted is lots then we're probably doing a replacement selection, which means
        # the 'remaining' file will be useless - remind the user of this!
        if total_num_deleted >= self.people.count / 2:
            msg.append(
                ">>> WARNING <<< That deleted MANY PEOPLE - are you doing a "
                "replacement? If so your REMAINING FILE WILL BE USELESS!!!"
            )
        return msg

    def select_person(self, person_key: str) -> list[str]:
        """
        Selecting a person means:
        - remove the person from our copy of People
        - update the `selected` and `remaining` counts of the FeatureCollection
        - if check_same_address_columns has columns, also remove household members (without adding to selected)

        Returns:
            List of additional people removed due to same address (empty if check_same_address_columns is empty)
        """
        # First, find household members if address checking is enabled (before removing the person)
        household_members_removed = []
        if self.check_same_address_columns:
            household_members_removed = list(self.people.matching_address(person_key, self.check_same_address_columns))

        # Handle the main person selection
        person = self.people.get_person_dict(person_key)
        for feature_name in self.features:
            feature_value = person[feature_name]
            self.select_collection[feature_name][feature_value].remove_remaining()
            self.select_collection[feature_name][feature_value].add_selected()
        self.people.remove(person_key)

        # Then remove household members if any were found
        for household_member_key in household_members_removed:
            household_member = self.people.get_person_dict(household_member_key)
            for feature_name in self.features:
                feature_value = household_member[feature_name]
                self.select_collection[feature_name][feature_value].remove_remaining()
                # Note: we don't call add_selected() for household members
            self.people.remove(household_member_key)

        return household_members_removed

    def find_max_ratio_category(self) -> MaxRatioResult:
        """
        Find the feature/value combination with the highest selection ratio.

        The ratio is calculated as: (min - selected) / remaining
        This represents how urgently we need people from this category.
        Higher ratio = more urgent need (fewer people available relative to what we still need).

        Returns:
            MaxRatioResult containing the feature name, value, and a random person index

        Raises:
            SelectionError: If insufficient people remain to meet minimum requirements
        """
        max_ratio = -100.0
        result_feature_name = ""
        result_feature_value = ""
        random_person_index = -1

        for feature_name, fvalue_name, select_counts in iterate_select_collection(self.select_collection):
            # Check if we have insufficient people to meet minimum requirements
            if not select_counts.sufficient_people():
                msg = (
                    f"SELECTION IMPOSSIBLE: Not enough people remaining in {feature_name}/{fvalue_name}. "
                    f"Need {select_counts.people_still_needed} more, but only {select_counts.remaining} remaining."
                )
                raise errors.SelectionError(msg)

            # Skip categories with no remaining people or max = 0
            if select_counts.remaining == 0 or select_counts.min_max.max == 0:
                continue

            # Calculate the priority ratio
            ratio = select_counts.people_still_needed / float(select_counts.remaining)

            # Track the highest ratio category
            if ratio > max_ratio:
                max_ratio = ratio
                result_feature_name = feature_name
                result_feature_value = fvalue_name
                # from 1 to remaining
                random_person_index = random_provider().randbelow(select_counts.remaining) + 1

        # If no valid category found, all categories must be at their max or have max=0
        if not result_feature_name:
            msg = "No valid categories found - all may be at maximum or have max=0"
            raise errors.SelectionError(msg)

        return MaxRatioResult(
            feature_name=result_feature_name,
            feature_value=result_feature_value,
            random_person_index=random_person_index,
        )

    def handle_category_full_deletions(self, selected_person_data: MutableMapping[str, str]) -> RunReport:
        """
        Check if any categories are now full after a selection and delete remaining people.

        When a person is selected, some categories may reach their maximum quota.
        This method identifies such categories and removes all remaining people from them.

        Args:
            selected_person_data: Dictionary of the selected person's feature values

        Returns:
            RunReport containing messages about categories that became full and people deleted

        Raises:
            SelectionError: If deletions would violate minimum constraints
        """
        report = RunReport()

        # only categories the selected person belongs to can have just become full
        for feature_name, fvalue_name, select_counts in iterate_select_collection(self.select_collection):
            if (
                fvalue_name.lower() == selected_person_data[feature_name].lower()
                and select_counts.selected == select_counts.min_max.max
            ):
                num_deleted, num_left = self.delete_all_with_feature_value(feature_name, fvalue_name)
                if num_deleted > 0:
                    report.add_line(
                        f"Category {feature_name}/{fvalue_name} full - deleted {num_deleted} people, {num_left} left."
                    )

        return report

delete_all_with_feature_value(feature_name, feature_value)

When a feature/value is "full" we delete everyone else in it. "Full" means that the number selected equals the "max" amount - that is detected elsewhere and then this method is called. Returns count of those deleted, and count of those left

Source code in src/sortition_algorithms/committee_generation/legacy.py
def delete_all_with_feature_value(self, feature_name: str, feature_value: str) -> tuple[int, int]:
    """
    When a feature/value is "full" we delete everyone else in it.
    "Full" means that the number selected equals the "max" amount - that
    is detected elsewhere and then this method is called.

    Args:
        feature_name: the feature whose category is full
        feature_value: the value (category) within that feature

    Returns:
        tuple of (number of people deleted, number of people left)

    Raises:
        RetryableSelectionError: if decrementing the remaining counts leaves
            another category unable to meet its minimum.
    """
    # when a category is full we want to delete everyone in it
    people_to_delete: list[str] = []
    for pkey, person in self.people.items():
        if person[feature_name].lower() == feature_value.lower():
            people_to_delete.append(pkey)
            # decrement the 'remaining' count of every category this person belongs to
            for feature in self.features:
                current_feature_value = person[feature]
                try:
                    self.select_collection[feature][current_feature_value].remove_remaining()
                except errors.SelectionError as e:
                    # Fix: message previously named a non-existent method
                    # "delete_all_in_feature_value()" - use the real method name.
                    msg = (
                        f"SELECTION IMPOSSIBLE: FAIL in delete_all_with_feature_value() "
                        f"as after previous deletion no one/not enough left in {feature} "
                        f"{person[feature]}. Tried to delete: {len(people_to_delete)}"
                    )
                    raise errors.RetryableSelectionError(msg) from e

    self.people.remove_many(people_to_delete)
    # return the number of people deleted and the number of people left
    return len(people_to_delete), self.people.count

find_max_ratio_category()

Find the feature/value combination with the highest selection ratio.

The ratio is calculated as: (min - selected) / remaining This represents how urgently we need people from this category. Higher ratio = more urgent need (fewer people available relative to what we still need).

Returns:

Type Description
MaxRatioResult

MaxRatioResult containing the feature name, value, and a random person index

Raises:

Type Description
SelectionError

If insufficient people remain to meet minimum requirements

Source code in src/sortition_algorithms/committee_generation/legacy.py
def find_max_ratio_category(self) -> MaxRatioResult:
    """
    Find the feature/value combination with the highest selection ratio.

    The ratio is (min - selected) / remaining: a measure of how urgently people
    from the category are needed. The larger the ratio, the fewer candidates
    remain relative to the number still required.

    Returns:
        MaxRatioResult with the chosen feature name, value, and a random
        1-based person index within that category

    Raises:
        SelectionError: if some category cannot meet its minimum, or if no
            category is still selectable
    """
    best_ratio = -100.0
    best_feature = ""
    best_value = ""
    best_index = -1

    for feature_name, fvalue_name, select_counts in iterate_select_collection(self.select_collection):
        # Bail out immediately if this category can no longer reach its minimum.
        if not select_counts.sufficient_people():
            msg = (
                f"SELECTION IMPOSSIBLE: Not enough people remaining in {feature_name}/{fvalue_name}. "
                f"Need {select_counts.people_still_needed} more, but only {select_counts.remaining} remaining."
            )
            raise errors.SelectionError(msg)

        # Nothing selectable here: either nobody left, or the category is capped at zero.
        if select_counts.remaining == 0 or select_counts.min_max.max == 0:
            continue

        # Priority: how much of the remaining pool we still need.
        ratio = select_counts.people_still_needed / float(select_counts.remaining)

        if ratio > best_ratio:
            best_ratio = ratio
            best_feature = feature_name
            best_value = fvalue_name
            # random position in the range 1..remaining (inclusive)
            best_index = random_provider().randbelow(select_counts.remaining) + 1

    # An empty result means every category was skipped above.
    if not best_feature:
        msg = "No valid categories found - all may be at maximum or have max=0"
        raise errors.SelectionError(msg)

    return MaxRatioResult(
        feature_name=best_feature,
        feature_value=best_value,
        random_person_index=best_index,
    )

handle_category_full_deletions(selected_person_data)

Check if any categories are now full after a selection and delete remaining people.

When a person is selected, some categories may reach their maximum quota. This method identifies such categories and removes all remaining people from them.

Parameters:

Name Type Description Default
selected_person_data MutableMapping[str, str]

Dictionary of the selected person's feature values

required

Returns:

Type Description
RunReport

RunReport containing messages about categories that became full and people deleted

Raises:

Type Description
SelectionError

If deletions would violate minimum constraints

Source code in src/sortition_algorithms/committee_generation/legacy.py
def handle_category_full_deletions(self, selected_person_data: MutableMapping[str, str]) -> RunReport:
    """
    After a selection, purge every category that has just reached its maximum.

    Selecting a person may fill a category's quota; all remaining (unselected)
    people in any such category are then removed from the pool.

    Args:
        selected_person_data: feature values of the person just selected

    Returns:
        RunReport describing the categories that filled up and the deletions made

    Raises:
        SelectionError: if the deletions would violate a minimum constraint
    """
    report = RunReport()

    # Pre-lower the selected person's values for the case-insensitive comparison below.
    selected_lower = {name: value.lower() for name, value in selected_person_data.items()}
    for feature_name, fvalue_name, select_counts in iterate_select_collection(self.select_collection):
        in_members_category = fvalue_name.lower() == selected_lower[feature_name]
        if in_members_category and select_counts.selected == select_counts.min_max.max:
            num_deleted, num_left = self.delete_all_with_feature_value(feature_name, fvalue_name)
            if num_deleted > 0:
                report.add_line(
                    f"Category {feature_name}/{fvalue_name} full - deleted {num_deleted} people, {num_left} left."
                )

    return report

prune_for_feature_max_0()

Check if any feature_value.max is set to zero. if so delete everyone with that feature value NOT DONE: could then check if anyone is left.

Source code in src/sortition_algorithms/committee_generation/legacy.py
def prune_for_feature_max_0(self) -> list[str]:
    """
    Delete everyone whose feature value has its max set to zero.

    NOT DONE: could then check if anyone is left.

    Returns:
        list of human-readable messages describing what was deleted
    """
    messages = [f"Number of people: {self.people.count}."]
    total_deleted = 0
    for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(self.features):
        if fv_minmax.max != 0:
            continue
        # Record the message first: the deletion below might raise.
        messages.append(f"Feature/value {feature_name}/{fvalue_name} full - deleting people...")
        num_deleted, num_left = self.delete_all_with_feature_value(feature_name, fvalue_name)
        # Deletion succeeded, so extend that last message with the outcome.
        messages[-1] += f" Deleted {num_deleted}, {num_left} left."
        total_deleted += num_deleted
    # A huge deletion count usually means a replacement selection is running,
    # in which case the 'remaining' file will be useless - warn the user.
    if total_deleted >= self.people.count / 2:
        messages.append(
            ">>> WARNING <<< That deleted MANY PEOPLE - are you doing a "
            "replacement? If so your REMAINING FILE WILL BE USELESS!!!"
        )
    return messages

select_person(person_key)

Selecting a person means: - remove the person from our copy of People - update the selected and remaining counts of the FeatureCollection - if check_same_address_columns has columns, also remove household members (without adding to selected)

Returns:

Type Description
list[str]

List of additional people removed due to same address (empty if check_same_address_columns is empty)

Source code in src/sortition_algorithms/committee_generation/legacy.py
def select_person(self, person_key: str) -> list[str]:
    """
    Select a person:
    - remove them from our copy of People
    - bump the `selected` count and drop the `remaining` count in the FeatureCollection
    - if check_same_address_columns has columns, also remove their household
      members (those members are NOT counted as selected)

    Returns:
        keys of household members removed alongside the person (empty when
        address checking is disabled)
    """
    # Look up household members first - once the person is removed we can't.
    household_members_removed: list[str] = []
    if self.check_same_address_columns:
        household_members_removed = list(self.people.matching_address(person_key, self.check_same_address_columns))

    # The selected person: update counts for every feature they belong to.
    person = self.people.get_person_dict(person_key)
    for feature_name in self.features:
        counts = self.select_collection[feature_name][person[feature_name]]
        counts.remove_remaining()
        counts.add_selected()
    self.people.remove(person_key)

    # Household members only leave the pool - they do not count as selected.
    for member_key in household_members_removed:
        member = self.people.get_person_dict(member_key)
        for feature_name in self.features:
            self.select_collection[feature_name][member[feature_name]].remove_remaining()
        self.people.remove(member_key)

    return household_members_removed

find_random_sample_legacy(people, features, number_people_wanted, check_same_address_columns=None)

Legacy stratified random selection algorithm.

Implements the original algorithm that uses greedy selection based on priority ratios. Always selects from the most urgently needed category first (highest ratio of (min-selected)/remaining), then randomly picks within that category.

Parameters:

Name Type Description Default
people People

People collection

required
features FeatureCollection

Feature definitions with min/max targets

required
number_people_wanted int

Number of people to select

required
check_same_address_columns list[str] | None

Address columns for household identification, or empty if no address checking to be done.

None

Returns:

Type Description
tuple[list[frozenset[str]], RunReport]

Tuple of (selected_committees, report) where:

  • selected_committees (list[frozenset[str]]): List containing one frozenset of selected person IDs
  • report (RunReport): report containing log messages about the selection process

Raises:

Type Description
SelectionError

If selection becomes impossible (not enough people, etc.)

Source code in src/sortition_algorithms/committee_generation/legacy.py
def find_random_sample_legacy(
    people: People,
    features: FeatureCollection,
    number_people_wanted: int,
    check_same_address_columns: list[str] | None = None,
) -> tuple[list[frozenset[str]], RunReport]:
    """
    Legacy stratified random selection algorithm.

    Greedy selection driven by priority ratios: repeatedly pick from the most
    urgently needed category (highest (min-selected)/remaining), then choose a
    random person within that category.

    Args:
        people: People collection
        features: Feature definitions with min/max targets
        number_people_wanted: Number of people to select
        check_same_address_columns: Address columns for household identification,
                                    or empty if no address checking to be done.

    Returns:
        Tuple of (selected_committees, report) where:
        - selected_committees: List containing one frozenset of selected person IDs
        - report: RunReport with log messages about the selection process

    Raises:
        RetryableSelectionError: If selection becomes impossible (not enough people, etc.)
    """
    report = RunReport()
    report.add_message("using_legacy_algorithm")
    people_selected: set[str] = set()

    # Working copy of people + quota bookkeeping; drop all max=0 categories up front.
    people_features = PeopleFeatures(people, features, check_same_address_columns or [])
    people_features.update_all_features_remaining()
    people_features.prune_for_feature_max_0()

    for iteration in range(number_people_wanted):
        # Which category needs someone most urgently?
        try:
            ratio_result = people_features.find_max_ratio_category()
        except errors.SelectionError as e:
            msg = f"Selection failed on iteration {iteration + 1}: {e}"
            raise errors.RetryableSelectionError(msg) from e

        # Resolve the random position into an actual person in that category.
        selected_person_key = people_features.people.find_person_by_position_in_category(
            ratio_result.feature_name, ratio_result.feature_value, ratio_result.random_person_index
        )

        # Should never select the same person twice
        assert selected_person_key not in people_selected, f"Person {selected_person_key} was already selected"

        people_selected.add(selected_person_key)
        selected_person_data = people_features.people.get_person_dict(selected_person_key)
        # select_person also removes household members when configured
        household_members_removed = people_features.select_person(selected_person_key)

        if household_members_removed:
            report.add_line(
                f"Selected {selected_person_key}, also removed household members: "
                f"{', '.join(household_members_removed)}"
            )

        # Purge any category this selection just filled.
        try:
            report.add_report(people_features.handle_category_full_deletions(selected_person_data))
        except errors.SelectionError as e:
            msg = f"Selection failed after selecting {selected_person_key}: {e}"
            raise errors.RetryableSelectionError(msg) from e

        # Running out of people before the final pick means failure.
        if iteration < (number_people_wanted - 1) and people_features.people.count == 0:
            msg = "Selection failed: Ran out of people before completing selection"
            raise errors.RetryableSelectionError(msg)

    # Return in legacy format: list containing single frozenset
    return [frozenset(people_selected)], report

find_distribution_leximin(features, people, number_people_wanted, check_same_address_columns, solver_backend=DEFAULT_BACKEND)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected (just like maximin), but breaks ties to maximize the second-lowest probability, breaks further ties to maximize the third-lowest probability and so forth.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

Address columns for household identification, or empty if no address checking to be done.

required
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list Note: The dual LP still uses Gurobi.

DEFAULT_BACKEND

Returns:

Type Description
tuple[list[frozenset[str]], list[float], RunReport]

tuple of (committees, probabilities, report) where:

  • committees (list[frozenset[str]]): list of feasible committees (frozenset of agent IDs)
  • probabilities (list[float]): list of probabilities for each committee
  • report (RunReport): report containing debug messages

Raises:

Type Description
RuntimeError

If Gurobi is not available

Source code in src/sortition_algorithms/committee_generation/leximin.py
def find_distribution_leximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    solver_backend: str = DEFAULT_BACKEND,
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees maximizing the minimum selection
    probability of any agent (just like maximin), breaking ties by maximizing the
    second-lowest probability, then the third-lowest, and so on.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification,
                                    or empty if no address checking to be done.
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list
                        Note: The dual LP still uses Gurobi.

    Returns:
        tuple of (committees, probabilities, report):
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: selection probability of each committee
        - report: RunReport with debug messages

    Raises:
        RuntimeError: If Gurobi is not available
    """
    if not GUROBI_AVAILABLE:
        msg = "Leximin algorithm requires Gurobi solver which is not available"
        raise RuntimeError(msg, "gurobi_not_available", {})

    report = RunReport()
    report.add_message_and_log("using_leximin_algorithm", logging.INFO)
    grb.setParam("OutputFlag", 0)

    # ILP used to discover new feasible committees during column generation
    committee_ilp, agent_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns, solver_backend
    )

    # Seed the pool with committees that between them cover every selectable agent
    committees, _covered_agents, initial_report = generate_initial_committees(
        committee_ilp, agent_vars, 3 * people.count
    )
    report.add_report(initial_report)

    # Leximin loop: fix each agent's probability from the lowest upwards
    fixed_probabilities = _run_leximin_main_loop(committee_ilp, agent_vars, committees, people, report)

    # Translate fixed agent probabilities into per-committee probabilities
    probabilities_normalised = _solve_leximin_primal_for_final_probabilities(committees, fixed_probabilities)

    return list(committees), probabilities_normalised, report

find_distribution_maximin(features, people, number_people_wanted, check_same_address_columns, solver_backend=DEFAULT_BACKEND)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

Address columns for household identification, or empty if no address checking to be done.

required
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list

DEFAULT_BACKEND

Returns:

Type Description
tuple[list[frozenset[str]], list[float], RunReport]

tuple of (committees, probabilities, report) where:

  • committees (list[frozenset[str]]): list of feasible committees (frozenset of agent IDs)
  • probabilities (list[float]): list of probabilities for each committee
  • report (RunReport): report containing debug messages
Source code in src/sortition_algorithms/committee_generation/maximin.py
def find_distribution_maximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    solver_backend: str = DEFAULT_BACKEND,
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees that maximizes the minimum
    probability of any agent being selected.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification,
                                    or empty if no address checking to be done.
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list

    Returns:
        tuple of (committees, probabilities, report):
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: selection probability of each committee
        - report: RunReport with debug messages
    """
    report = RunReport()
    report.add_message_and_log("using_maximin_algorithm", logging.INFO)

    # ILP that proposes new feasible committees during column generation
    committee_ilp, agent_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns, solver_backend
    )

    # Seed with committees that together cover every selectable agent
    committees, covered_agents, init_report = generate_initial_committees(
        committee_ilp, agent_vars, people.count
    )
    report.add_report(init_report)

    # Incremental LP, re-optimised as new columns (committees) are generated
    incremental_solver, incr_agent_vars, upper_bound_var = _setup_maximin_incremental_model(
        committees, covered_agents, solver_backend
    )

    return _run_maximin_optimization_loop(
        committee_ilp,
        agent_vars,
        incremental_solver,
        incr_agent_vars,
        upper_bound_var,
        committees,
        covered_agents,
        report,
        solver_backend,
    )

find_distribution_nash(features, people, number_people_wanted, check_same_address_columns, solver_backend=DEFAULT_BACKEND)

Find a distribution over feasible committees that maximizes the Nash welfare, i.e., the product of selection probabilities over all persons.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

Address columns for household identification, or empty if no address checking to be done.

required
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list

DEFAULT_BACKEND

Returns:

Type Description
tuple[list[frozenset[str]], list[float], RunReport]

tuple of (committees, probabilities, report) where:

  • committees (list[frozenset[str]]): list of feasible committees (frozenset of agent IDs)
  • probabilities (list[float]): list of probabilities for each committee
  • report (RunReport): report containing debug messages

The algorithm maximizes the product of selection probabilities Πᵢ pᵢ by equivalently maximizing log(Πᵢ pᵢ) = Σᵢ log(pᵢ). If some person i is not included in any feasible committee, their pᵢ is 0, and this sum is -∞. We maximize Σᵢ log(pᵢ) where i is restricted to range over persons that can possibly be included.

Source code in src/sortition_algorithms/committee_generation/nash.py
def find_distribution_nash(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    solver_backend: str = DEFAULT_BACKEND,
) -> tuple[list[frozenset[str]], list[float], RunReport]:
    """Find a distribution over feasible committees maximizing the Nash welfare,
    i.e. the product of selection probabilities over all persons.

    Maximizing Πᵢ pᵢ is done equivalently by maximizing log(Πᵢ pᵢ) = Σᵢ log(pᵢ).
    Anyone not included in any feasible committee has pᵢ = 0, which would make
    the sum -∞, so i ranges only over persons that can possibly be included.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: Address columns for household identification,
                                    or empty if no address checking to be done.
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list

    Returns:
        tuple of (committees, probabilities, report):
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: selection probability of each committee
        - report: RunReport with debug messages
    """
    report = RunReport()
    report.add_message_and_log("using_nash_algorithm", logging.INFO)

    # ILP used to discover new feasible committees
    solver, agent_vars = setup_committee_generation(
        features, people, number_people_wanted, check_same_address_columns, solver_backend
    )

    # Seed with committees that together include every selectable agent
    committee_set, covered_agents, initial_report = generate_initial_committees(solver, agent_vars, 2 * people.count)
    report.add_report(initial_report)
    committees = list(committee_set)

    # Index the covered agents so probabilities can live in a matrix
    entitlements, contributes_to_entitlement = _define_entitlements(covered_agents)

    return _run_nash_optimization_loop(
        solver,
        agent_vars,
        committees,
        entitlements,
        contributes_to_entitlement,
        covered_agents,
        number_people_wanted,
        report,
    )

HighsSolver

Bases: Solver

HiGHS solver implementation using highspy.

Source code in src/sortition_algorithms/committee_generation/solver.py
class HighsSolver(Solver):
    """HiGHS solver implementation using highspy.

    Wraps a `highspy.Highs` model behind the project's `Solver` interface:
    variables are created via the `add_*_var` methods, constraints via
    `add_constr`, and `optimize` maps HiGHS model statuses onto the
    project's `SolverStatus` values.
    """

    def __init__(
        self,
        verbose: bool = False,
        seed: int | None = None,
        time_limit: float | None = None,
        mip_gap: float | None = None,
    ):
        """Create a new HiGHS solver instance.

        Args:
            verbose: If True, enable solver output
            seed: Random seed for reproducibility
            time_limit: Maximum solve time in seconds
            mip_gap: Acceptable MIP gap (e.g., 0.1 for 10%)
        """
        import highspy

        self._highspy = highspy
        self._h = highspy.Highs()
        self._h.setOptionValue("output_flag", verbose)

        # Always seed HiGHS: use the caller's seed when given, otherwise draw
        # one from the project's random provider so seeded runs stay reproducible.
        if seed is not None:
            self._h.setOptionValue("random_seed", seed)
        else:
            self._h.setOptionValue("random_seed", random_provider().randint())

        if time_limit is not None:
            self._h.setOptionValue("time_limit", time_limit)

        if mip_gap is not None:
            self._h.setOptionValue("mip_rel_gap", mip_gap)

        # name -> variable handle, used by get_var_by_name
        self._var_names: dict[str, Any] = {}
        self._objective_sense: SolverSense | None = None

    def add_binary_var(self, name: str = "") -> Any:
        """Add a 0/1 variable (an integer variable bounded to [0, 1])."""
        var = self._h.addVariable(lb=0.0, ub=1.0, type=self._highspy.HighsVarType.kInteger)
        if name:
            self._var_names[name] = var
        return var

    def add_integer_var(self, lb: float = 0.0, ub: float | None = None, name: str = "") -> Any:
        """Add an integer variable; `ub=None` means unbounded above."""
        if ub is None:
            ub = self._highspy.kHighsInf
        var = self._h.addVariable(lb=lb, ub=ub, type=self._highspy.HighsVarType.kInteger)
        if name:
            self._var_names[name] = var
        return var

    def add_continuous_var(self, lb: float = 0.0, ub: float = 1.0, name: str = "") -> Any:
        """Add a continuous variable bounded to [lb, ub]."""
        var = self._h.addVariable(lb=lb, ub=ub)
        if name:
            self._var_names[name] = var
        return var

    def add_constr(self, constraint: Any) -> None:
        """Add a constraint, tolerating constant (boolean) 'constraints'."""
        # When a constraint evaluates to a plain boolean (e.g., 0 >= 0 -> True),
        # we need to handle it specially since highspy expects a constraint object.
        if constraint is True:
            # Trivially satisfied constraint, skip it
            return
        if constraint is False:
            # Trivially infeasible constraint - force infeasibility by fixing a
            # variable to 0 and requiring it to be >= 1 (never satisfiable)
            dummy = self._h.addVariable(lb=0.0, ub=0.0)
            self._h.addConstr(dummy >= 1)
            return
        self._h.addConstr(constraint)

    def set_objective(self, expr: Any, sense: SolverSense) -> None:
        """Set the objective expression and the optimization direction."""
        self._objective_sense = sense
        if sense == SolverSense.MAXIMIZE:
            self._h.maximize(expr)
        else:
            self._h.minimize(expr)

    def optimize(self) -> SolverStatus:
        """Run the solver and translate the HiGHS model status to SolverStatus."""
        self._h.run()
        status = self._h.getModelStatus()

        if status == self._highspy.HighsModelStatus.kOptimal:
            return SolverStatus.OPTIMAL
        elif status == self._highspy.HighsModelStatus.kInfeasible:
            return SolverStatus.INFEASIBLE
        elif status == self._highspy.HighsModelStatus.kUnbounded:
            return SolverStatus.UNBOUNDED
        elif status in (
            self._highspy.HighsModelStatus.kObjectiveBound,
            self._highspy.HighsModelStatus.kObjectiveTarget,
            self._highspy.HighsModelStatus.kTimeLimit,
            self._highspy.HighsModelStatus.kIterationLimit,
            self._highspy.HighsModelStatus.kSolutionLimit,
        ):
            # These statuses indicate a feasible (but possibly not optimal) solution was found
            info = self._h.getInfo()
            if info.primal_solution_status == self._highspy.HighsSolutionStatus.kSolutionStatusFeasible:
                return SolverStatus.FEASIBLE
            return SolverStatus.ERROR
        else:
            return SolverStatus.ERROR

    def get_value(self, var: Any) -> float:
        """Return the solution value of a variable."""
        return float(self._h.val(var))

    def get_objective_value(self) -> float:
        """Return the objective value of the current solution."""
        return float(self._h.getInfo().objective_function_value)

    def get_var_by_name(self, name: str) -> Any:
        """Look up a variable registered under `name`; None if absent."""
        return self._var_names.get(name)

    def get_gap(self) -> float:
        """Return the MIP gap of the last solve (0.0 if unavailable)."""
        info = self._h.getInfo()
        return float(info.mip_gap) if hasattr(info, "mip_gap") else 0.0

__init__(verbose=False, seed=None, time_limit=None, mip_gap=None)

Create a new HiGHS solver instance.

Parameters:

Name Type Description Default
verbose bool

If True, enable solver output

False
seed int | None

Random seed for reproducibility

None
time_limit float | None

Maximum solve time in seconds

None
mip_gap float | None

Acceptable MIP gap (e.g., 0.1 for 10%)

None
Source code in src/sortition_algorithms/committee_generation/solver.py
def __init__(
    self,
    verbose: bool = False,
    seed: int | None = None,
    time_limit: float | None = None,
    mip_gap: float | None = None,
):
    """Create a new HiGHS solver instance.

    Args:
        verbose: If True, enable solver output
        seed: Random seed for reproducibility
        time_limit: Maximum solve time in seconds
        mip_gap: Acceptable MIP gap (e.g., 0.1 for 10%)
    """
    import highspy

    self._highspy = highspy
    self._h = highspy.Highs()

    # Configure the underlying model: output, seed, and optional limits.
    self._h.setOptionValue("output_flag", verbose)
    # Fall back to the project's random provider when no seed is supplied.
    self._h.setOptionValue("random_seed", seed if seed is not None else random_provider().randint())
    if time_limit is not None:
        self._h.setOptionValue("time_limit", time_limit)
    if mip_gap is not None:
        self._h.setOptionValue("mip_rel_gap", mip_gap)

    # name -> variable handle, used by get_var_by_name
    self._var_names: dict[str, Any] = {}
    self._objective_sense: SolverSense | None = None

MipSolver

Bases: Solver

python-mip solver implementation.

Source code in src/sortition_algorithms/committee_generation/solver.py
class MipSolver(Solver):
    """python-mip solver implementation."""

    def __init__(
        self,
        verbose: bool = False,
        seed: int | None = None,
        time_limit: float | None = None,
        mip_gap: float | None = None,
        solver_name: str = "CBC",
    ):
        """Create a new python-mip solver instance.

        Args:
            verbose: If True, enable solver output
            seed: Random seed for reproducibility
            time_limit: Maximum solve time in seconds
            mip_gap: Acceptable MIP gap (e.g., 0.1 for 10%)
            solver_name: switch solver - options "CBC" or "HIGHS" or "GUROBI"

        Raises:
            RuntimeError: If python-mip is not installed
        """
        if not MIP_AVAILABLE:
            msg = "python-mip is not installed. Install it with: pip install mip"
            raise RuntimeError(msg)

        self._mip = _mip_module
        # As of mip 1.17 the underlying engine is selectable ("CBC", "HIGHS",
        # "GUROBI"); solver_name picks which one this Model uses.
        self._model = self._mip.Model(solver_name=solver_name)

        self._model.verbose = 1 if verbose else 0

        # Seed explicitly in both branches so runs are reproducible with a given
        # seed and still go through the shared random provider otherwise.
        if seed is not None:
            self._model.seed = seed
        else:
            self._model.seed = random_provider().randint()

        if time_limit is not None:
            self._model.max_seconds = time_limit

        if mip_gap is not None:
            self._model.max_mip_gap = mip_gap

        # Updated by optimize(); stays ERROR until a solve has been run.
        self._status: SolverStatus = SolverStatus.ERROR

    def add_binary_var(self, name: str = "") -> Any:
        # python-mip treats an empty name specially, so pass None instead of "".
        return self._model.add_var(var_type=self._mip.BINARY, name=name if name else None)

    def add_integer_var(self, lb: float = 0.0, ub: float | None = None, name: str = "") -> Any:
        if ub is None:
            ub = self._mip.INF
        return self._model.add_var(var_type=self._mip.INTEGER, lb=lb, ub=ub, name=name if name else None)

    def add_continuous_var(self, lb: float = 0.0, ub: float = 1.0, name: str = "") -> Any:
        return self._model.add_var(var_type=self._mip.CONTINUOUS, lb=lb, ub=ub, name=name if name else None)

    def add_constr(self, constraint: Any) -> None:
        # When a constraint evaluates to a plain boolean (e.g., 0 >= 0 -> True),
        # we need to handle it specially.
        if constraint is True:
            # Trivially satisfied constraint, skip it
            return
        if constraint is False:
            # Trivially infeasible constraint - add a constraint that forces infeasibility
            dummy = self._model.add_var(var_type=self._mip.CONTINUOUS, lb=0.0, ub=0.0)
            self._model.add_constr(dummy >= 1)
            return
        self._model.add_constr(constraint)

    def set_objective(self, expr: Any, sense: SolverSense) -> None:
        if sense == SolverSense.MAXIMIZE:
            self._model.sense = self._mip.MAXIMIZE
        else:
            self._model.sense = self._mip.MINIMIZE
        self._model.objective = expr

    def optimize(self) -> SolverStatus:
        # Map python-mip's status enum onto our backend-neutral SolverStatus.
        status = self._model.optimize()

        if status == self._mip.OptimizationStatus.OPTIMAL:
            self._status = SolverStatus.OPTIMAL
        elif status == self._mip.OptimizationStatus.INFEASIBLE:
            self._status = SolverStatus.INFEASIBLE
        elif status == self._mip.OptimizationStatus.UNBOUNDED:
            self._status = SolverStatus.UNBOUNDED
        elif status == self._mip.OptimizationStatus.FEASIBLE:
            self._status = SolverStatus.FEASIBLE
        else:
            self._status = SolverStatus.ERROR

        return self._status

    def get_value(self, var: Any) -> float:
        # python-mip exposes the solution value on the variable itself.
        return float(var.x)

    def get_objective_value(self) -> float:
        return float(self._model.objective_value)

    def get_var_by_name(self, name: str) -> Any:
        return self._model.var_by_name(name)

    def get_gap(self) -> float:
        return float(self._model.gap)

__init__(verbose=False, seed=None, time_limit=None, mip_gap=None, solver_name='CBC')

Create a new python-mip solver instance.

Parameters:

Name Type Description Default
verbose bool

If True, enable solver output

False
seed int | None

Random seed for reproducibility

None
time_limit float | None

Maximum solve time in seconds

None
mip_gap float | None

Acceptable MIP gap (e.g., 0.1 for 10%)

None
solver_name str

switch solver - options "CBC" or "HIGHS" or "GUROBI"

'CBC'

Raises:

Type Description
RuntimeError

If python-mip is not installed

Source code in src/sortition_algorithms/committee_generation/solver.py
def __init__(
    self,
    verbose: bool = False,
    seed: int | None = None,
    time_limit: float | None = None,
    mip_gap: float | None = None,
    solver_name: str = "CBC",
):
    """Create a new python-mip solver instance.

    Args:
        verbose: If True, enable solver output
        seed: Random seed for reproducibility
        time_limit: Maximum solve time in seconds
        mip_gap: Acceptable MIP gap (e.g., 0.1 for 10%)
        solver_name: switch solver - options "CBC" or "HIGHS" or "GUROBI"

    Raises:
        RuntimeError: If python-mip is not installed
    """
    if not MIP_AVAILABLE:
        msg = "python-mip is not installed. Install it with: pip install mip"
        raise RuntimeError(msg)

    self._mip = _mip_module
    # As of mip 1.17 the underlying engine is selectable ("CBC", "HIGHS",
    # "GUROBI"); solver_name picks which one this Model uses.
    self._model = self._mip.Model(solver_name=solver_name)

    self._model.verbose = 1 if verbose else 0

    # Seed explicitly in both branches so runs are reproducible with a given
    # seed and still go through the shared random provider otherwise.
    if seed is not None:
        self._model.seed = seed
    else:
        self._model.seed = random_provider().randint()

    if time_limit is not None:
        self._model.max_seconds = time_limit

    if mip_gap is not None:
        self._model.max_mip_gap = mip_gap

    # Updated by optimize(); stays ERROR until a solve has been run.
    self._status: SolverStatus = SolverStatus.ERROR

Solver

Bases: ABC

Abstract base class for LP/MIP solvers.

Provides a unified interface for committee generation algorithms to use different solver backends (HiGHS, python-mip, Gurobi) interchangeably.

Source code in src/sortition_algorithms/committee_generation/solver.py
class Solver(ABC):
    """Abstract base class for LP/MIP solvers.

    Declares the interface that committee generation algorithms rely on, so
    solver backends (HiGHS, python-mip, Gurobi) can be swapped freely.
    """

    @abstractmethod
    def add_binary_var(self, name: str = "") -> Any:
        """Create a binary (0/1) decision variable in the model.

        Args:
            name: Optional name for the variable

        Returns:
            A variable object usable in constraints and objectives
        """

    @abstractmethod
    def add_integer_var(self, lb: float = 0.0, ub: float | None = None, name: str = "") -> Any:
        """Create an integer decision variable in the model.

        Args:
            lb: Lower bound (default 0.0)
            ub: Upper bound; None means unbounded above (infinity)
            name: Optional name for the variable

        Returns:
            A variable object usable in constraints and objectives
        """

    @abstractmethod
    def add_continuous_var(self, lb: float = 0.0, ub: float = 1.0, name: str = "") -> Any:
        """Create a continuous decision variable in the model.

        Args:
            lb: Lower bound (default 0.0)
            ub: Upper bound (default 1.0)
            name: Optional name for the variable

        Returns:
            A variable object usable in constraints and objectives
        """

    @abstractmethod
    def add_constr(self, constraint: Any) -> None:
        """Attach a constraint to the model.

        Args:
            constraint: A constraint expression (e.g., var1 + var2 <= 5)
        """

    @abstractmethod
    def set_objective(self, expr: Any, sense: SolverSense) -> None:
        """Install the objective function.

        Args:
            expr: The expression to optimize
            sense: MINIMIZE or MAXIMIZE
        """

    @abstractmethod
    def optimize(self) -> SolverStatus:
        """Run the solve.

        Returns:
            The optimization status
        """

    @abstractmethod
    def get_value(self, var: Any) -> float:
        """Read a variable's value from the solution after optimization.

        Args:
            var: The variable to query

        Returns:
            The optimal value of the variable
        """

    @abstractmethod
    def get_objective_value(self) -> float:
        """Read the objective value after optimization.

        Returns:
            The optimal objective value
        """

    @abstractmethod
    def get_var_by_name(self, name: str) -> Any:
        """Look up a previously created variable by name.

        Args:
            name: The name of the variable

        Returns:
            The variable object, or None if not found
        """

    @abstractmethod
    def get_gap(self) -> float:
        """Read the MIP gap after optimization.

        Returns:
            The optimality gap (0.0 for optimal, higher for feasible solutions)
        """

add_binary_var(name='') abstractmethod

Add a binary (0/1) variable to the model.

Parameters:

Name Type Description Default
name str

Optional name for the variable

''

Returns:

Type Description
Any

A variable object that can be used in constraints and objectives

Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def add_binary_var(self, name: str = "") -> Any:
    """Add a binary (0/1) variable to the model.

    Args:
        name: Optional name for the variable

    Returns:
        A variable object that can be used in constraints and objectives
    """
    pass

add_constr(constraint) abstractmethod

Add a constraint to the model.

Parameters:

Name Type Description Default
constraint Any

A constraint expression (e.g., var1 + var2 <= 5)

required
Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def add_constr(self, constraint: Any) -> None:
    """Add a constraint to the model.

    Args:
        constraint: A constraint expression (e.g., var1 + var2 <= 5)
    """
    pass

add_continuous_var(lb=0.0, ub=1.0, name='') abstractmethod

Add a continuous variable to the model.

Parameters:

Name Type Description Default
lb float

Lower bound (default 0.0)

0.0
ub float

Upper bound (default 1.0)

1.0
name str

Optional name for the variable

''

Returns:

Type Description
Any

A variable object that can be used in constraints and objectives

Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def add_continuous_var(self, lb: float = 0.0, ub: float = 1.0, name: str = "") -> Any:
    """Add a continuous variable to the model.

    Args:
        lb: Lower bound (default 0.0)
        ub: Upper bound (default 1.0)
        name: Optional name for the variable

    Returns:
        A variable object that can be used in constraints and objectives
    """
    pass

add_integer_var(lb=0.0, ub=None, name='') abstractmethod

Add an integer variable to the model.

Parameters:

Name Type Description Default
lb float

Lower bound (default 0.0)

0.0
ub float | None

Upper bound (default None means infinity)

None
name str

Optional name for the variable

''

Returns:

Type Description
Any

A variable object that can be used in constraints and objectives

Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def add_integer_var(self, lb: float = 0.0, ub: float | None = None, name: str = "") -> Any:
    """Add an integer variable to the model.

    Args:
        lb: Lower bound (default 0.0)
        ub: Upper bound (default None means infinity)
        name: Optional name for the variable

    Returns:
        A variable object that can be used in constraints and objectives
    """
    pass

get_gap() abstractmethod

Get the MIP gap after optimization.

Returns:

Type Description
float

The optimality gap (0.0 for optimal, higher for feasible solutions)

Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def get_gap(self) -> float:
    """Get the MIP gap after optimization.

    Returns:
        The optimality gap (0.0 for optimal, higher for feasible solutions)
    """
    pass

get_objective_value() abstractmethod

Get the objective value after optimization.

Returns:

Type Description
float

The optimal objective value

Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def get_objective_value(self) -> float:
    """Get the objective value after optimization.

    Returns:
        The optimal objective value
    """
    pass

get_value(var) abstractmethod

Get the value of a variable after optimization.

Parameters:

Name Type Description Default
var Any

The variable to query

required

Returns:

Type Description
float

The optimal value of the variable

Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def get_value(self, var: Any) -> float:
    """Get the value of a variable after optimization.

    Args:
        var: The variable to query

    Returns:
        The optimal value of the variable
    """
    pass

get_var_by_name(name) abstractmethod

Get a variable by its name.

Parameters:

Name Type Description Default
name str

The name of the variable

required

Returns:

Type Description
Any

The variable object, or None if not found

Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def get_var_by_name(self, name: str) -> Any:
    """Get a variable by its name.

    Args:
        name: The name of the variable

    Returns:
        The variable object, or None if not found
    """
    pass

optimize() abstractmethod

Solve the optimization problem.

Returns:

Type Description
SolverStatus

The optimization status

Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def optimize(self) -> SolverStatus:
    """Solve the optimization problem.

    Returns:
        The optimization status
    """
    pass

set_objective(expr, sense) abstractmethod

Set the objective function.

Parameters:

Name Type Description Default
expr Any

The expression to optimize

required
sense SolverSense

MINIMIZE or MAXIMIZE

required
Source code in src/sortition_algorithms/committee_generation/solver.py
@abstractmethod
def set_objective(self, expr: Any, sense: SolverSense) -> None:
    """Set the objective function.

    Args:
        expr: The expression to optimize
        sense: MINIMIZE or MAXIMIZE
    """
    pass

SolverSense

Bases: Enum

Optimization direction.

Source code in src/sortition_algorithms/committee_generation/solver.py
class SolverSense(Enum):
    """Optimization direction."""

    # Explicit values mirror auto() numbering, which starts at 1.
    MINIMIZE = 1
    MAXIMIZE = 2

SolverStatus

Bases: Enum

Status returned by solver after optimization.

Source code in src/sortition_algorithms/committee_generation/solver.py
class SolverStatus(Enum):
    """Status returned by solver after optimization."""

    # Explicit values mirror auto() numbering, which starts at 1.
    OPTIMAL = 1
    FEASIBLE = 2
    INFEASIBLE = 3
    UNBOUNDED = 4
    ERROR = 5

create_solver(backend=DEFAULT_BACKEND, verbose=False, seed=None, time_limit=None, mip_gap=None)

Create a solver instance with the specified backend.

Parameters:

Name Type Description Default
backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list

DEFAULT_BACKEND
verbose bool

If True, enable solver output

False
seed int | None

Random seed for reproducibility

None
time_limit float | None

Maximum solve time in seconds

None
mip_gap float | None

Acceptable MIP gap (e.g., 0.1 for 10%)

None

Returns:

Type Description
Solver

A Solver instance

Raises:

Type Description
ValueError

If an unknown backend is specified

Source code in src/sortition_algorithms/committee_generation/solver.py
def create_solver(
    backend: str = DEFAULT_BACKEND,
    verbose: bool = False,
    seed: int | None = None,
    time_limit: float | None = None,
    mip_gap: float | None = None,
) -> Solver:
    """Create a solver instance with the specified backend.

    Args:
        backend: solver backend to use - see settings.SOLVER_BACKENDS for full list
        verbose: If True, enable solver output
        seed: Random seed for reproducibility
        time_limit: Maximum solve time in seconds
        mip_gap: Acceptable MIP gap (e.g., 0.1 for 10%)

    Returns:
        A Solver instance

    Raises:
        ConfigurationError: If an unknown backend is specified
    """
    if backend == "highspy":
        return HighsSolver(verbose=verbose, seed=seed, time_limit=time_limit, mip_gap=mip_gap)

    # All "mip*" backends share the python-mip wrapper; only the underlying
    # engine passed as solver_name differs.
    mip_engines = {"mip": "CBC", "mip-cbc": "CBC", "mip-highs": "HIGHS", "mip-gurobi": "GUROBI"}
    if backend in mip_engines:
        return MipSolver(
            verbose=verbose,
            seed=seed,
            time_limit=time_limit,
            mip_gap=mip_gap,
            solver_name=mip_engines[backend],
        )

    raise ConfigurationError(
        message=f"Unknown solver backend: {backend}",
        error_code="unknown_solver_backend",
        error_params={"backend": backend},
    )

solver_sum(terms)

Sum a collection of solver expressions.

This provides a backend-agnostic way to sum terms. Both highspy and mip support Python's built-in sum() for their expression objects.

Parameters:

Name Type Description Default
terms Any

An iterable of solver expressions/variables

required

Returns:

Type Description
Any

A sum expression

Source code in src/sortition_algorithms/committee_generation/solver.py
def solver_sum(terms: Any) -> Any:
    """Sum a collection of solver expressions.

    This provides a backend-agnostic way to sum terms: highspy and mip
    expression objects both support addition, so an ordinary left-to-right
    fold starting from 0 (exactly what the built-in sum() does) yields the
    combined expression.

    Args:
        terms: An iterable of solver expressions/variables

    Returns:
        A sum expression
    """
    total: Any = 0
    for term in terms:
        total = total + term
    return total

find_random_sample(features, people, number_people_wanted, check_same_address_columns, *, selection_algorithm='maximin', solver_backend=DEFAULT_BACKEND, test_selection=False, number_selections=1, max_seconds=30)

Main algorithm to find one or multiple random committees.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas

required
people People

People object with pool members

required
number_people_wanted int

desired size of the panel

required
check_same_address_columns list[str]

columns for the address to check, or empty list if no check required

required
selection_algorithm str

one of "legacy", "maximin", "leximin", or "nash"

'maximin'
solver_backend str

solver backend to use - see settings.SOLVER_BACKENDS for full list

DEFAULT_BACKEND
test_selection bool

if set, do not do a random selection, but just return some valid panel. Useful for quickly testing whether quotas are satisfiable, but should always be false for actual selection!

False
number_selections int

how many panels to return. Most of the time, this should be set to 1, which means that a single panel is chosen. When specifying a value n ≥ 2, the function will return a list of length n, containing multiple panels (some panels might be repeated in the list). In this case the eventual panel should be drawn uniformly at random from the returned list.

1
max_seconds int

the maximum number of seconds to spend searching, for those algorithms that support it. Currently only diversimax supports this.

30

Returns:

Type Description
tuple[list[frozenset[str]], RunReport]

tuple of (committee_lottery, report)
  • committee_lottery: list of committees, where each committee is a frozen set of pool member ids
  • report: report with debug strings

Raises:

Type Description
InfeasibleQuotasError

if the quotas cannot be satisfied, which includes a suggestion for how to modify them

SelectionError

in multiple other failure cases

ValueError

for invalid parameters

RuntimeError

if required solver is not available

Source code in src/sortition_algorithms/core.py
def find_random_sample(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    check_same_address_columns: list[str],
    *,
    selection_algorithm: str = "maximin",
    solver_backend: str = DEFAULT_BACKEND,
    test_selection: bool = False,
    number_selections: int = 1,
    max_seconds: int = 30,
) -> tuple[list[frozenset[str]], RunReport]:
    """Main algorithm to find one or multiple random committees.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        check_same_address_columns: columns for the address to check, or empty list if no check required
        selection_algorithm: one of "legacy", "maximin", "leximin", "nash", or "diversimax"
        solver_backend: solver backend to use - see settings.SOLVER_BACKENDS for full list
        test_selection: if set, do not do a random selection, but just return some valid panel.
            Useful for quickly testing whether quotas are satisfiable, but should always be false for actual selection!
        number_selections: how many panels to return. Most of the time, this should be set to 1, which means that
            a single panel is chosen. When specifying a value n ≥ 2, the function will return a list of length n,
            containing multiple panels (some panels might be repeated in the list). In this case the eventual panel
            should be drawn uniformly at random from the returned list.
        max_seconds: the maximum number of seconds to spend searching, for those algorithms that support it.
            Currently only diversimax supports this.

    Returns:
        tuple of (committee_lottery, report)
        - committee_lottery: list of committees, where each committee is a frozen set of pool member ids
        - report: report with debug strings

    Raises:
        InfeasibleQuotasError: if the quotas cannot be satisfied, which includes a suggestion for how to modify them
        SelectionError: in multiple other failure cases
        ConfigurationError: for invalid parameter combinations or an unknown selection algorithm
        ValueError: for invalid parameters
        RuntimeError: if required solver is not available
    """
    # Input validation
    if test_selection and number_selections != 1:
        msg = (
            "Running the test selection does not support generating a transparent lottery, so, if "
            "`test_selection` is true, `number_selections` must be 1."
        )
        raise ConfigurationError(message=msg, error_code="test_selection_multiple_selections", error_params={})

    if selection_algorithm == "legacy" and number_selections != 1:
        msg = (
            "Currently, the legacy algorithm does not support generating a transparent lottery, "
            "so `number_selections` must be set to 1."
        )
        raise ConfigurationError(message=msg, error_code="legacy_multiple_selections", error_params={})
    if selection_algorithm == "diversimax" and number_selections != 1:
        msg = (
            "The diversimax algorithm does not support generating multiple committees, "
            "so `number_selections` must be set to 1."
        )
        raise ConfigurationError(message=msg, error_code="diversimax_multiple_selections", error_params={})

    # Quick test selection using find_any_committee
    if test_selection:
        logger.info("Running test selection.")
        return find_any_committee(features, people, number_people_wanted, check_same_address_columns, solver_backend)

    report = RunReport()

    # Check if Gurobi is available for leximin
    if selection_algorithm == "leximin" and not GUROBI_AVAILABLE:
        report.add_message("gurobi_unavailable_switching")
        selection_algorithm = "maximin"

    # Route to appropriate algorithm
    if selection_algorithm == "legacy":
        return find_random_sample_legacy(
            people,
            features,
            number_people_wanted,
            check_same_address_columns,
        )
    elif selection_algorithm == "leximin":
        committees, probabilities, new_report = find_distribution_leximin(
            features, people, number_people_wanted, check_same_address_columns, solver_backend
        )
    elif selection_algorithm == "maximin":
        committees, probabilities, new_report = find_distribution_maximin(
            features, people, number_people_wanted, check_same_address_columns, solver_backend
        )
    elif selection_algorithm == "nash":
        committees, probabilities, new_report = find_distribution_nash(
            features, people, number_people_wanted, check_same_address_columns, solver_backend
        )
    elif selection_algorithm == "diversimax":
        # diversimax returns a single concrete panel rather than a distribution,
        # so it short-circuits the lottery post-processing below.
        selected_ids, new_report = find_distribution_diversimax(
            features,
            people,
            number_people_wanted,
            check_same_address_columns,
            max_seconds=max_seconds,
            solver_backend=solver_backend,
        )
        report.add_report(new_report)
        return [selected_ids], report
    else:
        msg = (
            f"Unknown selection algorithm {selection_algorithm!r}, must be either 'legacy', 'leximin', "
            f"'maximin', 'diversimax', or 'nash'."
        )
        raise ConfigurationError(
            message=msg, error_code="unknown_selection_algorithm", error_params={"algorithm": selection_algorithm}
        )

    report.add_report(new_report)

    # Post-process the distribution
    committees, probabilities = standardize_distribution(committees, probabilities)
    if len(committees) > people.count:
        report.add_message_and_log(
            "basic_solution_warning",
            logging.WARNING,
            algorithm=selection_algorithm,
            num_panels=len(committees),
            num_agents=people.count,
            min_probs=min(probabilities),
        )

    assert len(set(committees)) == len(committees)

    stats_report = _distribution_stats(people, committees, probabilities)
    report.add_report(stats_report)

    # Convert to lottery
    committee_lottery = lottery_rounding(committees, probabilities, number_selections)

    return committee_lottery, report

lottery_rounding(committees, probabilities, number_selections)

Convert probability distribution over committees to a discrete lottery.

Parameters:

Name Type Description Default
committees list[frozenset[str]]

list of committees

required
probabilities list[float]

corresponding probabilities (must sum to 1)

required
number_selections int

number of committees to return

required

Returns:

Type Description
list[frozenset[str]]

list of committees (may contain duplicates) of length number_selections

Source code in src/sortition_algorithms/core.py
def lottery_rounding(
    committees: list[frozenset[str]],
    probabilities: list[float],
    number_selections: int,
) -> list[frozenset[str]]:
    """Convert probability distribution over committees to a discrete lottery.

    Args:
        committees: list of committees
        probabilities: corresponding probabilities (must sum to 1)
        number_selections: number of committees to return

    Returns:
        list of committees (may contain duplicates) of length number_selections
    """
    assert len(committees) == len(probabilities)
    assert number_selections >= 1

    # Scale each probability by the number of draws: the integer part is a
    # guaranteed lower quota of copies, the fractional residual is rounded
    # randomly below. (Only the probabilities matter here, so no zip needed.)
    num_copies: list[int] = []
    residuals: list[float] = []
    for prob in probabilities:
        scaled_prob = prob * number_selections
        whole = int(scaled_prob)
        num_copies.append(whole)  # give lower quotas
        residuals.append(scaled_prob - whole)

    # Pipage rounding picks which residuals get rounded up, preserving the
    # expected number of copies of each committee.
    rounded_up_indices = pipage_rounding(list(enumerate(residuals)))
    for committee_index in rounded_up_indices:
        num_copies[committee_index] += 1

    committee_lottery: list[frozenset[str]] = []
    for committee, committee_copies in zip(committees, num_copies, strict=False):
        committee_lottery.extend([committee] * committee_copies)

    return committee_lottery

pipage_rounding(marginals)

Pipage rounding algorithm for converting fractional solutions to integer solutions.

Takes a list of (object, probability) pairs and randomly rounds them to a set of objects such that the expected number of times each object appears equals its probability.

Parameters:

Name Type Description Default
marginals list[tuple[int, float]]

list of (object, probability) pairs where probabilities sum to an integer

required

Returns:

Type Description
list[int]

list of objects that were selected

Source code in src/sortition_algorithms/core.py
def pipage_rounding(marginals: list[tuple[int, float]]) -> list[int]:
    """Pipage rounding algorithm for converting fractional solutions to integer solutions.

    Takes a list of (object, probability) pairs and randomly rounds them to a set of objects
    such that the expected number of times each object appears equals its probability.

    Args:
        marginals: list of (object, probability) pairs where probabilities sum to an integer

    Returns:
        list of objects that were selected
    """
    assert all(0.0 <= p <= 1.0 for _, p in marginals)

    outcomes: list[int] = []
    while True:
        if len(marginals) == 0:
            return outcomes
        if len(marginals) == 1:
            # Only one fractional pair remains, so a single Bernoulli draw
            # with its probability settles it.
            obj, prob = marginals[0]
            if random_provider().uniform(0.0, 1.0) < prob:
                outcomes.append(obj)
            marginals = []
        else:
            # Settle (near-)integral entries first: ~1 is always selected and
            # ~0 never is. EPS2 absorbs floating-point noise near the bounds.
            obj0, prob0 = marginals[0]
            if prob0 > 1.0 - EPS2:
                outcomes.append(obj0)
                marginals = marginals[1:]
                continue
            if prob0 < EPS2:
                marginals = marginals[1:]
                continue

            obj1, prob1 = marginals[1]
            if prob1 > 1.0 - EPS2:
                outcomes.append(obj1)
                marginals = [marginals[0]] + marginals[2:]
                continue
            if prob1 < EPS2:
                marginals = [marginals[0]] + marginals[2:]
                continue

            # Both leading entries are strictly fractional: shift probability
            # mass between them until one hits 0 or 1. The direction is chosen
            # with probabilities that keep each entry's expectation unchanged.
            inc0_dec1_amount = min(
                1.0 - prob0, prob1
            )  # maximal amount that prob0 can be increased and prob1 can be decreased
            dec0_inc1_amount = min(prob0, 1.0 - prob1)
            choice_probability = dec0_inc1_amount / (inc0_dec1_amount + dec0_inc1_amount)

            if random_provider().uniform(0.0, 1.0) < choice_probability:  # increase prob0 and decrease prob1
                prob0 += inc0_dec1_amount
                prob1 -= inc0_dec1_amount
            else:
                prob0 -= dec0_inc1_amount
                prob1 += dec0_inc1_amount
            marginals = [(obj0, prob0), (obj1, prob1)] + marginals[2:]

run_stratification(features, people, number_people_wanted, settings, *, test_selection=False, number_selections=1, already_selected=None, max_seconds=30)

Run stratified random selection with retry logic.

Parameters:

Name Type Description Default
features FeatureCollection

FeatureCollection with min/max quotas for each feature value

required
people People

People object containing the pool of candidates

required
number_people_wanted int

Desired size of the panel

required
settings Settings

Settings object containing configuration

required
test_selection bool

If True, don't randomize (for testing only)

False
number_selections int

Number of panels to return (default: 1)

1
already_selected People | None

People who have already been selected (optional)

None
max_seconds int

Maximum seconds to try and find optimal answer (diversimax only)

30

Returns:

Type Description
tuple[bool, list[frozenset[str]], RunReport]

Tuple of (success, selected_committees, report)
  • success: Whether selection succeeded within max attempts
  • selected_committees: List of committees (frozensets of person IDs)
  • report: contains debug and status messages

Raises:

Type Description
Exception

If number_people_wanted is outside valid range for any feature

ValueError

For invalid parameters

RuntimeError

If required solver is not available

InfeasibleQuotasError

If quotas cannot be satisfied

Source code in src/sortition_algorithms/core.py
def run_stratification(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    settings: Settings,
    *,
    test_selection: bool = False,
    number_selections: int = 1,
    already_selected: People | None = None,
    max_seconds: int = 30,
) -> tuple[bool, list[frozenset[str]], RunReport]:
    """Run stratified random selection with retry logic.

    Args:
        features: FeatureCollection with min/max quotas for each feature value
        people: People object containing the pool of candidates
        number_people_wanted: Desired size of the panel
        settings: Settings object containing configuration
        test_selection: If True, don't randomize (for testing only)
        number_selections: Number of panels to return (default: 1)
        already_selected: People who have already been selected (optional)
        max_seconds: Maximum seconds to try and find optimal answer (diversimax only)

    Returns:
        Tuple of (success, selected_committees, report)
        - success: Whether selection succeeded within max attempts
        - selected_committees: List of committees (frozensets of person IDs)
        - report: contains debug and status messages

    Raises:
        Exception: If number_people_wanted is outside valid range for any feature
        ValueError: For invalid parameters
        RuntimeError: If required solver is not available
        InfeasibleQuotasError: If quotas cannot be satisfied
    """
    success = False
    report = RunReport()
    people_selected: list[frozenset[str]] = []

    try:
        # Pre-flight: drop pool members whose address matches an already-selected
        # person, then validate that the quotas are achievable at all.
        working_people = exclude_matching_selected_addresses(people, already_selected, settings)
        dropped_count = people.count - working_people.count
        if dropped_count:
            report.add_line_and_log(
                f"Dropped {dropped_count} people who have an address matching a selected person.", logging.INFO
            )
        # Check if desired number is within feature constraints
        check_desired(features, number_people_wanted)
        check_enough_people_for_every_feature_value(features, working_people)
    except errors.SelectionError as error:
        # Validation failed: report and bail out without attempting any selection.
        report.add_error(error)
        return False, people_selected, report

    # Set random seed if specified
    # If the seed is zero or None, we use the secrets module, as it is better
    # from a security point of view
    set_random_provider(settings.random_number_seed)

    if test_selection:
        report.add_message("test_selection_warning", ReportLevel.CRITICAL)

    report.add_message("initial_state", ReportLevel.IMPORTANT)
    report.add_report(_initial_category_info_table(features, working_people))

    # tries is initialised here so the failure message below is well-defined
    # even if settings.max_attempts is zero and the loop body never runs.
    tries = 0
    for tries in range(settings.max_attempts):
        people_selected = []

        report.add_message_and_log("trial_number", logging.WARNING, trial=tries + 1)

        try:
            # One selection attempt; retryable failures are caught below.
            people_selected, new_report = find_random_sample(
                features,
                working_people,
                number_people_wanted,
                settings.normalised_address_columns,
                selection_algorithm=settings.selection_algorithm,
                solver_backend=settings.solver_backend,
                test_selection=test_selection,
                number_selections=number_selections,
                max_seconds=max_seconds,
            )
            report.add_report(new_report)

            # Check if targets were met (only works for number_selections = 1)
            # This raises an error if we did not select properly
            _check_category_selected(features, working_people, people_selected, number_selections)

            report.add_message("selection_success", ReportLevel.IMPORTANT)
            report.add_report(_category_info_table(features, working_people, people_selected, number_people_wanted))
            success = True
            break

        except errors.SelectionError as serr:
            if serr.is_retryable:
                # Non-fatal: record the error and let the loop try again.
                report.add_error(serr, is_fatal=False)
                report.add_message("retry_after_error", error=str(serr))
                # we do not break here, we try again.
            else:
                report.add_error(serr)
                break
        # these are all fatal errors
        except (ValueError, RuntimeError, errors.InfeasibleQuotasCantRelaxError) as err:
            report.add_error(err)
            break

    if not success:
        report.add_message("selection_failed", ReportLevel.IMPORTANT, attempts=tries + 1)

    return success, people_selected, report

selected_remaining_tables(full_people, people_selected, features, settings, already_selected=None, exclude_matching_addresses=True)

Build tabular output for the selected people and for the remaining pool.

people_selected is a single frozenset[str] - it must be unwrapped before being passed to this function.

Source code in src/sortition_algorithms/core.py
def selected_remaining_tables(
    full_people: People,
    people_selected: frozenset[str],
    features: FeatureCollection,
    settings: Settings,
    already_selected: People | None = None,
    exclude_matching_addresses: bool = True,
) -> tuple[list[list[str]], list[list[str]], list[str]]:
    """
    Build tabular output for the selected people and for the remaining pool.

    Returns (people_selected_rows, people_remaining_rows, output_lines), where
    the two row lists are tables built by person_list_to_table (exact column
    layout defined there) and output_lines is currently always empty.

    people_selected is a single frozenset[str] - it must be unwrapped before being passed
    to this function.
    """
    # Work on a deep copy so full_people itself is never mutated.
    people_working = deepcopy(full_people)
    output_lines: list[str] = []

    # this function only reads from people, so pass in full_people
    people_selected_rows = person_list_to_table(people_selected, full_people, features, settings)

    # now delete the selected people
    for pkey in people_selected:
        people_working.remove(pkey)

    # now delete the people at the same address as a selected person
    # TODO: consider retiring this code once we use already_selected everywhere
    # and then also drop exclude_matching_addresses variable
    # NOTE(review): this branch is skipped when already_selected is non-empty,
    # presumably because address exclusion then happened earlier - confirm.
    num_same_address_deleted = 0
    if (
        exclude_matching_addresses
        and settings.check_same_address
        and (already_selected is None or not already_selected.count)
    ):
        for pkey in people_selected:
            pkey_to_delete = list(full_people.matching_address(pkey, settings.check_same_address_columns))
            num_same_address_deleted += len(pkey_to_delete)
            # then delete this/these people at the same address from the reserve/remaining pool
            people_working.remove_many(pkey_to_delete)

    # add the columns to keep into remaining people
    # as above all these values are all in people_working but this is tidier...
    # this function only reads from people, so pass in full_people
    people_remaining_rows = person_list_to_table(people_working, full_people, features, settings)
    return people_selected_rows, people_remaining_rows, output_lines

N_(message)

No-op marker for translation extraction.

This function does nothing at runtime but marks strings for extraction by babel/gettext tools. It's a common pattern for libraries that want to be translation-ready without depending on translation frameworks.

Parameters:

Name Type Description Default
message str

The message string to mark for extraction

required

Returns:

Type Description
str

The same message string, unchanged

Source code in src/sortition_algorithms/error_messages.py
def N_(message: str) -> str:
    """
    Mark *message* for gettext extraction without translating it.

    At runtime this is the identity function: it hands back exactly the string
    it was given. Tools such as babel scan source code for ``N_(...)`` calls so
    the wrapped strings land in the message catalogue, keeping the library
    translation-ready without depending on any translation framework.
    """
    return message

get_message(code, **params)

Get a formatted error message by code.

This is a helper function for creating error messages that supports both English output and provides structured data for translation.

Parameters:

Name Type Description Default
code str

The error code to look up in ERROR_MESSAGES

required
**params Any

Parameters to substitute into the message template

{}

Returns:

Type Description
str

The formatted message string

Raises:

Type Description
KeyError

If the error code is not found in ERROR_MESSAGES

Example

>>> get_message('missing_column', column='id', error_label='for people', data_container='CSV file')
"No 'id' column for people found in CSV file!"

Source code in src/sortition_algorithms/error_messages.py
def get_message(code: str, **params: Any) -> str:
    """
    Look up the error template for *code* and substitute *params* into it.

    Supports plain English output while keeping the code and its parameters
    available as structured data for translation.

    Args:
        code: The error code to look up in ERROR_MESSAGES
        **params: Parameters to substitute into the message template

    Returns:
        The formatted message string

    Raises:
        KeyError: If the error code is not found in ERROR_MESSAGES

    Example:
        >>> get_message('missing_column', column='id', error_label='for people', data_container='CSV file')
        "No 'id' column for people found in CSV file!"
    """
    # %-style templates accept a mapping for named substitution.
    return ERROR_MESSAGES[code] % params

BadDataError

Bases: SortitionBaseError

Error for when bad data is found while reading things in

Source code in src/sortition_algorithms/errors.py
class BadDataError(SortitionBaseError):
    """Raised when malformed or otherwise bad data is found while reading input."""

ConfigurationError

Bases: SortitionBaseError

Error for invalid configuration or settings

Source code in src/sortition_algorithms/errors.py
class ConfigurationError(SortitionBaseError):
    """Raised for invalid configuration or settings."""

InfeasibleQuotasCantRelaxError

Bases: SortitionBaseError

The quotas can't be met, and no feasible relaxation was found

Source code in src/sortition_algorithms/errors.py
class InfeasibleQuotasCantRelaxError(SortitionBaseError):
    """Raised when the quotas can't be met and no feasible relaxation was found."""

InfeasibleQuotasError

Bases: SelectionMultilineError

The quotas can't be met, and a feasible relaxation was found.

The details of what relaxations are recommended are included in the error.

Source code in src/sortition_algorithms/errors.py
class InfeasibleQuotasError(SelectionMultilineError):
    """
    The quotas can't be met, and a feasible relaxation was found.

    The details of what relaxations are recommended are included in the error.
    """

    def __init__(self, features: "FeatureCollection", output: list[str]) -> None:
        """Store the features and prepend a fixed intro line to the suggested steps."""
        self.features = features
        super().__init__(
            lines=[
                "It is not possible to hit all the targets with the current set of people. I suggest the following steps:",
                *output,
            ]
        )

    def __reduce__(self) -> tuple[type[Any], tuple[Any, ...]]:
        """Support pickling by returning constructor and arguments."""
        # Extract output from all_lines (skip first line "It is not possible ...")
        output = self.all_lines[1:] if len(self.all_lines) > 1 else []
        return (self.__class__, (self.features, output))

__reduce__()

Support pickling by returning constructor and arguments.

Source code in src/sortition_algorithms/errors.py
def __reduce__(self) -> tuple[type[Any], tuple[Any, ...]]:
    """Support pickling by returning constructor and arguments."""
    # Extract output from all_lines (skip first line "It is not possible ...")
    # so unpickling re-runs __init__ with the same (features, output) pair.
    output = self.all_lines[1:] if len(self.all_lines) > 1 else []
    return (self.__class__, (self.features, output))

ParseErrorsCollector

Class that we can add errors to, but errors with empty messages will be dropped

Source code in src/sortition_algorithms/errors.py
class ParseErrorsCollector:
    """Class that we can add errors to, but errors with empty messages will be dropped"""

    def __init__(self) -> None:
        # Accumulated structured error messages, in insertion order.
        self.errors: list[ParseTableErrorMsg | ParseTableMultiValueErrorMsg] = []

    def __len__(self) -> int:
        """This means that we will be falsy if len is 0, so is effectively a __bool__ as well"""
        return len(self.errors)

    def add(
        self,
        msg: str,
        key: str,
        value: str,
        row: int,
        row_name: str,
        error_code: str = "",
        error_params: dict[str, str] | dict[str, str | int] | None = None,
    ) -> None:
        """Record a single-cell parse error; silently dropped when msg is empty."""
        if msg:
            self.errors.append(
                ParseTableErrorMsg(
                    row=row,
                    row_name=row_name,
                    key=key,
                    value=value,
                    msg=msg,
                    error_code=error_code,
                    error_params=error_params or {},
                )
            )

    def add_multi_value(
        self,
        msg: str,
        keys: list[str],
        values: list[str],
        row: int,
        row_name: str,
        error_code: str = "",
        error_params: dict[str, str | int] | None = None,
    ) -> None:
        """Record a parse error spanning several cells; dropped when msg is empty."""
        if msg:
            self.errors.append(
                ParseTableMultiValueErrorMsg(
                    row=row,
                    row_name=row_name,
                    keys=keys,
                    values=values,
                    msg=msg,
                    error_code=error_code,
                    error_params=error_params or {},
                )
            )

    def to_error(self) -> ParseTableMultiError:
        """Bundle everything collected so far into a raisable ParseTableMultiError."""
        return ParseTableMultiError(self.errors)

__len__()

This means that we will be falsy if len is 0, so is effectively a `__bool__` as well

Source code in src/sortition_algorithms/errors.py
def __len__(self) -> int:
    """This means that we will be falsy if len is 0, so is effectively a __bool__ as well"""
    # No separate __bool__ is needed: Python falls back to __len__ for truthiness.
    return len(self.errors)

ParseTableMultiError

Bases: SelectionMultilineError

Specifically for collecting errors from parsing a table

This has information that can be collected at a low level. Then higher level code can read the errors and make a SelectionMultilineError instance with strings with more context, relating to a CSV file, Spreadsheet etc.

Source code in src/sortition_algorithms/errors.py
class ParseTableMultiError(SelectionMultilineError):
    """
    Specifically for collecting errors from parsing a table

    This has information that can be collected at a low level. Then higher level code can read
    the errors and make a SelectionMultilineError instance with strings with more context,
    relating to a CSV file, Spreadsheet etc.
    """

    def __init__(self, errors: list[ParseTableErrorMsg | ParseTableMultiValueErrorMsg] | None = None) -> None:
        """Wrap the structured error messages; an empty/omitted list gives a falsy error."""
        self.all_errors: list[ParseTableErrorMsg | ParseTableMultiValueErrorMsg] = errors or []
        # Fix: Call parent __init__ to set Exception.args properly
        super().__init__(lines=self.lines())

    def __len__(self) -> int:
        """This means that we will be falsy if len is 0, so is effectively a __bool__ as well"""
        return len(self.all_errors)

    def lines(self) -> list[str]:
        """Render each structured error to its string form."""
        return [str(e) for e in self.all_errors]

    def combine(self, other: SelectionMultilineError) -> None:
        """Add all the lines from the other error to this one."""
        assert isinstance(other, ParseTableMultiError)
        self.all_errors += other.all_errors

    def __reduce__(self) -> tuple[type[Any], tuple[Any, ...]]:
        """Support pickling by returning constructor and arguments."""
        return (self.__class__, (self.all_errors,))

__len__()

This means that we will be falsy if len is 0, so is effectively a `__bool__` as well

Source code in src/sortition_algorithms/errors.py
def __len__(self) -> int:
    """This means that we will be falsy if len is 0, so is effectively a __bool__ as well"""
    # Truthiness falls back to __len__, so `if error:` works as expected.
    return len(self.all_errors)

__reduce__()

Support pickling by returning constructor and arguments.

Source code in src/sortition_algorithms/errors.py
def __reduce__(self) -> tuple[type[Any], tuple[Any, ...]]:
    """Support pickling by returning constructor and arguments."""
    # Reconstruct from the structured error list rather than flattened lines.
    return (self.__class__, (self.all_errors,))

combine(other)

Add all the lines from the other error to this one.

Source code in src/sortition_algorithms/errors.py
def combine(self, other: SelectionMultilineError) -> None:
    """Add all the lines from the other error to this one."""
    # Only another ParseTableMultiError can be merged, since we need its
    # structured all_errors list rather than just rendered lines.
    assert isinstance(other, ParseTableMultiError)
    self.all_errors += other.all_errors

RetryableSelectionError

Bases: SelectionError

For errors where the selection should be retried.

The main case is when the legacy selection algorithm fails, it can be worth retrying as it might find something the next time around.

Source code in src/sortition_algorithms/errors.py
class RetryableSelectionError(SelectionError):
    """
    For errors where the selection should be retried.

    The main case is when the legacy selection algorithm fails, it can be worth
    retrying as it might find something the next time around.
    """

    # Overrides the False default declared on SortitionBaseError.
    is_retryable: bool = True

SelectionError

Bases: SortitionBaseError

Generic error for things that happen in selection

Source code in src/sortition_algorithms/errors.py
class SelectionError(SortitionBaseError):
    """Generic error raised for failures that happen during selection."""

SelectionMultilineError

Bases: SelectionError

Generic error for things that happen in selection - multiline

Source code in src/sortition_algorithms/errors.py
class SelectionMultilineError(SelectionError):
    """Generic error for things that happen in selection - multiline"""

    def __init__(
        self,
        lines: list[str],
        is_retryable: bool = False,
        error_code: str = "",
        error_params: dict[str, str | int] | None = None,
    ) -> None:
        """Store the individual lines; the joined form becomes the base message."""
        message = "\n".join(lines)
        super().__init__(message=message, error_code=error_code, error_params=error_params)
        self.all_lines = lines
        self.is_retryable = is_retryable

    def __str__(self) -> str:
        # Join via lines() so subclasses that override lines() render correctly.
        return "\n".join(self.lines())

    def to_html(self) -> str:
        """HTML-escaped rendering with <br /> separators, safe to embed in markup."""
        return "<br />".join(html.escape(line) for line in self.lines())

    def lines(self) -> list[str]:
        """Return the message lines; subclasses may override."""
        return self.all_lines

    def combine(self, other: "SelectionMultilineError") -> None:
        """Add all the lines from the other error to this one."""
        self.all_lines += other.lines()

    def __reduce__(self) -> tuple[type[Any], tuple[Any, ...]]:
        """Support pickling by returning constructor and arguments."""
        return (self.__class__, (self.all_lines, self.is_retryable, self.error_code, self.error_params))

__reduce__()

Support pickling by returning constructor and arguments.

Source code in src/sortition_algorithms/errors.py
def __reduce__(self) -> tuple[type[Any], tuple[Any, ...]]:
    """Support pickling by returning constructor and arguments."""
    # Arguments mirror __init__'s signature so unpickling rebuilds the error.
    return (self.__class__, (self.all_lines, self.is_retryable, self.error_code, self.error_params))

combine(other)

Add all the lines from the other error to this one.

Source code in src/sortition_algorithms/errors.py
def combine(self, other: "SelectionMultilineError") -> None:
    """Add all the lines from the other error to this one."""
    # Mutates this error in place; `other` is left untouched.
    self.all_lines += other.lines()

SortitionBaseError

Bases: Exception

A base class that allows all errors to be caught easily.

Source code in src/sortition_algorithms/errors.py
class SortitionBaseError(Exception):
    """A base class that allows all errors to be caught easily."""

    # Subclasses set this True when a failed selection is worth retrying.
    is_retryable: bool = False

    def __init__(
        self, message: str = "", error_code: str = "", error_params: dict[str, str | int] | None = None
    ) -> None:
        """Store the message plus an optional machine-readable code and parameters."""
        super().__init__(message)
        self.error_code = error_code
        self.error_params = error_params or {}
        self.message = message

    def to_html(self) -> str:
        """HTML-escaped rendering of the error, safe to embed in markup."""
        return html.escape(str(self))

    def __reduce__(self) -> tuple[type[Any], tuple[Any, ...]]:
        """Support pickling by returning constructor and arguments."""
        return (self.__class__, (self.message, self.error_code, self.error_params))

__reduce__()

Support pickling by returning constructor and arguments.

Source code in src/sortition_algorithms/errors.py
def __reduce__(self) -> tuple[type[Any], tuple[Any, ...]]:
    """Support pickling by returning constructor and arguments."""
    # Matches __init__(message, error_code, error_params) positionally.
    return (self.__class__, (self.message, self.error_code, self.error_params))

MinMaxCrossFeatureIssue

A structured result for a cross-feature min/max validation issue.

For 'inconsistent_min_max' issues (from report_min_max_error_details): - smallest_maximum_feature/value and largest_minimum_feature/value are set - feature_name, feature_sum, and limit are not used

For 'min_exceeds_number_to_select' and 'max_below_number_to_select' issues: - feature_name, feature_sum, and limit are set - smallest_maximum_/largest_minimum_ are not used

Source code in src/sortition_algorithms/features.py
@define(kw_only=True, slots=True, eq=True)
class MinMaxCrossFeatureIssue:
    """A structured result for a cross-feature min/max validation issue.

    For 'inconsistent_min_max' issues (from report_min_max_error_details):
      - smallest_maximum_feature/value and largest_minimum_feature/value are set
      - feature_name, feature_sum, and limit are not used

    For 'min_exceeds_number_to_select' and 'max_below_number_to_select' issues:
      - feature_name, feature_sum, and limit are set
      - smallest_maximum_*/largest_minimum_* are not used
    """

    # One of: "inconsistent_min_max", "min_exceeds_number_to_select",
    # "max_below_number_to_select".
    issue_type: str
    # Pre-rendered human-readable English description of the issue.
    message: str
    # Fields for inconsistent_min_max
    smallest_maximum_feature: str = ""
    smallest_maximum_value: int = 0
    largest_minimum_feature: str = ""
    largest_minimum_value: int = 0
    # Fields for per-feature number_to_select issues
    feature_name: str = ""
    feature_sum: int = 0
    limit: int = 0

check_desired(fc, desired_number)

Check if the desired number of people is within the min/max of every feature.

Source code in src/sortition_algorithms/features.py
def check_desired(fc: FeatureCollection, desired_number: int) -> None:
    """
    Check if the desired number of people is within the min/max of every feature.

    Raises a SelectionMultilineError listing one line per offending feature.
    """
    problems = [
        f"The number of people to select ({desired_number}) is out of the range of "
        f"the numbers of people in the {feature_name} feature. It should be within "
        f"[{_fv_minimum_selection(fvalues)}, {_fv_maximum_selection(fvalues)}]."
        for feature_name, fvalues in fc.items()
        if not _fv_minimum_selection(fvalues) <= desired_number <= _fv_maximum_selection(fvalues)
    ]
    if problems:
        raise SelectionMultilineError(problems)

check_min_max(fc, number_to_select=0, feature_column_name='feature')

If the min is bigger than the max we're in trouble i.e. there's an input error

Source code in src/sortition_algorithms/features.py
def check_min_max(fc: FeatureCollection, number_to_select: int = 0, feature_column_name: str = "feature") -> None:
    """
    If the min is bigger than the max we're in trouble i.e. there's an input error

    Raises a SelectionMultilineError describing every problem found.
    """
    problems: list[str] = []
    if minimum_selection(fc) > maximum_selection(fc):
        problems.extend(report_min_max_error_details(fc, feature_column_name))
    # A zero number_to_select means "don't check against the panel size".
    if number_to_select:
        problems.extend(report_min_max_against_number_to_select(fc, number_to_select, feature_column_name))
    if problems:
        raise SelectionMultilineError(problems)

iterate_feature_collection(features)

Helper function to iterate over feature collection.

Source code in src/sortition_algorithms/features.py
def iterate_feature_collection(features: FeatureCollection) -> Generator[tuple[str, str, FeatureValueMinMax]]:
    """Yield (feature name, value name, min/max record) for every feature value."""
    for fname, fvalues in features.items():
        yield from ((fname, vname, minmax) for vname, minmax in fvalues.items())

maximum_selection(fc)

The maximum selection for this set of features is the smallest maximum selection of any individual feature.

Source code in src/sortition_algorithms/features.py
def maximum_selection(fc: FeatureCollection) -> int:
    """
    The maximum selection for this set of features is the smallest maximum selection
    of any individual feature. An empty collection gives 0.
    """
    return min(map(_fv_maximum_selection, fc.values())) if fc else 0

minimum_selection(fc)

The minimum selection for this set of features is the largest minimum selection of any individual feature.

Source code in src/sortition_algorithms/features.py
def minimum_selection(fc: FeatureCollection) -> int:
    """
    The minimum selection for this set of features is the largest minimum selection
    of any individual feature. An empty collection gives 0.
    """
    return max(map(_fv_minimum_selection, fc.values())) if fc else 0

read_in_features(features_head, features_body, number_to_select=0)

Read in stratified selection features and values

Note we do want features_head to ensure we don't have multiple columns with the same name

Source code in src/sortition_algorithms/features.py
def read_in_features(
    features_head: Iterable[str], features_body: Iterable[dict[str, str]], number_to_select: int = 0
) -> tuple[FeatureCollection, str, str]:
    """
    Read in stratified selection features and values

    Note we do want features_head to ensure we don't have multiple columns with the same name

    Args:
        features_head: the column headers of the features table
        features_body: one dict per body row, keyed by header
        number_to_select: if non-zero, also validate min/max against this panel size

    Returns:
        (features collection, feature column name, feature value column name)

    Raises:
        ParseTableMultiError: all per-row parse errors, combined into one error
        SelectionMultilineError: if the min/max quotas are inconsistent (via check_min_max)
    """
    features: FeatureCollection = CaseInsensitiveDict()
    features_flex, filtered_headers = _feature_headers_flex(list(features_head))
    combined_error = ParseTableMultiError()
    feature_column_name = "feature"
    feature_value_column_name = "value"
    # row 1 is the header, so the body starts on row 2
    for row_number, row in enumerate(features_body, start=2):
        if row_number == 2:
            # Learn the actual column names from the first body row.
            _, feature_column_name = _get_feature_from_row(row)
            _, feature_value_column_name = _get_feature_value_from_row(row)
        # check the set of keys in the row are the same as the headers
        assert set(filtered_headers) <= set(row.keys())
        stripped_row = utils.normalise_dict(row)
        fname, _ = _get_feature_from_row(row)
        if not fname:
            # Skip rows with no feature name rather than reporting them as errors.
            continue
        try:
            fname, fvalue, fv_minmax = _clean_row(stripped_row, features_flex, row_number)
        except ParseTableMultiError as error:
            # add all the lines into one large error, so we report all the errors in one go
            combined_error.combine(error)
        else:
            if fname not in features:
                features[fname] = CaseInsensitiveDict()
            features[fname][fvalue] = fv_minmax

    # if we got any errors in the above loop, raise the combined error.
    if combined_error:
        raise combined_error

    check_min_max(features, number_to_select=number_to_select, feature_column_name=feature_column_name)
    # check feature_flex to see if we need to set the max here
    # this only changes the max_flex value if these (optional) flex values are NOT set already
    set_default_max_flex(features)
    return CaseInsensitiveDict(features), feature_column_name, feature_value_column_name

report_min_max_against_number_to_select(fc, number_to_select, feature_column_name)

If any combined minimum is > number_to_select we have a problem. If any combined maximum is < number_to_select we have a problem.

Source code in src/sortition_algorithms/features.py
def report_min_max_against_number_to_select(
    fc: FeatureCollection, number_to_select: int, feature_column_name: str
) -> list[str]:
    """
    If any combined minimum is > number_to_select we have a problem.
    If any combined maximum is < number_to_select we have a problem.

    Returns just the rendered message of each structured issue.
    """
    return [
        issue.message
        for issue in report_min_max_against_number_to_select_structured(fc, number_to_select, feature_column_name)
    ]

report_min_max_against_number_to_select_structured(fc, number_to_select, feature_column_name='feature')

Return structured data about features whose min/max conflict with number_to_select.

Source code in src/sortition_algorithms/features.py
def report_min_max_against_number_to_select_structured(
    fc: FeatureCollection, number_to_select: int, feature_column_name: str = "feature"
) -> list[MinMaxCrossFeatureIssue]:
    """Return structured data about features whose min/max conflict with number_to_select.

    An empty FeatureCollection produces no issues; each feature can contribute
    at most one issue of each type.
    """
    if not fc:
        return []
    issues: list[MinMaxCrossFeatureIssue] = []
    for key, fv in fc.items():
        feature_minimum = _fv_minimum_selection(fv)
        feature_maximum = _fv_maximum_selection(fv)
        if feature_minimum > number_to_select:
            # Even the minimum quotas would force a panel larger than requested.
            issues.append(
                MinMaxCrossFeatureIssue(
                    issue_type="min_exceeds_number_to_select",
                    message=(
                        f"Minimum for {feature_column_name} {key} ({feature_minimum}) "
                        f"is more than number to select ({number_to_select})"
                    ),
                    feature_name=str(key),
                    feature_sum=feature_minimum,
                    limit=number_to_select,
                )
            )
        if feature_maximum < number_to_select:
            # The maximum quotas cannot fill a panel of the requested size.
            issues.append(
                MinMaxCrossFeatureIssue(
                    issue_type="max_below_number_to_select",
                    message=(
                        f"Maximum for {feature_column_name} {key} ({feature_maximum}) "
                        f"is less than number to select ({number_to_select})"
                    ),
                    feature_name=str(key),
                    feature_sum=feature_maximum,
                    limit=number_to_select,
                )
            )
    return issues

report_min_max_error_details(fc, feature_column_name='feature')

Return a list of problems in detail, so that the user can debug the errors in detail

Source code in src/sortition_algorithms/features.py
def report_min_max_error_details(fc: FeatureCollection, feature_column_name: str = "feature") -> list[str]:
    """
    Return a list of problems in detail, so that the user can debug the errors in detail
    """
    structured = report_min_max_error_details_structured(fc, feature_column_name)
    if not structured:
        return []
    # The structured helper produces at most one issue, so render the first.
    first = structured[0]
    return [
        f"Inconsistent numbers in min and max in the {feature_column_name} input:",
        f"The smallest maximum is {first.smallest_maximum_value} for {feature_column_name} '{first.smallest_maximum_feature}'",
        f"The largest minimum is {first.largest_minimum_value} for {feature_column_name} '{first.largest_minimum_feature}'",
        f"You need to reduce the minimums for {first.largest_minimum_feature} or increase the maximums for {first.smallest_maximum_feature}.",
    ]

report_min_max_error_details_structured(fc, feature_column_name='feature')

Return structured data about inconsistent min/max across features.

Returns an empty list if features are consistent (minimum_selection <= maximum_selection).

Source code in src/sortition_algorithms/features.py
def report_min_max_error_details_structured(
    fc: FeatureCollection, feature_column_name: str = "feature"
) -> list[MinMaxCrossFeatureIssue]:
    """Return structured data about inconsistent min/max across features.

    Returns an empty list if features are consistent (minimum_selection <= maximum_selection).
    """
    if not fc:
        return []

    # Consistent collections need no report.
    if minimum_selection(fc) <= maximum_selection(fc):
        return []

    # Find the feature with the smallest per-feature maximum and the one with
    # the largest per-feature minimum - these two are in conflict.
    per_feature_max = [(key, _fv_maximum_selection(fv)) for key, fv in fc.items()]
    per_feature_min = [(key, _fv_minimum_selection(fv)) for key, fv in fc.items()]
    max_feature, max_val = min(per_feature_max, key=lambda pair: pair[1])
    min_feature, min_val = max(per_feature_min, key=lambda pair: pair[1])
    message = (
        f"Inconsistent numbers in min and max in the {feature_column_name} input: "
        f"The smallest maximum is {max_val} for {feature_column_name} '{max_feature}', "
        f"the largest minimum is {min_val} for {feature_column_name} '{min_feature}'. "
        f"You need to reduce the minimums for {min_feature} or increase the maximums for {max_feature}."
    )
    issue = MinMaxCrossFeatureIssue(
        issue_type="inconsistent_min_max",
        message=message,
        smallest_maximum_feature=str(max_feature),
        smallest_maximum_value=max_val,
        largest_minimum_feature=str(min_feature),
        largest_minimum_value=min_val,
    )
    return [issue]

set_default_max_flex(fc)

Note this only sets it if left at the default value

Source code in src/sortition_algorithms/features.py
def set_default_max_flex(fc: FeatureCollection) -> None:
    """Note this only sets it if left at the default value"""
    # Compute the safe max flex once, then offer it to every feature value;
    # set_default_max_flex() on each value only takes it if still at the default.
    computed_max_flex = _safe_max_flex_val(fc)
    for value_map in fc.values():
        for minmax in value_map.values():
            minmax.set_default_max_flex(computed_max_flex)

write_features(fc)

Convert a FeatureCollection back to tabular format (inverse of read_in_features).

Parameters:

Name Type Description Default
fc FeatureCollection

The FeatureCollection to convert

required

Returns:

Type Description
tuple[list[str], list[dict[str, str]]]

A tuple of (headers, body) where:
  • headers is a list of column names
  • body is a list of dicts, each representing a row

The output format uses modern column names ("feature", "value", "min", "max"). Flex columns ("min_flex", "max_flex") are included only if any feature value has non-default flex values.

Source code in src/sortition_algorithms/features.py
def write_features(fc: FeatureCollection) -> tuple[list[str], list[dict[str, str]]]:
    """
    Convert a FeatureCollection back to tabular format (inverse of read_in_features).

    Args:
        fc: The FeatureCollection to convert

    Returns:
        A tuple of (headers, body) where:
        - headers is a list of column names
        - body is a list of dicts, each representing a row

    The output format uses modern column names ("feature", "value", "min", "max").
    Flex columns ("min_flex", "max_flex") are included only if any feature value
    has non-default flex values.
    """
    # Only emit flex columns when at least one value has non-default flex settings.
    include_flex = _has_non_default_flex(fc)
    headers = list(FEATURE_FILE_FIELD_NAMES_FLEX if include_flex else FEATURE_FILE_FIELD_NAMES)

    # One output row per (feature, value) pair, all values stringified.
    body: list[dict[str, str]] = []
    for feature_name, value_map in fc.items():
        for value_name, minmax in value_map.items():
            row = {
                "feature": str(feature_name),
                "value": str(value_name),
                "min": str(minmax.min),
                "max": str(minmax.max),
            }
            if include_flex:
                row["min_flex"] = str(minmax.min_flex)
                row["max_flex"] = str(minmax.max_flex)
            body.append(row)

    return headers, body

FeatureValueCountCheck

A structured result for a feature value where there are not enough people.

Source code in src/sortition_algorithms/people.py
@define(kw_only=True, slots=True, eq=True)
class FeatureValueCountCheck:
    """A structured result for a feature value where there are not enough people."""

    # the feature (category) with the shortfall, e.g. "gender"
    feature_name: str
    # the specific value within that feature, e.g. "male"
    value_name: str
    # the configured minimum number of people required with this value
    min_required: int
    # how many people in the pool actually have this value
    actual_count: int

People

Source code in src/sortition_algorithms/people.py
class People:
    """Holds the data rows for a pool of people, keyed by a unique person key.

    For each person only the feature columns plus `columns_to_keep` are retained.
    """

    def __init__(self, columns_to_keep: list[str]) -> None:
        # non-feature columns (name, email, address, ...) carried through for output files
        self._columns_to_keep = columns_to_keep
        # person_key -> {column_name: value} for features plus columns_to_keep
        self._full_data: dict[str, MutableMapping[str, str]] = {}

    def __eq__(self, other: Any) -> bool:
        # equal only when both the per-person data and the kept-column list match
        if not isinstance(other, self.__class__):
            return False
        return self._full_data == other._full_data and self._columns_to_keep == other._columns_to_keep

    @property
    def count(self) -> int:
        """The number of people currently held."""
        return len(self._full_data)

    def __iter__(self) -> Iterator[str]:
        """Iterate over person keys."""
        return iter(self._full_data)

    def items(self) -> ItemsView[str, MutableMapping[str, str]]:
        """A view of (person_key, person_data) pairs."""
        return self._full_data.items()

    def add(
        self,
        person_key: str,
        data: MutableMapping[str, str],
        features: FeatureCollection,
        row_number: int,
        feature_column_name: str = "feature",
    ) -> None:
        """Validate and add one person's row of data.

        Every feature value in `data` is checked against the allowed values in
        `features`; all invalid or empty values are collected and raised together,
        so the user sees every problem with the row at once. Only if validation
        passes are the `columns_to_keep` copied over and the person stored.

        Raises:
            The error built by ParseErrorsCollector.to_error() if any feature
            value is empty or not in the allowed set.
        """
        person_full_data: MutableMapping[str, str] = CaseInsensitiveDict()
        errors = ParseErrorsCollector()
        # get the feature values: these are the most important and we must check them
        for feature_name, feature_values in features.items():
            # check for input errors here - if it's not in the list of feature values...
            # allow for some unclean data - at least strip empty space, but only if a str!
            # (some values can be numbers)
            # NOTE(review): the comment above mentions stripping whitespace but no strip
            # happens here - presumably done upstream when reading the data; confirm.
            p_value = data[feature_name]
            if p_value in feature_values:
                person_full_data[feature_name] = p_value
            else:
                if p_value:
                    msg = f"Value '{p_value}' not in {feature_column_name} {feature_name}"
                    error_code = "value_not_in_feature"
                    error_params = {
                        "value": p_value,
                        "feature_column_name": feature_column_name,
                        "feature_name": feature_name,
                    }
                else:
                    msg = f"Empty value in {feature_column_name} {feature_name}"
                    error_code = "empty_value_in_feature"
                    error_params = {"feature_column_name": feature_column_name, "feature_name": feature_name}
                errors.add(
                    msg=msg,
                    key=feature_name,
                    value=p_value,
                    row=row_number,
                    row_name=person_key,
                    error_code=error_code,
                    error_params=error_params,
                )
        if errors:
            raise errors.to_error()
        # then get the other column values we need
        # this is address, name etc that we need to keep for output file
        # we don't check anything here - it's just for user convenience
        for col in self._columns_to_keep:
            person_full_data[col] = data[col]

        # add all the data to our people object
        self._full_data[person_key] = person_full_data

    def remove(self, person_key: str) -> None:
        """Remove one person; raises KeyError if the key is unknown."""
        del self._full_data[person_key]

    def remove_many(self, person_keys: Iterable[str]) -> None:
        """Remove each of the given people in turn."""
        for key in person_keys:
            self.remove(key)

    def get_person_dict(self, person_key: str) -> MutableMapping[str, str]:
        """Return the stored column/value mapping for one person."""
        return self._full_data[person_key]

    @staticmethod
    def _address_tuple(person_dict: MutableMapping[str, str], address_columns: Iterable[str]) -> tuple[str, ...]:
        # lower-cased so address comparison is case-insensitive
        return tuple(person_dict[col].lower() for col in address_columns)

    def get_address(self, person_key: str, address_columns: Iterable[str]) -> tuple[str, ...]:
        """Return the (lower-cased) address tuple for one person."""
        return self._address_tuple(self._full_data[person_key], address_columns)

    def households(self, address_columns: list[str]) -> dict[tuple[str, ...], list[str]]:
        """
        Generates a dict with:
        - keys: a tuple containing the address strings
        - values: a list of person_key for each person at that address
        """
        households = defaultdict(list)
        for person_key, person in self._full_data.items():
            households[self._address_tuple(person, address_columns)].append(person_key)
        return households

    def matching_address(self, person_key: str, address_columns: list[str]) -> Iterable[str]:
        """
        Yields the person keys for all people who have an address matching
        the address of the person passed in (that person is excluded).
        """
        person = self._full_data[person_key]
        person_address = self._address_tuple(person, address_columns)
        for loop_key, loop_person in self._full_data.items():
            if loop_key == person_key:
                continue  # skip the person we've been given
            if person_address == self._address_tuple(loop_person, address_columns):
                yield loop_key

    def _iter_matching(self, feature_name: str, feature_value: str) -> Generator[str]:
        # yields keys of people whose value for feature_name equals feature_value,
        # compared case-insensitively
        for person_key, person_dict in self._full_data.items():
            if person_dict[feature_name].lower() == feature_value.lower():
                yield person_key

    def count_feature_value(self, feature_name: str, feature_value: str) -> int:
        """Count people whose value for feature_name matches feature_value (case-insensitive)."""
        return len(list(self._iter_matching(feature_name, feature_value)))

    def find_person_by_position_in_category(self, feature_name: str, feature_value: str, position: int) -> str:
        """
        Find the nth person (1-indexed) in a specific feature category.

        Args:
            feature_name: Name of the feature (e.g., "gender")
            feature_value: Value of the feature (e.g., "male")
            position: 1-indexed position within the category

        Returns:
            Person key of the person at the specified position

        Raises:
            SelectionError: If no person is found at the specified position
        """
        people_in_category = list(self._iter_matching(feature_name, feature_value))
        try:
            # convert the 1-indexed position to a 0-indexed list access
            return people_in_category[position - 1]
        except IndexError:
            # Should always find someone if position is valid
            # If we hit this line it is a bug
            msg = f"Failed to find person at position {position} in {feature_name}/{feature_value}"
            raise SelectionError(
                message=msg,
                error_code="person_not_found",
                error_params={"position": position, "feature_name": feature_name, "feature_value": feature_value},
            ) from None

find_person_by_position_in_category(feature_name, feature_value, position)

Find the nth person (1-indexed) in a specific feature category.

Parameters:

Name Type Description Default
feature_name str

Name of the feature (e.g., "gender")

required
feature_value str

Value of the feature (e.g., "male")

required
position int

1-indexed position within the category

required

Returns:

Type Description
str

Person key of the person at the specified position

Raises:

Type Description
SelectionError

If no person is found at the specified position

Source code in src/sortition_algorithms/people.py
def find_person_by_position_in_category(self, feature_name: str, feature_value: str, position: int) -> str:
    """
    Find the nth person (1-indexed) in a specific feature category.

    Args:
        feature_name: Name of the feature (e.g., "gender")
        feature_value: Value of the feature (e.g., "male")
        position: 1-indexed position within the category

    Returns:
        Person key of the person at the specified position

    Raises:
        SelectionError: If no person is found at the specified position
    """
    people_in_category = list(self._iter_matching(feature_name, feature_value))
    try:
        # convert the 1-indexed position to a 0-indexed list access
        return people_in_category[position - 1]
    except IndexError:
        # Should always find someone if position is valid
        # If we hit this line it is a bug
        msg = f"Failed to find person at position {position} in {feature_name}/{feature_value}"
        raise SelectionError(
            message=msg,
            error_code="person_not_found",
            error_params={"position": position, "feature_name": feature_name, "feature_value": feature_value},
        ) from None

households(address_columns)

Generates a dict with: - keys: a tuple containing the address strings - values: a list of person_key for each person at that address

Source code in src/sortition_algorithms/people.py
def households(self, address_columns: list[str]) -> dict[tuple[str, ...], list[str]]:
    """
    Generates a dict with:
    - keys: a tuple containing the address strings
    - values: a list of person_key for each person at that address
    """
    # defaultdict(list) groups person keys as we encounter each address
    households = defaultdict(list)
    for person_key, person in self._full_data.items():
        households[self._address_tuple(person, address_columns)].append(person_key)
    return households

matching_address(person_key, address_columns)

Returns a list of person keys for all people who have an address matching the address of the person passed in.

Source code in src/sortition_algorithms/people.py
def matching_address(self, person_key: str, address_columns: list[str]) -> Iterable[str]:
    """
    Yields the person keys for all people who have an address matching
    the address of the person passed in (that person is excluded).
    """
    person = self._full_data[person_key]
    person_address = self._address_tuple(person, address_columns)
    for loop_key, loop_person in self._full_data.items():
        if loop_key == person_key:
            continue  # skip the person we've been given
        if person_address == self._address_tuple(loop_person, address_columns):
            yield loop_key

WeightedSample

Source code in src/sortition_algorithms/people.py
class WeightedSample:
    def __init__(self, features: FeatureCollection) -> None:
        """
        This produces a set of lists of feature values for each feature.  Each value
        is in the list `fv_minmax.max` times - so a random choice will represent the max.

        So if we had feature "ethnicity", value "white" w max 4, "asian" w max 3 and
        "black" with max 2 we'd get:

        ["white", "white", "white", "white", "asian", "asian", "asian", "black", "black"]

        Then making random choices from that list produces a weighted sample.
        """
        # feature_name -> list of value names, each repeated `max` times
        self.weighted: dict[str, list[str]] = defaultdict(list)
        for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(features):
            self.weighted[feature_name] += [fvalue_name] * fv_minmax.max

    def value_for(self, feature_name: str) -> str:
        """Return a random value for the feature, weighted by each value's max."""
        # S311 is random numbers for crypto - but this is just for a sample file
        return random_provider().choice(self.weighted[feature_name])

__init__(features)

This produces a set of lists of feature values for each feature. Each value is in the list fv_minmax.max times - so a random choice will represent the max.

So if we had feature "ethnicity", value "white" w max 4, "asian" w max 3 and "black" with max 2 we'd get:

["white", "white", "white", "white", "asian", "asian", "asian", "black", "black"]

Then making random choices from that list produces a weighted sample.

Source code in src/sortition_algorithms/people.py
def __init__(self, features: FeatureCollection) -> None:
    """
    This produces a set of lists of feature values for each feature.  Each value
    is in the list `fv_minmax.max` times - so a random choice will represent the max.

    So if we had feature "ethnicity", value "white" w max 4, "asian" w max 3 and
    "black" with max 2 we'd get:

    ["white", "white", "white", "white", "asian", "asian", "asian", "black", "black"]

    Then making random choices from that list produces a weighted sample.
    """
    # feature_name -> list of value names, each repeated `max` times
    self.weighted: dict[str, list[str]] = defaultdict(list)
    for feature_name, fvalue_name, fv_minmax in iterate_feature_collection(features):
        self.weighted[feature_name] += [fvalue_name] * fv_minmax.max

check_enough_people_for_every_feature_value(features, people)

For each feature/value, if the min>0, check there are enough people who have that feature/value

Source code in src/sortition_algorithms/people.py
def check_enough_people_for_every_feature_value(features: FeatureCollection, people: People) -> None:
    """For each feature/value, if the min>0, check there are enough people who have that feature/value"""
    problems = check_people_per_feature_value(features, people)
    if not problems:
        return
    # Render every shortfall into one multi-line error so the user sees them all at once.
    raise SelectionMultilineError(
        [
            f"Not enough people with the value '{problem.value_name}' in category '{problem.feature_name}' - "
            f"the minimum is {problem.min_required} but we only have {problem.actual_count}"
            for problem in problems
        ]
    )

check_for_duplicate_people(people_body, settings)

If we have rows with duplicate IDs things are going to go bad. First check for any duplicate IDs. If we find any, check if the duplicates are identical.

Returns:

Type Description
RunReport

RunReport containing warnings about duplicate people

Raises:

Type Description
SelectionMultilineError

If duplicate IDs have different data

Source code in src/sortition_algorithms/people.py
def check_for_duplicate_people(people_body: Iterable[MutableMapping[str, str]], settings: Settings) -> RunReport:
    """
    If we have rows with duplicate IDs things are going to go bad.
    First check for any duplicate IDs. If we find any, check if the duplicates are identical.

    Args:
        people_body: the rows of people data, each mapping column name to value
        settings: provides settings.id_column, the column holding the unique ID

    Returns:
        RunReport containing warnings about duplicate people

    Raises:
        SelectionMultilineError: If duplicate IDs have different data
    """
    report = RunReport()

    # Materialise the rows up front: people_body is typed as an Iterable, so it may
    # be a one-shot iterator (e.g. a csv.DictReader). We need two passes over the
    # data below, and a second pass over an exhausted iterator would silently see
    # nothing, hiding the duplicate rows.
    rows = list(people_body)

    # first check for any duplicate_ids
    id_counter = Counter(row[settings.id_column] for row in rows)
    duplicate_ids = {k for k, v in id_counter.items() if v > 1}
    if not duplicate_ids:
        return report

    # find the duplicate rows
    duplicate_rows: dict[str, list[MutableMapping[str, str]]] = defaultdict(list)
    for row in rows:
        pkey = row[settings.id_column]
        if pkey in duplicate_ids:
            duplicate_rows[pkey].append(row)

    report.add_message("duplicate_ids_found", count=len(duplicate_rows))
    report.add_message("duplicate_ids_list", ids=" ".join(duplicate_rows))

    # find rows where everything is not equal
    duplicate_differing_rows: dict[str, list[MutableMapping[str, str]]] = {}
    for key, value in duplicate_rows.items():
        if not _all_in_list_equal(value):
            duplicate_differing_rows[key] = value
    if not duplicate_differing_rows:
        # all duplicates are byte-identical rows, so just warn rather than fail
        report.add_message("duplicate_rows_identical")
        return report

    # Build error message with full context
    output: list[str] = []
    output.append(f"Found {len(duplicate_rows)} IDs that have more than one row")
    output.append(f"Duplicated IDs are: {' '.join(duplicate_rows)}")
    output.append(f"Found {len(duplicate_differing_rows)} IDs that have more than one row with different data")
    for key, value in duplicate_differing_rows.items():
        for row in value:
            output.append(f"For ID '{key}' one row of data is: {row}")
    raise SelectionMultilineError(output)

check_people_per_feature_value(features, people)

Return structured data about feature values with insufficient people.

Unlike check_enough_people_for_every_feature_value(), this does not raise an exception — it returns a list of issues that callers can inspect.

Source code in src/sortition_algorithms/people.py
def check_people_per_feature_value(features: FeatureCollection, people: People) -> list[FeatureValueCountCheck]:
    """Return structured data about feature values with insufficient people.

    Unlike check_enough_people_for_every_feature_value(), this does not raise
    an exception — it returns a list of issues that callers can inspect.
    """
    problems: list[FeatureValueCountCheck] = []
    for feature_name, value_name, minmax in iterate_feature_collection(features):
        actual = people.count_feature_value(feature_name, value_name)
        if actual >= minmax.min:
            continue
        # Fewer people than the configured minimum: record the shortfall.
        problems.append(
            FeatureValueCountCheck(
                feature_name=feature_name,
                value_name=value_name,
                min_required=minmax.min,
                actual_count=actual,
            )
        )
    return problems

count_people_per_feature_value(features, people)

Return {feature_name: {value_name: count}} for all feature values.

Source code in src/sortition_algorithms/people.py
def count_people_per_feature_value(features: FeatureCollection, people: People) -> dict[str, dict[str, int]]:
    """Return {feature_name: {value_name: count}} for all feature values."""
    counts: dict[str, dict[str, int]] = {}
    for feature_name, value_name, _minmax in iterate_feature_collection(features):
        # setdefault creates the inner dict the first time we see each feature
        per_feature = counts.setdefault(feature_name, {})
        per_feature[value_name] = people.count_feature_value(feature_name, value_name)
    return counts

exclude_matching_selected_addresses(people, already_selected, settings)

If we are checking the same addresses, then we should start by excluding people who have the same address as someone who is already selected.

Source code in src/sortition_algorithms/people.py
def exclude_matching_selected_addresses(people: People, already_selected: People | None, settings: Settings) -> People:
    """
    If we are checking the same addresses, then we should start by excluding people
    who have the same address as someone who is already selected.
    """
    # Nothing to exclude when there is no selected pool, it is empty,
    # or address checking is turned off.
    if already_selected is None or already_selected.count == 0 or not settings.check_same_address:
        return people
    address_cols = settings.check_same_address_columns
    taken_addresses = {already_selected.get_address(key, address_cols) for key in already_selected}
    # Work on a copy so the caller's People object is left untouched.
    filtered = deepcopy(people)
    for key in people:
        if people.get_address(key, address_cols) in taken_addresses:
            filtered.remove(key)
    return filtered

SelectCounts

Note that remaining and most of the methods are only used by the legacy algorithm. But to avoid duplication we also use this class, and the associated methods for both reporting and the legacy algorithm. Maybe at some point we will duplicate the code and have separate versions for the two purposes.

Source code in src/sortition_algorithms/people_features.py
@define(kw_only=True, slots=True)
class SelectCounts:
    """
    Note that remaining and most of the methods are only used by the legacy algorithm.
    But to avoid duplication we also use this class, and the associated methods for both
    reporting and the legacy algorithm. Maybe at some point we will duplicate the code
    and have separate versions for the two purposes.
    """

    # the min/max targets this counter is tracked against
    min_max: FeatureValueMinMax
    # how many people with this feature value have been selected so far
    selected: int = 0
    # how many unselected people with this feature value are still available
    remaining: int = 0

    def add_remaining(self) -> None:
        """Record one more available (unselected) person."""
        self.remaining += 1

    def add_selected(self) -> None:
        """Record one more selected person."""
        self.selected += 1

    def remove_remaining(self) -> None:
        """Remove one available person; fail the run if the minimum is now unreachable."""
        self.remaining -= 1
        # if no one is left and we have not hit the minimum, this selection cannot
        # succeed - raise a retryable error so the caller can start a fresh attempt
        if self.remaining == 0 and self.selected < self.min_max.min:
            msg = "SELECTION IMPOSSIBLE: FAIL - no one/not enough left after deletion."
            raise errors.RetryableSelectionError(msg)

    @property
    def hit_target(self) -> bool:
        """Return true if selected is between min and max (inclusive)"""
        return self.selected >= self.min_max.min and self.selected <= self.min_max.max

    def percent_selected(self, number_people_wanted: int) -> float:
        """Return selected as a percentage of the wanted panel size."""
        # NOTE(review): raises ZeroDivisionError if number_people_wanted is 0 -
        # presumably callers always pass a positive panel size; confirm.
        return self.selected * 100 / float(number_people_wanted)

    @property
    def people_still_needed(self) -> int:
        """The number of extra people to select to get to the minimum - it should never be negative"""
        return max(self.min_max.min - self.selected, 0)

    def sufficient_people(self) -> bool:
        """
        Return true if we can still make the minimum. So either:
        - we have already selected at least the minimum, or
        - the remaining number is at least as big as the number still required
        """
        return self.selected >= self.min_max.min or self.remaining >= self.people_still_needed

hit_target property

Return true if selected is between min and max (inclusive)

people_still_needed property

The number of extra people to select to get to the minimum - it should never be negative

sufficient_people()

Return true if we can still make the minimum. So either: - we have already selected at least the minimum, or - the remaining number is at least as big as the number still required

Source code in src/sortition_algorithms/people_features.py
def sufficient_people(self) -> bool:
    """
    Return true if we can still make the minimum. So either:
    - we have already selected at least the minimum, or
    - the remaining number is at least as big as the number still required
    """
    # people_still_needed is (min - selected) clamped to zero
    return self.selected >= self.min_max.min or self.remaining >= self.people_still_needed

iterate_select_collection(select_collection)

Helper function to iterate over select_collection.

Source code in src/sortition_algorithms/people_features.py
def iterate_select_collection(select_collection: SelectCollection) -> Generator[tuple[str, str, SelectCounts]]:
    """Helper function to iterate over select_collection."""
    # flatten the nested mapping into (feature, value, counts) triples
    for feature_name, value_map in select_collection.items():
        yield from ((feature_name, value_name, counts) for value_name, counts in value_map.items())

simple_add_selected(person_keys, people, features)

Just add the person to the selected counts for the feature values for that person. Don't do the more complex handling of the full PeopleFeatures.add_selected()

Source code in src/sortition_algorithms/people_features.py
def simple_add_selected(person_keys: Iterable[str], people: People, features: SelectCollection) -> None:
    """
    Just add the person to the selected counts for the feature values for that person.
    Don't do the more complex handling of the full PeopleFeatures.add_selected()
    """
    for person_key in person_keys:
        person_data = people.get_person_dict(person_key)
        # bump the selected count for this person's value of every feature
        for feature_name in features:
            features[feature_name][person_data[feature_name]].add_selected()

Settings

Settings to use when selecting committees. Note that only the first two are required. A minimal example would be:

Settings(id_column="my_id", columns_to_keep=["name", "email"])

Source code in src/sortition_algorithms/settings.py
@define
class Settings:
    """
    Settings to use when selecting committees. Note that only the first two are required.
    A minimal example would be:

    Settings(id_column="my_id", columns_to_keep=["name", "email"])
    """

    # required
    # the column that uniquely identifies each person
    id_column: str = field(validator=validators.instance_of(str))
    # non-feature columns to carry through to the output files
    columns_to_keep: list[str] = field()

    # fields with defaults
    # whether to treat people at the same address as one household
    check_same_address: bool = field(validator=validators.instance_of(bool), default=False)
    # the columns that together form an address for same-address checking
    check_same_address_columns: list[str] = field(validator=check_columns_for_same_address, factory=list)
    max_attempts: int = field(validator=validators.instance_of(int), default=100)
    selection_algorithm: str = field(default=DEFAULT_ALGORITHM)
    solver_backend: str = field(default=DEFAULT_BACKEND)
    # 0 here means "no fixed seed" - TODO confirm against the random provider setup
    random_number_seed: int = field(validator=validators.instance_of(int), default=0)

    @columns_to_keep.validator
    def check_columns_to_keep(self, attribute: Any, value: Any) -> None:
        # attrs validator: enforce list-of-strings at construction time
        if not isinstance(value, list):
            raise TypeError("columns_to_keep must be a LIST of strings", "columns_to_keep_not_list", {})
        if not all(isinstance(element, str) for element in value):
            raise TypeError("columns_to_keep must be a list of STRINGS", "columns_to_keep_not_strings", {})

    @selection_algorithm.validator
    def check_selection_algorithm(self, attribute: Any, value: str) -> None:
        # attrs validator: only accept known algorithm names
        if value not in SELECTION_ALGORITHMS:
            algorithms_str = ", ".join(SELECTION_ALGORITHMS)
            raise ConfigurationError(
                message=f"selection_algorithm {value} is not one of: {algorithms_str}",
                error_code="invalid_selection_algorithm",
                error_params={"algorithm": value, "valid_algorithms": algorithms_str},
            )

    @solver_backend.validator
    def check_solver_backend(self, attribute: Any, value: str) -> None:
        # attrs validator: only accept known solver backends
        if value not in SOLVER_BACKENDS:
            backends_str = ", ".join(SOLVER_BACKENDS)
            raise ConfigurationError(
                message=f"solver_backend {value} is not one of: {backends_str}",
                error_code="invalid_solver_backend",
                error_params={"backend": value, "valid_backends": backends_str},
            )

    @property
    def normalised_address_columns(self) -> list[str]:
        """
        Returns an empty list if address columns should not be checked (or if the columns
        specified was an empty list). Otherwise return the columns. That way other code can
        just check if the columns are empty rather than checking the bool separately.
        """
        return self.check_same_address_columns if self.check_same_address else []

    @property
    def full_columns_to_keep(self) -> list[str]:
        """
        We always need to keep the address columns, so in case they are not listed in
        self.columns_to_keep we have this property to have the combined list.
        """
        extra_address_columns = [col for col in self.check_same_address_columns if col not in self.columns_to_keep]
        return [*self.columns_to_keep, *extra_address_columns]

    def as_dict(self) -> dict[str, Any]:
        """Return the settings as a plain dict keyed by field name."""
        dict_repr = unstructure(self)
        assert isinstance(dict_repr, dict)
        assert all(isinstance(key, str) for key in dict_repr)
        return dict_repr

    @classmethod
    def load_from_file(
        cls,
        settings_file_path: Path,
    ) -> tuple["Settings", RunReport]:
        """Load settings from a TOML file, writing a default file first if none exists.

        Returns:
            The parsed Settings plus a RunReport of any messages generated while loading.
        """
        report = RunReport()
        if not settings_file_path.is_file():
            # no file yet: write out the defaults so the user has something to edit
            with open(settings_file_path, "w", encoding="utf-8") as settings_file:
                settings_file.write(DEFAULT_SETTINGS)
            report.add_message("wrote_default_settings", file_path=str(settings_file_path.absolute()))
        with open(settings_file_path, "rb") as settings_file:
            settings = tomllib.load(settings_file)
        # you can't check an address if there is no info about which columns to check...
        # NOTE(review): this raises KeyError if "check_same_address" is missing from the
        # file - presumably DEFAULT_SETTINGS always includes it; confirm.
        if settings["check_same_address"] is False:
            report.add_message("address_checking_disabled_warning", level=ReportLevel.IMPORTANT)
            settings["check_same_address_columns"] = []
        return structure(settings, cls), report

full_columns_to_keep property

We always need to keep the address columns, so in case they are not listed in self.columns_to_keep we have this property to have the combined list.

normalised_address_columns property

Returns an empty list if address columns should not be checked (or if the columns specified was an empty list). Otherwise return the columns. That way other code can just check if the columns are empty rather than checking the bool separately.

RandomProvider

Bases: ABC

This is something of a hack. Mostly we want to use the secrets module. But for repeatable testing we might want to set the random.seed sometimes.

So we have a global _random_provider which can be switched between an instance of this class that uses the secrets module and an instance that uses random with a seed. The switch is done by the set_random_provider() function.

Then every time we want some randomness, we call random_provider() to get the current version of the global.

Source code in src/sortition_algorithms/utils.py
class RandomProvider(ABC):
    """
    This is something of a hack. Mostly we want to use the `secrets` module.
    But for repeatable testing we might want to set the random.seed sometimes.

    So we have a global `_random_provider` which can be switched between an
    instance of this class that uses the `secrets` module and an instance that
    uses `random` with a seed. The switch is done by the `set_random_provider()`
    function.

    Then every time we want some randomness, we call `random_provider()` to get
    the current version of the global.
    """

    # tests fail if we use sys.maxsize, so we use the max signed 32 bit int instead
    max_int = 2**31 - 1

    # return a random float between lower and upper
    @classmethod
    @abstractmethod
    def uniform(cls, lower: float, upper: float) -> float: ...

    # return a random int in the range [0, upper)
    @classmethod
    @abstractmethod
    def randbelow(cls, upper: int) -> int: ...

    # convenience wrapper: a random non-negative int below max_int
    @classmethod
    def randint(cls) -> int:
        return cls.randbelow(cls.max_int)

    # return a randomly chosen element of seq
    @classmethod
    @abstractmethod
    def choice(cls, seq: "SupportsLenAndGetItem[str]") -> str: ...

RunReport

A class to hold a report to show to the user at the end

Source code in src/sortition_algorithms/utils.py
@define(eq=True)
class RunReport:
    """
    A class to hold a report to show to the user at the end.

    The report is an ordered list of elements - text lines (each with an
    importance level), tables, and errors - which can be rendered as plain
    text or HTML, and round-tripped via serialize()/deserialize().
    """

    # ordered report elements; rendered in insertion order
    _data: list[RunLineLevel | RunTable | RunError] = field(factory=list)

    def __bool__(self) -> bool:
        """
        Basically, False if the report is empty, or True if there is some content. So you can do
        things like

        ```
        if run_report:
            print(f"Run Report\n\n{run_report.as_text()}")
        ```
        """
        return self.has_content()

    def serialize(self) -> dict[str, Any]:
        """Convert this report to plain dicts/lists suitable for e.g. JSON storage."""
        return _converter.unstructure(self)  # type: ignore[no-any-return]

    @classmethod
    def deserialize(cls, serialized_data: dict[str, Any]) -> "RunReport":
        """Rebuild a RunReport from data previously produced by serialize()."""
        return _converter.structure(serialized_data, cls)

    def has_content(self) -> bool:
        """
        False if the report is empty, or True if there is some content. So you can do
        things like

        ```
        if run_report.has_content():
            print(f"Run Report\n\n{run_report.as_text()}")
        ```
        """
        return bool(self._data)

    def add_line(
        self,
        line: str,
        level: ReportLevel = ReportLevel.NORMAL,
        message_code: str | None = None,
        message_params: dict[str, Any] | None = None,
    ) -> None:
        """
        Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

        Args:
            line: The English message text (for backward compatibility and standalone use)
            level: Importance level of the message
            message_code: Optional translation key for i18n (e.g., "loading_features_from_file")
            message_params: Optional parameters for message translation (e.g., {"file_path": "features.csv"})
        """
        self._data.append(RunLineLevel(line, level, message_code=message_code, message_params=message_params or {}))

    def add_line_and_log(
        self,
        line: str,
        log_level: int,
        message_code: str | None = None,
        message_params: dict[str, Any] | None = None,
    ) -> None:
        """
        Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

        This method will also log the message to the `user_logger`. This message can be shown to the user as
        the run is happening, so the user has feedback on what is going on while the run is in progress.

        When generating the report we can skip those messages, to avoid duplication. But if the user_logger
        has not been set up to be shown to the user during the run, we do want those messages to be in the
        final report.

        Args:
            line: The English message text (for backward compatibility and standalone use)
            log_level: Logging level for the message
            message_code: Optional translation key for i18n (e.g., "trial_number")
            message_params: Optional parameters for message translation (e.g., {"trial": 3})
        """
        # the non-NOTSET log_level is what lets rendering skip already-logged lines later
        self._data.append(
            RunLineLevel(
                line, ReportLevel.NORMAL, log_level, message_code=message_code, message_params=message_params or {}
            )
        )
        user_logger.log(level=log_level, msg=line)

    def add_message(self, code: str, level: ReportLevel = ReportLevel.NORMAL, **params: Any) -> None:
        """
        Add a translatable message using a message code and parameters.

        This is a convenience method that combines get_message() and add_line() in one call,
        making it simpler to add messages with translation support.

        Args:
            code: The message code from REPORT_MESSAGES (e.g., "loading_features_from_file")
            level: Importance level of the message
            **params: Parameters to substitute into the message template

        Example:
            >>> report.add_message("features_found", count=5)
            >>> report.add_message("trial_number", ReportLevel.IMPORTANT, trial=3)
        """
        message = get_message(code, **params)
        self.add_line(message, level=level, message_code=code, message_params=params)

    def add_message_and_log(self, code: str, log_level: int, **params: Any) -> None:
        """
        Add a translatable message using a message code and parameters, and log it.

        This is a convenience method that combines get_message() and add_line_and_log() in one call,
        making it simpler to add messages with translation support that are also logged.

        Args:
            code: The message code from REPORT_MESSAGES (e.g., "trial_number")
            log_level: Logging level for the message
            **params: Parameters to substitute into the message template

        Example:
            >>> report.add_message_and_log("trial_number", logging.WARNING, trial=3)
            >>> report.add_message_and_log("basic_solution_warning", logging.WARNING, algorithm="maximin", num_panels=150, num_agents=100, min_probs=0.001)
        """
        message = get_message(code, **params)
        self.add_line_and_log(message, log_level, message_code=code, message_params=params)

    def add_lines(self, lines: Iterable[str], level: ReportLevel = ReportLevel.NORMAL) -> None:
        """
        Add multiple lines of text with the same level.

        .. deprecated:: (next version)
            This method is deprecated. Functions should return RunReport instead of list[str],
            and callers should use add_report() to merge them. This provides better support
            for translation and structured reporting.
        """
        warnings.warn(
            "add_lines() is deprecated. Functions should return RunReport instead of list[str], "
            "and use add_report() to merge them.",
            DeprecationWarning,
            stacklevel=2,
        )
        for line in lines:
            self._data.append(RunLineLevel(line, level))

    def add_table(self, table_headings: list[str], table_data: list[list[str | int | float]]) -> None:
        """Add a table (headings plus rows) to the report."""
        self._data.append(RunTable(table_headings, table_data))

    def add_error(self, error: Exception, is_fatal: bool = True) -> None:
        """Add an exception to the report; fatal errors are emphasised in the HTML output."""
        self._data.append(RunError(error, is_fatal=is_fatal))

    def add_report(self, other: "RunReport") -> None:
        """Merge all the elements of another RunReport into this one, in order."""
        self._data += other._data

    def _element_to_text(self, element: RunLineLevel | RunTable | RunError, include_logged: bool) -> str | None:
        """Render a single element as plain text, or None if it should be filtered out."""
        if isinstance(element, RunLineLevel):
            # we might want to skip lines that were already logged
            if include_logged or element.log_level == logging.NOTSET:
                return element.line
            else:
                # sometimes we want empty strings for blank lines, so here we return None
                # instead so the logged lines can be filtered out
                return None
        elif isinstance(element, RunTable):
            table_text = tabulate(element.data, headers=element.headers, tablefmt="simple")
            # we want a blank line before and after the table.
            return f"\n{table_text}\n"
        else:
            return str(element.error)

    def as_text(self, include_logged: bool = True) -> str:
        """Render the whole report as plain text; optionally drop lines already logged during the run."""
        parts = [self._element_to_text(element, include_logged) for element in self._data]
        return "\n".join(p for p in parts if p is not None)

    def _line_to_html(self, line_level: RunLineLevel) -> str:
        """Render a text line as HTML, wrapping it in emphasis tags according to its level."""
        tags = {
            ReportLevel.NORMAL: ("", ""),
            ReportLevel.IMPORTANT: ("<b>", "</b>"),
            ReportLevel.CRITICAL: ('<b style="color: red">', "</b>"),
        }
        start_tag, end_tag = tags[line_level.level]
        escaped_line = html.escape(line_level.line)
        return f"{start_tag}{escaped_line}{end_tag}"

    def _error_to_html(self, run_error: RunError) -> str:
        """Render an error as HTML; fatal errors are bold, and project errors use their own to_html()."""
        start_tag, end_tag = ("<b>", "</b>") if run_error.is_fatal else ("", "")
        if isinstance(run_error.error, errors.SortitionBaseError):
            return f"{start_tag}{run_error.error.to_html()}{end_tag}"
        # default to the string representation
        return f"{start_tag}{run_error.error}{end_tag}"

    def _element_to_html(self, element: RunLineLevel | RunTable | RunError, include_logged: bool) -> str | None:
        """Render a single element as HTML, or None if it should be filtered out."""
        if isinstance(element, RunLineLevel):
            if include_logged or element.log_level == logging.NOTSET:
                return self._line_to_html(element)
            else:
                return None
        elif isinstance(element, RunTable):
            return tabulate(element.data, headers=element.headers, tablefmt="html")
        else:
            return self._error_to_html(element)

    def as_html(self, include_logged: bool = True) -> str:
        """Render the whole report as HTML; optionally drop lines already logged during the run."""
        parts = [self._element_to_html(element, include_logged) for element in self._data]
        return "<br />\n".join(p for p in parts if p is not None)

    def last_error(self) -> Exception | None:
        """Return the most recently added error, or None if no errors were recorded."""
        for element in reversed(self._data):
            if isinstance(element, RunError):
                return element.error
        return None

__bool__()

    Basically, False if the report is empty, or True if there is some content. So you can do
    things like

    ```
    if run_report:
        print(f"Run Report\n\n{run_report.as_text()}")
    ```

Source code in src/sortition_algorithms/utils.py
def __bool__(self) -> bool:
    """
    Basically, False if the report is empty, or True if there is some content. So you can do
    things like

    ```
    if run_report:
        print(f"Run Report\n\n{run_report.as_text()}")
    ```
    """
    return self.has_content()

add_line(line, level=ReportLevel.NORMAL, message_code=None, message_params=None)

Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

Parameters:

Name Type Description Default
line str

The English message text (for backward compatibility and standalone use)

required
level ReportLevel

Importance level of the message

NORMAL
message_code str | None

Optional translation key for i18n (e.g., "loading_features_from_file")

None
message_params dict[str, Any] | None

Optional parameters for message translation (e.g., {"file_path": "features.csv"})

None
Source code in src/sortition_algorithms/utils.py
def add_line(
    self,
    line: str,
    level: ReportLevel = ReportLevel.NORMAL,
    message_code: str | None = None,
    message_params: dict[str, Any] | None = None,
) -> None:
    """
    Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

    Args:
        line: The English message text (for backward compatibility and standalone use)
        level: Importance level of the message
        message_code: Optional translation key for i18n (e.g., "loading_features_from_file")
        message_params: Optional parameters for message translation (e.g., {"file_path": "features.csv"})
    """
    self._data.append(RunLineLevel(line, level, message_code=message_code, message_params=message_params or {}))

add_line_and_log(line, log_level, message_code=None, message_params=None)

Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

This method will also log the message to the user_logger. This message can be shown to the user as the run is happening, so the user has feedback on what is going on while the run is in progress.

When generating the report we can skip those messages, to avoid duplication. But if the user_logger has not been set up to be shown to the user during the run, we do want those messages to be in the final report.

Parameters:

Name Type Description Default
line str

The English message text (for backward compatibility and standalone use)

required
log_level int

Logging level for the message

required
message_code str | None

Optional translation key for i18n (e.g., "trial_number")

None
message_params dict[str, Any] | None

Optional parameters for message translation (e.g., {"trial": 3})

None
Source code in src/sortition_algorithms/utils.py
def add_line_and_log(
    self,
    line: str,
    log_level: int,
    message_code: str | None = None,
    message_params: dict[str, Any] | None = None,
) -> None:
    """
    Add a line of text, and a level - so important/critical messages can be highlighted in the HTML report.

    This method will also log the message to the `user_logger`. This message can be shown to the user as
    the run is happening, so the user has feedback on what is going on while the run is in progress.

    When generating the report we can skip those messages, to avoid duplication. But if the user_logger
    has not been set up to be shown to the user during the run, we do want those messages to be in the
    final report.

    Args:
        line: The English message text (for backward compatibility and standalone use)
        log_level: Logging level for the message
        message_code: Optional translation key for i18n (e.g., "trial_number")
        message_params: Optional parameters for message translation (e.g., {"trial": 3})
    """
    self._data.append(
        RunLineLevel(
            line, ReportLevel.NORMAL, log_level, message_code=message_code, message_params=message_params or {}
        )
    )
    user_logger.log(level=log_level, msg=line)

add_lines(lines, level=ReportLevel.NORMAL)

Add multiple lines of text with the same level.

.. deprecated:: (next version) This method is deprecated. Functions should return RunReport instead of list[str], and callers should use add_report() to merge them. This provides better support for translation and structured reporting.

Source code in src/sortition_algorithms/utils.py
def add_lines(self, lines: Iterable[str], level: ReportLevel = ReportLevel.NORMAL) -> None:
    """
    Add multiple lines of text with the same level.

    .. deprecated:: (next version)
        This method is deprecated. Functions should return RunReport instead of list[str],
        and callers should use add_report() to merge them. This provides better support
        for translation and structured reporting.
    """
    warnings.warn(
        "add_lines() is deprecated. Functions should return RunReport instead of list[str], "
        "and use add_report() to merge them.",
        DeprecationWarning,
        stacklevel=2,
    )
    for line in lines:
        self._data.append(RunLineLevel(line, level))

add_message(code, level=ReportLevel.NORMAL, **params)

Add a translatable message using a message code and parameters.

This is a convenience method that combines get_message() and add_line() in one call, making it simpler to add messages with translation support.

Parameters:

Name Type Description Default
code str

The message code from REPORT_MESSAGES (e.g., "loading_features_from_file")

required
level ReportLevel

Importance level of the message

NORMAL
**params Any

Parameters to substitute into the message template

{}
Example

report.add_message("features_found", count=5) report.add_message("trial_number", ReportLevel.IMPORTANT, trial=3)

Source code in src/sortition_algorithms/utils.py
def add_message(self, code: str, level: ReportLevel = ReportLevel.NORMAL, **params: Any) -> None:
    """
    Add a translatable message using a message code and parameters.

    This is a convenience method that combines get_message() and add_line() in one call,
    making it simpler to add messages with translation support.

    Args:
        code: The message code from REPORT_MESSAGES (e.g., "loading_features_from_file")
        level: Importance level of the message
        **params: Parameters to substitute into the message template

    Example:
        >>> report.add_message("features_found", count=5)
        >>> report.add_message("trial_number", ReportLevel.IMPORTANT, trial=3)
    """
    message = get_message(code, **params)
    self.add_line(message, level=level, message_code=code, message_params=params)

add_message_and_log(code, log_level, **params)

Add a translatable message using a message code and parameters, and log it.

This is a convenience method that combines get_message() and add_line_and_log() in one call, making it simpler to add messages with translation support that are also logged.

Parameters:

Name Type Description Default
code str

The message code from REPORT_MESSAGES (e.g., "trial_number")

required
log_level int

Logging level for the message

required
**params Any

Parameters to substitute into the message template

{}
Example

report.add_message_and_log("trial_number", logging.WARNING, trial=3) report.add_message_and_log("basic_solution_warning", logging.WARNING, algorithm="maximin", num_panels=150, num_agents=100, min_probs=0.001)

Source code in src/sortition_algorithms/utils.py
def add_message_and_log(self, code: str, log_level: int, **params: Any) -> None:
    """
    Add a translatable message using a message code and parameters, and log it.

    This is a convenience method that combines get_message() and add_line_and_log() in one call,
    making it simpler to add messages with translation support that are also logged.

    Args:
        code: The message code from REPORT_MESSAGES (e.g., "trial_number")
        log_level: Logging level for the message
        **params: Parameters to substitute into the message template

    Example:
        >>> report.add_message_and_log("trial_number", logging.WARNING, trial=3)
        >>> report.add_message_and_log("basic_solution_warning", logging.WARNING, algorithm="maximin", num_panels=150, num_agents=100, min_probs=0.001)
    """
    message = get_message(code, **params)
    self.add_line_and_log(message, log_level, message_code=code, message_params=params)

has_content()

    False if the report is empty, or True if there is some content. So you can do
    things like

    ```
    if run_report.has_content():
        print(f"Run Report\n\n{run_report.as_text()}")
    ```

Source code in src/sortition_algorithms/utils.py
def has_content(self) -> bool:
    """
    False if the report is empty, or True if there is some content. So you can do
    things like

    ```
    if run_report.has_content():
        print(f"Run Report\n\n{run_report.as_text()}")
    ```
    """
    return bool(self._data)

default_logging_setup()

Set both logger and user_logger to send output to stdout

Source code in src/sortition_algorithms/utils.py
def default_logging_setup() -> tuple[logging.Logger, logging.Logger]:
    """Set both logger and user_logger to send output to stdout

    Returns:
        A (user_logger, logger) tuple, both at INFO level with a plain
        stdout StreamHandler attached (only if no handlers were set up yet).
    """
    # we have two loggers
    # - user_logger is used for messages that any user should see
    # - logger is used for messages that only a developer or admin should need to see
    user_logger = _stdout_logger("sortition_algorithms_user")
    logger = _stdout_logger("sortition_algorithms")
    return user_logger, logger


def _stdout_logger(name: str) -> logging.Logger:
    """Fetch the named logger at INFO level, attaching a stdout handler if none exists yet."""
    log = logging.getLogger(name)
    log.setLevel(logging.INFO)
    if not log.handlers:
        # no set up has been done yet - so we do it here
        # this handler goes straight to stdout - no timestamps or anything
        log.addHandler(logging.StreamHandler(sys.stdout))
    return log

get_cell_name(row, col_name, headers)

Given the column_name, get the spreadsheet cell name, eg "A5"

Source code in src/sortition_algorithms/utils.py
def get_cell_name(row: int, col_name: str, headers: Sequence[str]) -> str:
    """Given the column_name, get the spreadsheet cell name, eg "A5"

    Generalised to any column count using bijective base-26 numbering
    (A..Z, AA..ZZ, AAA..) - the previous version raised IndexError for
    columns beyond "ZZ" (index 701). Behaviour is unchanged for indexes 0-701.

    Raises:
        ValueError: if col_name is not found in headers.
    """
    col_index = headers.index(col_name)
    # bijective base-26: 0 -> "A", 25 -> "Z", 26 -> "AA", 701 -> "ZZ", 702 -> "AAA"
    n = col_index + 1
    letters = ""
    while n:
        n, remainder = divmod(n - 1, 26)
        letters = string.ascii_uppercase[remainder] + letters
    return f"{letters}{row}"

normalise_dict(original)

Wraps a dict, and whenever we get a value from it, we convert to str and strip() whitespace

Source code in src/sortition_algorithms/utils.py
def normalise_dict(original: Mapping[str, str] | Mapping[str, str | int]) -> MutableMapping[str, str]:
    """
    Build a case-insensitive copy of a dict in which every value has been
    converted to str with surrounding whitespace stripped.
    """
    cleaned: MutableMapping[str, str] = CaseInsensitiveDict()
    for key, raw_value in original.items():
        cleaned[key] = strip_str_int(raw_value)
    return cleaned

override_logging_handlers(user_logger_handlers, logger_handlers)

Replace the default handlers with other ones

Source code in src/sortition_algorithms/utils.py
def override_logging_handlers(
    user_logger_handlers: list[logging.Handler], logger_handlers: list[logging.Handler]
) -> None:
    """Replace the default handlers with other ones"""
    # empty handler lists mean "leave that logger alone"
    replacements = (
        ("sortition_algorithms_user", user_logger_handlers),
        ("sortition_algorithms", logger_handlers),
    )
    for logger_name, handlers in replacements:
        if handlers:
            _override_handlers_for(logging.getLogger(logger_name), handlers)