Modules

Adapters for loading and saving data.

Initially we support local CSV files and Google Sheets spreadsheets.

GSheetAdapter

Source code in src/sortition_algorithms/adapters.py
class GSheetAdapter:
    scope: ClassVar = [
        "https://spreadsheets.google.com/feeds",
        "https://www.googleapis.com/auth/drive",
    ]
    hl_light_blue: ClassVar = {
        "backgroundColor": {
            "red": 153 / 255,
            "green": 204 / 255,
            "blue": 255 / 255,
        }
    }
    hl_orange: ClassVar = {"backgroundColor": {"red": 5, "green": 2.5, "blue": 0}}

    def __init__(self, auth_json_path: Path, gen_rem_tab: str = "on") -> None:
        self.auth_json_path = auth_json_path
        self._client: gspread.client.Client | None = None
        self._spreadsheet: gspread.Spreadsheet | None = None
        self.original_selected_tab_name = "Original Selected - output - "
        self.selected_tab_name = "Selected"
        self.columns_selected_first = "C"
        self.column_selected_blank_num = 6
        self.remaining_tab_name = "Remaining - output - "
        self.new_tab_default_size_rows = 2
        self.new_tab_default_size_cols = 40
        self.g_sheet_name = ""
        self._messages: list[str] = []
        self.features_loaded = False
        self.people_loaded = False
        self.gen_rem_tab = gen_rem_tab  # Added for checkbox.

    def messages(self) -> list[str]:
        """Return accumulated messages and reset"""
        messages = self._messages
        self._messages = []
        return messages

    @property
    def client(self) -> gspread.client.Client:
        if self._client is None:
            creds = ServiceAccountCredentials.from_json_keyfile_name(
                str(self.auth_json_path),
                self.scope,
            )
            self._client = gspread.authorize(creds)
        return self._client

    @property
    def spreadsheet(self) -> gspread.Spreadsheet:
        if self._spreadsheet is None:
            self._spreadsheet = self.client.open(self.g_sheet_name)
            self._messages.append(f"Opened Google Sheet: '{self.g_sheet_name}'. ")
        return self._spreadsheet

    def _tab_exists(self, tab_name: str) -> bool:
        if self.spreadsheet is None:
            return False
        tab_list = self.spreadsheet.worksheets()
        return any(tab.title == tab_name for tab in tab_list)

    def _clear_or_create_tab(self, tab_name: str, other_tab_name: str, inc: int) -> gspread.Worksheet:
        # this now does not clear data but increments the sheet number...
        num = 0
        tab_ready: gspread.Worksheet | None = None
        tab_name_new = f"{tab_name}{num}"
        other_tab_name_new = f"{other_tab_name}{num}"
        while tab_ready is None:
            if self._tab_exists(tab_name_new) or self._tab_exists(other_tab_name_new):
                num += 1
                tab_name_new = f"{tab_name}{num}"
                other_tab_name_new = f"{other_tab_name}{num}"
            else:
                if inc == -1:
                    tab_name_new = f"{tab_name}{num - 1}"
                tab_ready = self.spreadsheet.add_worksheet(
                    title=tab_name_new,
                    rows=self.new_tab_default_size_rows,
                    cols=self.new_tab_default_size_cols,
                )
        return tab_ready

    def load_features(self, g_sheet_name: str, feature_tab_name: str) -> tuple[FeatureCollection | None, list[str]]:
        self.g_sheet_name = g_sheet_name
        features: FeatureCollection | None = None
        try:
            if not self._tab_exists(feature_tab_name):
                self._messages.append(f"Error in Google sheet: no tab called '{feature_tab_name}' found. ")
                return None, self.messages()
        except gspread.SpreadsheetNotFound:
            self._messages.append(f"Google spreadsheet not found: {self.g_sheet_name}. ")
            return None, self.messages()
        tab_features = self.spreadsheet.worksheet(feature_tab_name)
        feature_head = tab_features.row_values(1)
        feature_body = _stringify_records(tab_features.get_all_records(expected_headers=[]))
        features, msgs = read_in_features(feature_head, feature_body)
        self.features_loaded = True
        self._messages += msgs
        return features, self.messages()

    def load_people(
        self,
        respondents_tab_name: str,
        settings: Settings,
        features: FeatureCollection,
    ) -> tuple[People | None, list[str]]:
        self._messages = []
        people: People | None = None
        try:
            if not self._tab_exists(respondents_tab_name):
                self._messages.append(
                    f"Error in Google sheet: no tab called '{respondents_tab_name}' found. ",
                )
                return None, self.messages()
        except gspread.SpreadsheetNotFound:
            self._messages.append(f"Google spreadsheet not found: {self.g_sheet_name}. ")
            return None, self.messages()

        tab_people = self.spreadsheet.worksheet(respondents_tab_name)
        # if we don't read this in here we can't check if there are 2 columns with the same name
        people_head = tab_people.row_values(1)
        # the numericise_ignore doesn't convert the phone numbers to ints...
        # 1 Oct 2024: the final argument with expected_headers is to deal with the fact that
        # updated versions of gspread can't cope with duplicate headers
        people_body = _stringify_records(
            tab_people.get_all_records(
                numericise_ignore=["all"],
                expected_headers=[],
            )
        )
        self._messages.append(f"Reading in '{respondents_tab_name}' tab in above Google sheet.")
        people, msgs = read_in_people(people_head, people_body, features, settings)
        self._messages += msgs
        self.people_loaded = True
        return people, self.messages()

    def output_selected_remaining(
        self,
        people_selected_rows: list[list[str]],
        people_remaining_rows: list[list[str]],
        settings: Settings,
    ) -> list[int]:
        tab_original_selected = self._clear_or_create_tab(
            self.original_selected_tab_name,
            self.remaining_tab_name,
            0,
        )
        tab_original_selected.update(people_selected_rows)
        tab_original_selected.format("A1:U1", self.hl_light_blue)
        dupes: list[int] = []
        if self.gen_rem_tab == "on":
            tab_remaining = self._clear_or_create_tab(
                self.remaining_tab_name,
                self.original_selected_tab_name,
                -1,
            )
            tab_remaining.update(people_remaining_rows)
            tab_remaining.format("A1:U1", self.hl_light_blue)
            # highlight any people in remaining tab at the same address
            # TODO: do we ever actually hit this code? We should have deleted
            # all the people who might have been duplicates in selected_remaining_tables()
            if settings.check_same_address:
                address_cols: list[int] = [tab_remaining.find(csa).col for csa in settings.check_same_address_columns]  # type: ignore[union-attr]
                dupes_set: set[int] = set()
                n = len(people_remaining_rows)
                for i in range(n):
                    rowrem1 = people_remaining_rows[i]
                    for j in range(i + 1, n):
                        rowrem2 = people_remaining_rows[j]
                        if rowrem1 != rowrem2 and all(rowrem1[col] == rowrem2[col] for col in address_cols):
                            dupes_set.add(i + 1)
                            dupes_set.add(j + 1)
                dupes = sorted(dupes_set)
                for i in range(min(30, len(dupes))):
                    tab_remaining.format(str(dupes[i]), self.hl_orange)
        return dupes

    def output_multi_selections(
        self,
        multi_selections: list[list[str]],
    ) -> None:
        assert self.gen_rem_tab == "off"
        tab_original_selected = self._clear_or_create_tab(
            self.original_selected_tab_name,
            "ignoreme",
            0,
        )
        tab_original_selected.update(multi_selections)
        tab_original_selected.format("A1:U1", self.hl_light_blue)
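
A minimal usage sketch, assuming a service-account JSON file and a spreadsheet with the tab names shown (the sheet and tab names are illustrative, and the Settings object is assumed to be constructed elsewhere). Note that load_features is called first, since it records the spreadsheet name that load_people reuses:

from pathlib import Path

from sortition_algorithms.adapters import GSheetAdapter

adapter = GSheetAdapter(auth_json_path=Path("service_account.json"), gen_rem_tab="on")

# both loaders return (data, messages); data is None if the tab was not found
features, feature_msgs = adapter.load_features("My Selection Sheet", "Categories")
people, people_msgs = adapter.load_people("Respondents", settings, features)

# ... run the selection and build the output rows (see selected_remaining_tables below) ...
dupes = adapter.output_selected_remaining(selected_rows, remaining_rows, settings)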

messages()

Return accumulated messages and reset

Source code in src/sortition_algorithms/adapters.py
def messages(self) -> list[str]:
    """Return accumulated messages and reset"""
    messages = self._messages
    self._messages = []
    return messages

find_any_committee(features, people, number_people_wanted, settings)

Find any single feasible committee that satisfies the quotas.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas
- people (People, required): People object with pool members
- number_people_wanted (int, required): desired size of the panel
- settings (Settings, required): Settings object containing configuration

Returns:

- tuple[list[frozenset[str]], list[str]]: tuple of (list containing one committee as frozenset of person_ids, empty list of messages)

Raises:

- InfeasibleQuotasError: If quotas are infeasible
- SelectionError: If solver fails for other reasons

Source code in src/sortition_algorithms/committee_generation.py
def find_any_committee(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    settings: Settings,
) -> tuple[list[frozenset[str]], list[str]]:
    """Find any single feasible committee that satisfies the quotas.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        settings: Settings object containing configuration

    Returns:
        tuple of (list containing one committee as frozenset of person_ids, empty list of messages)

    Raises:
        InfeasibleQuotasError: If quotas are infeasible
        SelectionError: If solver fails for other reasons
    """
    model, agent_vars = _setup_committee_generation(features, people, number_people_wanted, settings)
    committee = _ilp_results_to_committee(agent_vars)
    return [committee], []
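
Because it returns as soon as any feasible panel is found, this is a cheap way to sanity-check quotas before a full selection run. A sketch, assuming features, people and settings are already loaded (import paths assumed from the source layout shown above):

from sortition_algorithms import errors
from sortition_algorithms.committee_generation import find_any_committee

try:
    committees, _ = find_any_committee(features, people, number_people_wanted=30, settings=settings)
    print(f"Quotas look satisfiable; example panel has {len(committees[0])} members.")
except errors.InfeasibleQuotasError as err:
    # err.output holds human-readable suggestions for relaxing the quotas
    print("\n".join(err.output))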

find_distribution_leximin(features, people, number_people_wanted, settings)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected (just like maximin), but breaks ties to maximize the second-lowest probability, breaks further ties to maximize the third-lowest probability and so forth.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas
- people (People, required): People object with pool members
- number_people_wanted (int, required): desired size of the panel
- settings (Settings, required): Settings object containing configuration

Returns:

- tuple[list[frozenset[str]], list[float], list[str]]: tuple of (committees, probabilities, output_lines)
  - committees: list of feasible committees (frozenset of agent IDs)
  - probabilities: list of probabilities for each committee
  - output_lines: list of debug strings

Raises:

- RuntimeError: If Gurobi is not available

Source code in src/sortition_algorithms/committee_generation.py
def find_distribution_leximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    settings: Settings,
) -> tuple[list[frozenset[str]], list[float], list[str]]:
    """Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected
    (just like maximin), but breaks ties to maximize the second-lowest probability, breaks further ties to maximize the
    third-lowest probability and so forth.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        settings: Settings object containing configuration

    Returns:
        tuple of (committees, probabilities, output_lines)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - output_lines: list of debug strings

    Raises:
        RuntimeError: If Gurobi is not available
    """
    if not GUROBI_AVAILABLE:
        msg = "Leximin algorithm requires Gurobi solver which is not available"
        raise RuntimeError(msg)

    output_lines = [print_ret("Using leximin algorithm.")]
    grb.setParam("OutputFlag", 0)

    # Set up an ILP that can be used for discovering new feasible committees
    new_committee_model, agent_vars = _setup_committee_generation(features, people, number_people_wanted, settings)

    # Find initial committees that cover every possible agent
    committees, covered_agents, initial_output = _generate_initial_committees(
        new_committee_model, agent_vars, 3 * people.count
    )
    output_lines += initial_output

    # Run the main leximin optimization loop to fix agent probabilities
    fixed_probabilities = _run_leximin_main_loop(new_committee_model, agent_vars, committees, people, output_lines)

    # Convert fixed agent probabilities to committee probabilities
    probabilities_normalised = _solve_leximin_primal_for_final_probabilities(committees, fixed_probabilities)

    return list(committees), probabilities_normalised, output_lines

find_distribution_maximin(features, people, number_people_wanted, settings)

Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas
- people (People, required): People object with pool members
- number_people_wanted (int, required): desired size of the panel
- settings (Settings, required): Settings object containing configuration

Returns:

- tuple[list[frozenset[str]], list[float], list[str]]: tuple of (committees, probabilities, output_lines)
  - committees: list of feasible committees (frozenset of agent IDs)
  - probabilities: list of probabilities for each committee
  - output_lines: list of debug strings

Source code in src/sortition_algorithms/committee_generation.py
def find_distribution_maximin(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    settings: Settings,
) -> tuple[list[frozenset[str]], list[float], list[str]]:
    """Find a distribution over feasible committees that maximizes the minimum probability of an agent being selected.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        settings: Settings object containing configuration

    Returns:
        tuple of (committees, probabilities, output_lines)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - output_lines: list of debug strings
    """
    output_lines = [print_ret("Using maximin algorithm.")]

    # Set up an ILP that can be used for discovering new feasible committees
    new_committee_model, agent_vars = _setup_committee_generation(features, people, number_people_wanted, settings)

    # Find initial committees that cover every possible agent
    committees, covered_agents, initial_output = _generate_initial_committees(
        new_committee_model, agent_vars, people.count
    )
    output_lines += initial_output

    # Set up the incremental LP model for column generation
    incremental_model, incr_agent_vars, upper_bound_var = _setup_maximin_incremental_model(committees, covered_agents)

    # Run the main optimization loop
    return _run_maximin_optimization_loop(
        new_committee_model,
        agent_vars,
        incremental_model,
        incr_agent_vars,
        upper_bound_var,
        committees,
        covered_agents,
        output_lines,
    )
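
The returned distribution is usually post-processed the same way find_random_sample does it: zero-probability panels are dropped and the rest is rounded into a concrete lottery. A sketch (imports omitted; see standardize_distribution and lottery_rounding below):

committees, probabilities, log_lines = find_distribution_maximin(features, people, 30, settings)
committees, probabilities = standardize_distribution(committees, probabilities)
lottery = lottery_rounding(committees, probabilities, number_selections=1000)
# drawing one entry of lottery uniformly at random realises (approximately) the computed probabilities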

find_distribution_nash(features, people, number_people_wanted, settings)

Find a distribution over feasible committees that maximizes the Nash welfare, i.e., the product of selection probabilities over all persons.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas
- people (People, required): People object with pool members
- number_people_wanted (int, required): desired size of the panel
- settings (Settings, required): Settings object containing configuration

Returns:

- tuple[list[frozenset[str]], list[float], list[str]]: tuple of (committees, probabilities, output_lines)
  - committees: list of feasible committees (frozenset of agent IDs)
  - probabilities: list of probabilities for each committee
  - output_lines: list of debug strings

The algorithm maximizes the product of selection probabilities Πᵢ pᵢ by equivalently maximizing log(Πᵢ pᵢ) = Σᵢ log(pᵢ). If some person i is not included in any feasible committee, their pᵢ is 0, and this sum is -∞. We maximize Σᵢ log(pᵢ) where i is restricted to range over persons that can possibly be included.

Source code in src/sortition_algorithms/committee_generation.py
def find_distribution_nash(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    settings: Settings,
) -> tuple[list[frozenset[str]], list[float], list[str]]:
    """Find a distribution over feasible committees that maximizes the Nash welfare, i.e., the product of
    selection probabilities over all persons.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        settings: Settings object containing configuration

    Returns:
        tuple of (committees, probabilities, output_lines)
        - committees: list of feasible committees (frozenset of agent IDs)
        - probabilities: list of probabilities for each committee
        - output_lines: list of debug strings

    The algorithm maximizes the product of selection probabilities Πᵢ pᵢ by equivalently maximizing
    log(Πᵢ pᵢ) = Σᵢ log(pᵢ). If some person i is not included in any feasible committee, their pᵢ is 0, and
    this sum is -∞. We maximize Σᵢ log(pᵢ) where i is restricted to range over persons that can possibly be included.
    """
    output_lines = [print_ret("Using Nash algorithm.")]

    # Set up an ILP used for discovering new feasible committees
    new_committee_model, agent_vars = _setup_committee_generation(features, people, number_people_wanted, settings)

    # Find initial committees that include every possible agent
    committee_set, covered_agents, initial_output = _generate_initial_committees(
        new_committee_model, agent_vars, 2 * people.count
    )
    committees = list(committee_set)
    output_lines += initial_output

    # Map the covered agents to indices in a list for easier matrix representation
    entitlements, contributes_to_entitlement = _define_entitlements(covered_agents)

    # Run the main Nash welfare optimization loop
    return _run_nash_optimization_loop(
        new_committee_model,
        agent_vars,
        committees,
        entitlements,
        contributes_to_entitlement,
        covered_agents,
        number_people_wanted,
        output_lines,
    )

standardize_distribution(committees, probabilities)

Remove committees with zero probability and renormalize.

Parameters:

- committees (list[frozenset[str]], required): list of committees
- probabilities (list[float], required): corresponding probabilities

Returns:

- tuple[list[frozenset[str]], list[float]]: tuple of (filtered_committees, normalized_probabilities)

Source code in src/sortition_algorithms/committee_generation.py
def standardize_distribution(
    committees: list[frozenset[str]],
    probabilities: list[float],
) -> tuple[list[frozenset[str]], list[float]]:
    """Remove committees with zero probability and renormalize.

    Args:
        committees: list of committees
        probabilities: corresponding probabilities

    Returns:
        tuple of (filtered_committees, normalized_probabilities)
    """
    assert len(committees) == len(probabilities)
    new_committees = []
    new_probabilities = []
    for committee, prob in zip(committees, probabilities, strict=False):
        if prob >= EPS2:
            new_committees.append(committee)
            new_probabilities.append(prob)
    prob_sum = sum(new_probabilities)
    new_probabilities = [prob / prob_sum for prob in new_probabilities]
    return new_committees, new_probabilities
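
A small worked example (assuming EPS2 is a tiny tolerance, so the zero-probability panel is dropped):

committees = [frozenset({"a", "b"}), frozenset({"c", "d"}), frozenset({"a", "d"})]
probabilities = [0.5, 0.3, 0.0]

committees, probabilities = standardize_distribution(committees, probabilities)
# committees    -> [frozenset({"a", "b"}), frozenset({"c", "d"})]
# probabilities -> [0.625, 0.375]   (0.5/0.8 and 0.3/0.8)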

find_random_sample(features, people, number_people_wanted, settings, selection_algorithm='maximin', test_selection=False, number_selections=1)

Main algorithm to find one or multiple random committees.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas
- people (People, required): People object with pool members
- number_people_wanted (int, required): desired size of the panel
- settings (Settings, required): Settings object containing configuration
- selection_algorithm (str, default 'maximin'): one of "legacy", "maximin", "leximin", or "nash"
- test_selection (bool, default False): if set, do not do a random selection, but just return some valid panel. Useful for quickly testing whether quotas are satisfiable, but should always be false for actual selection!
- number_selections (int, default 1): how many panels to return. Most of the time, this should be set to 1, which means that a single panel is chosen. When specifying a value n ≥ 2, the function will return a list of length n, containing multiple panels (some panels might be repeated in the list). In this case the eventual panel should be drawn uniformly at random from the returned list.

Returns:

- tuple[list[frozenset[str]], list[str]]: tuple of (committee_lottery, output_lines)
  - committee_lottery: list of committees, where each committee is a frozen set of pool member ids
  - output_lines: list of debug strings

Raises:

- InfeasibleQuotasError: if the quotas cannot be satisfied, which includes a suggestion for how to modify them
- SelectionError: in multiple other failure cases
- ValueError: for invalid parameters
- RuntimeError: if required solver is not available

Source code in src/sortition_algorithms/core.py
def find_random_sample(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    settings: Settings,
    selection_algorithm: str = "maximin",
    test_selection: bool = False,
    number_selections: int = 1,
) -> tuple[list[frozenset[str]], list[str]]:
    """Main algorithm to find one or multiple random committees.

    Args:
        features: FeatureCollection with min/max quotas
        people: People object with pool members
        number_people_wanted: desired size of the panel
        settings: Settings object containing configuration
        selection_algorithm: one of "legacy", "maximin", "leximin", or "nash"
        test_selection: if set, do not do a random selection, but just return some valid panel.
            Useful for quickly testing whether quotas are satisfiable, but should always be false for actual selection!
        number_selections: how many panels to return. Most of the time, this should be set to 1, which means that
            a single panel is chosen. When specifying a value n ≥ 2, the function will return a list of length n,
            containing multiple panels (some panels might be repeated in the list). In this case the eventual panel
            should be drawn uniformly at random from the returned list.

    Returns:
        tuple of (committee_lottery, output_lines)
        - committee_lottery: list of committees, where each committee is a frozen set of pool member ids
        - output_lines: list of debug strings

    Raises:
        InfeasibleQuotasError: if the quotas cannot be satisfied, which includes a suggestion for how to modify them
        SelectionError: in multiple other failure cases
        ValueError: for invalid parameters
        RuntimeError: if required solver is not available
    """
    # Input validation
    if test_selection and number_selections != 1:
        msg = (
            "Running the test selection does not support generating a transparent lottery, so, if "
            "`test_selection` is true, `number_selections` must be 1."
        )
        raise ValueError(msg)

    if selection_algorithm == "legacy" and number_selections != 1:
        msg = (
            "Currently, the legacy algorithm does not support generating a transparent lottery, "
            "so `number_selections` must be set to 1."
        )
        raise ValueError(msg)

    # Quick test selection using find_any_committee
    if test_selection:
        print("Running test selection.")
        return find_any_committee(features, people, number_people_wanted, settings)

    output_lines = []

    # Check if Gurobi is available for leximin
    if selection_algorithm == "leximin" and not GUROBI_AVAILABLE:
        output_lines.append(
            print_ret(
                "The leximin algorithm requires the optimization library Gurobi to be installed "
                "(commercial, free academic licenses available). Switching to the simpler "
                "maximin algorithm, which can be run using open source solvers."
            )
        )
        selection_algorithm = "maximin"

    # Route to appropriate algorithm
    if selection_algorithm == "legacy":
        # Import here to avoid circular imports
        from sortition_algorithms.find_sample import find_random_sample_legacy

        return find_random_sample_legacy(
            people,
            features,
            number_people_wanted,
            settings.check_same_address,
            settings.check_same_address_columns,
        )
    elif selection_algorithm == "leximin":
        committees, probabilities, new_output_lines = find_distribution_leximin(
            features, people, number_people_wanted, settings
        )
    elif selection_algorithm == "maximin":
        committees, probabilities, new_output_lines = find_distribution_maximin(
            features, people, number_people_wanted, settings
        )
    elif selection_algorithm == "nash":
        committees, probabilities, new_output_lines = find_distribution_nash(
            features, people, number_people_wanted, settings
        )
    else:
        msg = (
            f"Unknown selection algorithm {selection_algorithm!r}, must be either 'legacy', 'leximin', "
            f"'maximin', or 'nash'."
        )
        raise ValueError(msg)

    # Post-process the distribution
    committees, probabilities = standardize_distribution(committees, probabilities)
    if len(committees) > people.count:
        print(
            "INFO: The distribution over panels is what is known as a 'basic solution'. There is no reason for concern "
            "about the correctness of your output, but we'd appreciate if you could reach out to panelot"
            f"@paulgoelz.de with the following information: algorithm={selection_algorithm}, "
            f"num_panels={len(committees)}, num_agents={people.count}, min_probs={min(probabilities)}."
        )

    assert len(set(committees)) == len(committees)

    output_lines += new_output_lines
    output_lines += _distribution_stats(people, committees, probabilities)

    # Convert to lottery
    committee_lottery = lottery_rounding(committees, probabilities, number_selections)

    return committee_lottery, output_lines
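
A typical call, assuming features, people and settings were loaded beforehand (for example via the adapters above):

committee_lottery, log_lines = find_random_sample(
    features,
    people,
    number_people_wanted=30,
    settings=settings,
    selection_algorithm="maximin",
    number_selections=1,
)
panel = committee_lottery[0]  # with number_selections=1 the lottery contains exactly one panel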

lottery_rounding(committees, probabilities, number_selections)

Convert probability distribution over committees to a discrete lottery.

Parameters:

- committees (list[frozenset[str]], required): list of committees
- probabilities (list[float], required): corresponding probabilities (must sum to 1)
- number_selections (int, required): number of committees to return

Returns:

- list[frozenset[str]]: list of committees (may contain duplicates) of length number_selections

Source code in src/sortition_algorithms/core.py
def lottery_rounding(
    committees: list[frozenset[str]],
    probabilities: list[float],
    number_selections: int,
) -> list[frozenset[str]]:
    """Convert probability distribution over committees to a discrete lottery.

    Args:
        committees: list of committees
        probabilities: corresponding probabilities (must sum to 1)
        number_selections: number of committees to return

    Returns:
        list of committees (may contain duplicates) of length number_selections
    """
    assert len(committees) == len(probabilities)
    assert number_selections >= 1

    num_copies: list[int] = []
    residuals: list[float] = []
    for _, prob in zip(committees, probabilities, strict=False):
        scaled_prob = prob * number_selections
        num_copies.append(int(scaled_prob))  # give lower quotas
        residuals.append(scaled_prob - int(scaled_prob))

    rounded_up_indices = pipage_rounding(list(enumerate(residuals)))
    for committee_index in rounded_up_indices:
        num_copies[committee_index] += 1

    committee_lottery: list[frozenset[str]] = []
    for committee, committee_copies in zip(committees, num_copies, strict=False):
        committee_lottery += [committee for _ in range(committee_copies)]

    return committee_lottery
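
A small worked example of the rounding behaviour:

panel_a = frozenset({"p1", "p2"})
panel_b = frozenset({"p3", "p4"})

lottery = lottery_rounding([panel_a, panel_b], [0.5, 0.5], number_selections=3)
# scaled probabilities are 1.5 and 1.5: each panel gets one guaranteed copy, and
# pipage_rounding randomly awards the third copy to one of them, so lottery is
# either [panel_a, panel_a, panel_b] or [panel_a, panel_b, panel_b]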

pipage_rounding(marginals)

Pipage rounding algorithm for converting fractional solutions to integer solutions.

Takes a list of (object, probability) pairs and randomly rounds them to a set of objects such that the expected number of times each object appears equals its probability.

Parameters:

- marginals (list[tuple[int, float]], required): list of (object, probability) pairs where probabilities sum to an integer

Returns:

- list[int]: list of objects that were selected

Source code in src/sortition_algorithms/core.py
def pipage_rounding(marginals: list[tuple[int, float]]) -> list[int]:
    """Pipage rounding algorithm for converting fractional solutions to integer solutions.

    Takes a list of (object, probability) pairs and randomly rounds them to a set of objects
    such that the expected number of times each object appears equals its probability.

    Args:
        marginals: list of (object, probability) pairs where probabilities sum to an integer

    Returns:
        list of objects that were selected
    """
    assert all(0.0 <= p <= 1.0 for _, p in marginals)

    outcomes: list[int] = []
    while True:
        if len(marginals) == 0:
            return outcomes
        if len(marginals) == 1:
            obj, prob = marginals[0]
            if random_provider().uniform(0.0, 1.0) < prob:
                outcomes.append(obj)
            marginals = []
        else:
            obj0, prob0 = marginals[0]
            if prob0 > 1.0 - EPS2:
                outcomes.append(obj0)
                marginals = marginals[1:]
                continue
            if prob0 < EPS2:
                marginals = marginals[1:]
                continue

            obj1, prob1 = marginals[1]
            if prob1 > 1.0 - EPS2:
                outcomes.append(obj1)
                marginals = [marginals[0]] + marginals[2:]
                continue
            if prob1 < EPS2:
                marginals = [marginals[0]] + marginals[2:]
                continue

            inc0_dec1_amount = min(
                1.0 - prob0, prob1
            )  # maximal amount that prob0 can be increased and prob1 can be decreased
            dec0_inc1_amount = min(prob0, 1.0 - prob1)
            choice_probability = dec0_inc1_amount / (inc0_dec1_amount + dec0_inc1_amount)

            if random_provider().uniform(0.0, 1.0) < choice_probability:  # increase prob0 and decrease prob1
                prob0 += inc0_dec1_amount
                prob1 -= inc0_dec1_amount
            else:
                prob0 -= dec0_inc1_amount
                prob1 += dec0_inc1_amount
            marginals = [(obj0, prob0), (obj1, prob1)] + marginals[2:]
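
A small illustration, assuming the library's random provider has been initialised (run_stratification does this via set_random_provider):

marginals = [(0, 0.5), (1, 0.5), (2, 0.75), (3, 0.25)]  # probabilities sum to 2

selected = pipage_rounding(marginals)
# selected contains two of the objects, e.g. [0, 2]; over many runs object 2 appears
# in roughly 75% of the outcomes, object 3 in roughly 25%, and so on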

run_stratification(features, people, number_people_wanted, settings, test_selection=False, number_selections=1)

Run stratified random selection with retry logic.

Parameters:

- features (FeatureCollection, required): FeatureCollection with min/max quotas for each feature value
- people (People, required): People object containing the pool of candidates
- number_people_wanted (int, required): Desired size of the panel
- settings (Settings, required): Settings object containing configuration
- test_selection (bool, default False): If True, don't randomize (for testing only)
- number_selections (int, default 1): Number of panels to return

Returns:

- tuple[bool, list[frozenset[str]], list[str]]: Tuple of (success, selected_committees, output_lines)
  - success: Whether selection succeeded within max attempts
  - selected_committees: List of committees (frozensets of person IDs)
  - output_lines: Debug and status messages

Raises:

- Exception: If number_people_wanted is outside valid range for any feature
- ValueError: For invalid parameters
- RuntimeError: If required solver is not available
- InfeasibleQuotasError: If quotas cannot be satisfied

Source code in src/sortition_algorithms/core.py
def run_stratification(
    features: FeatureCollection,
    people: People,
    number_people_wanted: int,
    settings: Settings,
    test_selection: bool = False,
    number_selections: int = 1,
) -> tuple[bool, list[frozenset[str]], list[str]]:
    """Run stratified random selection with retry logic.

    Args:
        features: FeatureCollection with min/max quotas for each feature value
        people: People object containing the pool of candidates
        number_people_wanted: Desired size of the panel
        settings: Settings object containing configuration
        test_selection: If True, don't randomize (for testing only)
        number_selections: Number of panels to return

    Returns:
        Tuple of (success, selected_committees, output_lines)
        - success: Whether selection succeeded within max attempts
        - selected_committees: List of committees (frozensets of person IDs)
        - output_lines: Debug and status messages

    Raises:
        Exception: If number_people_wanted is outside valid range for any feature
        ValueError: For invalid parameters
        RuntimeError: If required solver is not available
        InfeasibleQuotasError: If quotas cannot be satisfied
    """
    # Check if desired number is within feature constraints
    features.check_desired(number_people_wanted)

    # Set random seed if specified
    # If the seed is zero or None, we use the secrets module, as it is better
    # from a security point of view
    set_random_provider(settings.random_number_seed)

    success = False
    output_lines = []

    if test_selection:
        output_lines.append(
            "<b style='color: red'>WARNING: Panel is not selected at random! Only use for testing!</b><br>",
        )

    output_lines.append("<b>Initial: (selected = 0)</b>")
    output_lines += _initial_print_category_info(
        features,
        people,
    )
    people_selected: list[frozenset[str]] = []

    tries = 0
    for tries in range(settings.max_attempts):
        people_selected = []

        output_lines.append(f"<b>Trial number: {tries}</b>")

        try:
            people_selected, new_output_lines = find_random_sample(
                features,
                people,
                number_people_wanted,
                settings,
                settings.selection_algorithm,
                test_selection,
                number_selections,
            )
            output_lines += new_output_lines

            # Check if targets were met (only works for number_selections = 1)
            new_output_lines = _print_category_info(
                features,
                people,
                people_selected,
                number_people_wanted,
            )
            success, check_output_lines = _check_category_selected(
                features,
                people,
                people_selected,
                number_selections,
            )

            if success:
                output_lines.append("<b>SUCCESS!!</b> Final:")
                output_lines += new_output_lines + check_output_lines
                break

        except (ValueError, RuntimeError) as err:
            output_lines.append(str(err))
            break
        except errors.InfeasibleQuotasError as err:
            output_lines += err.output
            break
        except errors.SelectionError as serr:
            output_lines.append(f"Failed: Selection Error thrown: {serr}")

    if not success:
        output_lines.append(f"Failed {tries} times... gave up.")

    return success, people_selected, output_lines
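
Putting it together, a typical end-to-end call (assuming features, people and settings were loaded via one of the adapters):

success, panels, report = run_stratification(features, people, number_people_wanted=30, settings=settings)

if success:
    panel = panels[0]  # with the default number_selections=1 there is a single panel
    print(f"Selected {len(panel)} people")
else:
    print("\n".join(report))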

selected_remaining_tables(full_people, people_selected, features, settings)

Build the output rows for the selected people and for the remaining pool, removing the selected people (and, if check_same_address is set, everyone sharing their address) from the remaining pool.

people_selected is a single frozenset[str]: callers holding a list of committees (e.g. from run_stratification) must unwrap it to one committee before passing it to this function.

Source code in src/sortition_algorithms/core.py
def selected_remaining_tables(
    full_people: People,
    people_selected: frozenset[str],
    features: FeatureCollection,
    settings: Settings,
) -> tuple[list[list[str]], list[list[str]], list[str]]:
    """
    write some text

    people_selected is a single frozenset[str] - it must be unwrapped before being passed
    to this function.
    """
    people_working = deepcopy(full_people)
    output_lines: list[str] = []

    people_selected_rows = person_list_to_table(people_selected, people_working, features, settings)

    # now delete the selected people (and maybe also those at the same address)
    num_same_address_deleted = 0
    for pkey in people_selected:
        # if check address then delete all those at this address (will NOT delete the one we want as well)
        if settings.check_same_address:
            pkey_to_delete = list(people_working.matching_address(pkey, settings.check_same_address_columns))
            num_same_address_deleted += len(pkey_to_delete) + 1
            # then delete this/these people at the same address from the reserve/remaining pool
            people_working.remove_many([pkey, *pkey_to_delete])
        else:
            people_working.remove(pkey)

    # add the columns to keep into remaining people
    # as above all these values are all in people_working but this is tidier...
    people_remaining_rows = person_list_to_table(people_working, people_working, features, settings)
    return people_selected_rows, people_remaining_rows, output_lines

    # TODO: put this code somewhere more suitable
    # maybe in strat app only?
    """
    dupes = self._output_selected_remaining(
        settings,
        people_selected_rows,
        people_remaining_rows,
    )
    if settings.check_same_address and self.gen_rem_tab == "on":
        output_lines.append(
            f"Deleted {num_same_address_deleted} people from remaining file who had the same "
            f"address as selected people.",
        )
        m = min(30, len(dupes))
        output_lines.append(
            f"In the remaining tab there are {len(dupes)} people who share the same address as "
            f"someone else in the tab. We highlighted the first {m} of these. "
            f"The full list of lines is {dupes}",
        )
    """

FeatureCollection

A full set of features for a stratification.

The keys here are the names of the features. They could be: gender, age_bracket, education_level etc

The values are FeatureValues objects - the breakdown of the values for a feature.

Source code in src/sortition_algorithms/features.py
class FeatureCollection:
    """
    A full set of features for a stratification.

    The keys here are the names of the features. They could be: gender, age_bracket, education_level etc

    The values are FeatureValues objects - the breakdown of the values for a feature.
    """

    # TODO: consider splitting the updates/remaining into a parallel set of classes
    # then this can just have targets, and the running totals can be in classes we can
    # regenerate now and then

    def __init__(self) -> None:
        self.collection: dict[str, FeatureValues] = defaultdict(FeatureValues)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, self.__class__):
            return False
        return self.collection == other.collection

    def add_feature(self, feature_name: str, value_name: str, fv_counts: FeatureValueCounts) -> None:
        self.collection[feature_name].add_value_counts(value_name, fv_counts)

    @property
    def feature_names(self) -> list[str]:
        return list(self.collection.keys())

    def feature_values(self) -> Iterator[tuple[str, list[str]]]:
        for feature_name, feature_value in self.collection.items():
            yield feature_name, feature_value.values

    def feature_values_counts(self) -> Iterator[tuple[str, str, FeatureValueCounts]]:
        for feature_name, feature_values in self.collection.items():
            for value, value_counts in feature_values.values_counts():
                yield feature_name, value, value_counts

    def _safe_max_flex_val(self) -> int:
        if not self.collection:
            return 0
        # to avoid errors, if max_flex is not set we must set it at least as high as the highest
        return max(v.maximum_selection() for v in self.collection.values())

    def set_default_max_flex(self) -> None:
        """Note this only sets it if left at the default value"""
        max_flex = self._safe_max_flex_val()
        for feature_values in self.collection.values():
            feature_values.set_default_max_flex(max_flex)

    def add_remaining(self, feature: str, value_name: str) -> None:
        self.collection[feature].add_remaining(value_name)

    def add_selected(self, feature: str, value_name: str) -> None:
        self.collection[feature].add_selected(value_name)

    def remove_remaining(self, feature: str, value_name: str) -> None:
        try:
            self.collection[feature].remove_remaining(value_name)
        except errors.SelectionError as e:
            msg = f"Failed removing from {feature}/{value_name}: {e}"
            raise errors.SelectionError(msg) from None

    def minimum_selection(self) -> int:
        """
        The minimum selection for this set of features is the largest minimum selection
        of any individual feature.
        """
        if not self.collection:
            return 0
        return max(v.minimum_selection() for v in self.collection.values())

    def maximum_selection(self) -> int:
        """
        The maximum selection for this set of features is the smallest maximum selection
        of any individual feature.
        """
        if not self.collection:
            return 0
        return min(v.maximum_selection() for v in self.collection.values())

    def check_min_max(self) -> None:
        """
        If the min is bigger than the max we're in trouble i.e. there's an input error
        """
        if self.minimum_selection() > self.maximum_selection():
            msg = (
                "Inconsistent numbers in min and max in the features input: the sum "
                "of the minimum values of a features is larger than the sum of the "
                "maximum values of a(nother) feature. "
            )
            raise ValueError(msg)

    def check_desired(self, desired_number: int) -> None:
        """
        Check if the desired number of people is within the min/max of every feature.
        """
        for feature_name, feature_values in self.collection.items():
            if (
                desired_number < feature_values.minimum_selection()
                or desired_number > feature_values.maximum_selection()
            ):
                msg = (
                    f"The number of people to select ({desired_number}) is out of the range of "
                    f"the numbers of people in the {feature_name} feature. It should be within "
                    f"[{feature_values.minimum_selection()}, {feature_values.maximum_selection()}]."
                )
                raise Exception(msg)
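
For example, if gender has value quotas with minimums 5 and 5 and maximums 9 and 9, while age_bracket has minimums 2, 2 and 2 and maximums 12, 12 and 12, then minimum_selection() is max(10, 6) = 10 and maximum_selection() is min(18, 36) = 18, so any panel size from 10 to 18 passes check_desired().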

check_desired(desired_number)

Check if the desired number of people is within the min/max of every feature.

Source code in src/sortition_algorithms/features.py
def check_desired(self, desired_number: int) -> None:
    """
    Check if the desired number of people is within the min/max of every feature.
    """
    for feature_name, feature_values in self.collection.items():
        if (
            desired_number < feature_values.minimum_selection()
            or desired_number > feature_values.maximum_selection()
        ):
            msg = (
                f"The number of people to select ({desired_number}) is out of the range of "
                f"the numbers of people in the {feature_name} feature. It should be within "
                f"[{feature_values.minimum_selection()}, {feature_values.maximum_selection()}]."
            )
            raise Exception(msg)

check_min_max()

If the min is bigger than the max we're in trouble i.e. there's an input error

Source code in src/sortition_algorithms/features.py
def check_min_max(self) -> None:
    """
    If the min is bigger than the max we're in trouble i.e. there's an input error
    """
    if self.minimum_selection() > self.maximum_selection():
        msg = (
            "Inconsistent numbers in min and max in the features input: the sum "
            "of the minimum values of a features is larger than the sum of the "
            "maximum values of a(nother) feature. "
        )
        raise ValueError(msg)

maximum_selection()

The maximum selection for this set of features is the smallest maximum selection of any individual feature.

Source code in src/sortition_algorithms/features.py
def maximum_selection(self) -> int:
    """
    The maximum selection for this set of features is the smallest maximum selection
    of any individual feature.
    """
    if not self.collection:
        return 0
    return min(v.maximum_selection() for v in self.collection.values())

minimum_selection()

The minimum selection for this set of features is the largest minimum selection of any individual feature.

Source code in src/sortition_algorithms/features.py
def minimum_selection(self) -> int:
    """
    The minimum selection for this set of features is the largest minimum selection
    of any individual feature.
    """
    if not self.collection:
        return 0
    return max(v.minimum_selection() for v in self.collection.values())

set_default_max_flex()

Note this only sets it if left at the default value

Source code in src/sortition_algorithms/features.py
def set_default_max_flex(self) -> None:
    """Note this only sets it if left at the default value"""
    max_flex = self._safe_max_flex_val()
    for feature_values in self.collection.values():
        feature_values.set_default_max_flex(max_flex)

FeatureValues

A full set of values for a single feature.

If the feature is gender, the values could be: male, female, non_binary_other

The values are FeatureValueCounts objects - the min, max and current counts of the selected people in that feature value.

Source code in src/sortition_algorithms/features.py
class FeatureValues:
    """
    A full set of values for a single feature.

    If the feature is gender, the values could be: male, female, non_binary_other

    The values are FeatureValueCounts objects - the min, max and current counts of the
    selected people in that feature value.
    """

    def __init__(self) -> None:
        self.feature_values: dict[str, FeatureValueCounts] = {}

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, self.__class__):
            return False
        return self.feature_values == other.feature_values

    def add_value_counts(self, value_name: str, fv_counts: FeatureValueCounts) -> None:
        self.feature_values[value_name] = fv_counts

    def set_default_max_flex(self, max_flex: int) -> None:
        """Note this only sets it if left at the default value"""
        for fv_counts in self.feature_values.values():
            fv_counts.set_default_max_flex(max_flex)

    @property
    def values(self) -> list[str]:
        return list(self.feature_values.keys())

    def values_counts(self) -> Iterator[tuple[str, FeatureValueCounts]]:
        yield from self.feature_values.items()

    def add_remaining(self, value_name: str) -> None:
        self.feature_values[value_name].add_remaining()

    def add_selected(self, value_name: str) -> None:
        self.feature_values[value_name].add_selected()

    def remove_remaining(self, value_name: str) -> None:
        self.feature_values[value_name].remove_remaining()

    def minimum_selection(self) -> int:
        """
        For this feature, we have to select at least the sum of the minimum of each value
        """
        return sum(c.min for c in self.feature_values.values())

    def maximum_selection(self) -> int:
        """
        For this feature, we have to select at most the sum of the maximum of each value
        """
        return sum(c.max for c in self.feature_values.values())

maximum_selection()

For this feature, we have to select at most the sum of the maximum of each value

Source code in src/sortition_algorithms/features.py
def maximum_selection(self) -> int:
    """
    For this feature, we have to select at most the sum of the maximum of each value
    """
    return sum(c.max for c in self.feature_values.values())

minimum_selection()

For this feature, we have to select at least the sum of the minimum of each value

Source code in src/sortition_algorithms/features.py
def minimum_selection(self) -> int:
    """
    For this feature, we have to select at least the sum of the minimum of each value
    """
    return sum(c.min for c in self.feature_values.values())

set_default_max_flex(max_flex)

Note this only sets it if left at the default value

Source code in src/sortition_algorithms/features.py
def set_default_max_flex(self, max_flex: int) -> None:
    """Note this only sets it if left at the default value"""
    for fv_counts in self.feature_values.values():
        fv_counts.set_default_max_flex(max_flex)

read_in_features(features_head, features_body)

Read in stratified selection features and values

Note we do want features_head to ensure we don't have multiple columns with the same name

Source code in src/sortition_algorithms/features.py
def read_in_features(
    features_head: Iterable[str], features_body: Iterable[dict[str, str]]
) -> tuple[FeatureCollection, list[str]]:
    """
    Read in stratified selection features and values

    Note we do want features_head to ensure we don't have multiple columns with the same name
    """
    features = FeatureCollection()
    msg: list[str] = []
    features_flex, filtered_headers = _feature_headers_flex(list(features_head))
    for row in features_body:
        # check the set of keys in the row are the same as the headers
        assert set(filtered_headers) <= set(row.keys())
        stripped_row = utils.StrippedDict(_normalise_col_names(row))
        if not stripped_row["feature"]:
            continue
        features.add_feature(*_clean_row(stripped_row, features_flex))

    msg.append(f"Number of features: {len(features.feature_names)}")
    features.check_min_max()
    # check feature_flex to see if we need to set the max here
    # this only changes the max_flex value if these (optional) flex values are NOT set already
    features.set_default_max_flex()
    return features, msg

Selection algorithms for stratified sampling.

find_random_sample_legacy(people, features, number_people_wanted, check_same_address=False, check_same_address_columns=None)

Legacy stratified random selection algorithm.

Implements the original algorithm that uses greedy selection based on priority ratios. Always selects from the most urgently needed category first (highest ratio of (min-selected)/remaining), then randomly picks within that category.

Parameters:

Name Type Description Default
people People

People collection

required
features FeatureCollection

Feature definitions with min/max targets

required
number_people_wanted int

Number of people to select

required
check_same_address bool

Whether to remove household members when selecting someone

False
check_same_address_columns list[str] | None

Address columns for household identification

None

Returns:

Type Description
tuple[list[frozenset[str]], list[str]]

Tuple of (selected_committees, output_messages) where:

  • selected_committees: List containing one frozenset of selected person IDs
  • output_messages: List of log messages about the selection process

Raises:

Type Description
SelectionError

If selection becomes impossible (not enough people, etc.)

Source code in src/sortition_algorithms/find_sample.py, lines 9-93
def find_random_sample_legacy(
    people: People,
    features: FeatureCollection,
    number_people_wanted: int,
    check_same_address: bool = False,
    check_same_address_columns: list[str] | None = None,
) -> tuple[list[frozenset[str]], list[str]]:
    """
    Legacy stratified random selection algorithm.

    Implements the original algorithm that uses greedy selection based on priority ratios.
    Always selects from the most urgently needed category first (highest ratio of
    (min-selected)/remaining), then randomly picks within that category.

    Args:
        people: People collection
        features: Feature definitions with min/max targets
        number_people_wanted: Number of people to select
        check_same_address: Whether to remove household members when selecting someone
        check_same_address_columns: Address columns for household identification

    Returns:
        Tuple of (selected_committees, output_messages) where:
        - selected_committees: List containing one frozenset of selected person IDs
        - output_messages: List of log messages about the selection process

    Raises:
        SelectionError: If selection becomes impossible (not enough people, etc.)
    """
    output_lines = ["Using legacy algorithm."]
    people_selected: set[str] = set()

    # Create PeopleFeatures and initialize
    people_features = PeopleFeatures(people, features, check_same_address, check_same_address_columns or [])
    people_features.update_all_features_remaining()
    people_features.prune_for_feature_max_0()

    # Main selection loop
    for count in range(number_people_wanted):
        # Find the category with highest priority ratio
        try:
            ratio_result = people_features.find_max_ratio_category()
        except errors.SelectionError as e:
            msg = f"Selection failed on iteration {count + 1}: {e}"
            raise errors.SelectionError(msg) from e

        # Find the randomly selected person within that category
        target_feature = ratio_result.feature_name
        target_value = ratio_result.feature_value
        random_position = ratio_result.random_person_index

        selected_person_key = people_features.people.find_person_by_position_in_category(
            target_feature, target_value, random_position
        )

        # Should never select the same person twice
        assert selected_person_key not in people_selected, f"Person {selected_person_key} was already selected"

        # Select the person (this also removes household members if configured)
        people_selected.add(selected_person_key)
        selected_person_data = people_features.people.get_person_dict(selected_person_key)
        household_members_removed = people_features.select_person(selected_person_key)

        # Add output messages about household member removal
        if household_members_removed:
            output_lines.append(
                f"Selected {selected_person_key}, also removed household members: "
                f"{', '.join(household_members_removed)}"
            )

        # Handle any categories that are now full after this selection
        try:
            category_messages = people_features.handle_category_full_deletions(selected_person_data)
            output_lines.extend(category_messages)
        except errors.SelectionError as e:
            msg = f"Selection failed after selecting {selected_person_key}: {e}"
            raise errors.SelectionError(msg) from e

        # Check if we're about to run out of people (but not on the last iteration)
        if count < (number_people_wanted - 1) and people_features.people.count == 0:
            msg = "Selection failed: Ran out of people before completing selection"
            raise errors.SelectionError(msg)

    # Return in legacy format: list containing single frozenset
    return [frozenset(people_selected)], output_lines
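
An illustrative call, assuming people and features have already been loaded (for example via the adapters and read_in_features); the number wanted and the address column names are placeholders:

from sortition_algorithms.find_sample import find_random_sample_legacy

committees, messages = find_random_sample_legacy(
    people,
    features,
    number_people_wanted=24,
    check_same_address=True,
    check_same_address_columns=["address1", "postcode"],  # hypothetical columns
)
selected_ids = committees[0]  # a single frozenset of person keys
print(f"Selected {len(selected_ids)} people")
for line in messages:
    print(line)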

MaxRatioResult

Result from finding the category with maximum selection ratio.

Source code in src/sortition_algorithms/people_features.py, lines 16-22
@define(kw_only=True, slots=True)
class MaxRatioResult:
    """Result from finding the category with maximum selection ratio."""

    feature_name: str
    feature_value: str
    random_person_index: int

PeopleFeatures

This class manipulates people and features together, making a deepcopy on init.

Source code in src/sortition_algorithms/people_features.py, lines 25-232
class PeopleFeatures:
    """
    This class manipulates people and features together, making a deepcopy on init.
    """

    # TODO: consider naming: maybe SelectionState

    def __init__(
        self,
        people: People,
        features: FeatureCollection,
        check_same_address: bool = False,
        check_same_address_columns: list[str] | None = None,
    ) -> None:
        self.people = deepcopy(people)
        self.features = deepcopy(features)
        self.check_same_address = check_same_address
        self.check_same_address_columns = check_same_address_columns or []

    def update_features_remaining(self, person_key: str) -> None:
        # this will blow up if the person does not exist
        person = self.people.get_person_dict(person_key)
        for feature_name in self.features.feature_names:
            self.features.add_remaining(feature_name, person[feature_name])

    def update_all_features_remaining(self) -> None:
        for person_key in self.people:
            self.update_features_remaining(person_key)

    def delete_all_with_feature_value(self, feature_name: str, feature_value: str) -> tuple[int, int]:
        """
        When a feature/value is "full" we delete everyone else in it.
        "Full" means that the number selected equals the "max" amount - that
        is detected elsewhere and then this method is called.
        Returns count of those deleted, and count of those left
        """
        # when a category is full we want to delete everyone in it
        people_to_delete: list[str] = []
        for pkey, person in self.people.items():
            if person[feature_name] == feature_value:
                people_to_delete.append(pkey)
                for feature in self.features.feature_names:
                    try:
                        self.features.remove_remaining(feature, person[feature])
                    except errors.SelectionError as e:
                        msg = (
                            f"SELECTION IMPOSSIBLE: FAIL in delete_all_in_feature_value() "
                            f"as after previous deletion no one/not enough left in {feature} "
                            f"{person[feature]}. Tried to delete: {len(people_to_delete)}"
                        )
                        raise errors.SelectionError(msg) from e

        self.people.remove_many(people_to_delete)
        # return the number of people deleted and the number of people left
        return len(people_to_delete), self.people.count

    def prune_for_feature_max_0(self) -> list[str]:
        """
        Check if any feature_value.max is set to zero. if so delete everyone with that feature value
        NOT DONE: could then check if anyone is left.
        """
        # TODO: when do we want to do this?
        msg: list[str] = []
        msg.append(f"Number of people: {self.people.count}.")
        total_num_deleted = 0
        for (
            feature_name,
            feature_value,
            fv_counts,
        ) in self.features.feature_values_counts():
            if fv_counts.max == 0:  # we don't want any of these people
                # pass the message in as deleting them might throw an exception
                msg.append(f"Feature/value {feature_name}/{feature_value} full - deleting people...")
                num_deleted, num_left = self.delete_all_with_feature_value(feature_name, feature_value)
                # if no exception was thrown above add this bit to the end of the previous message
                msg[-1] += f" Deleted {num_deleted}, {num_left} left."
                total_num_deleted += num_deleted
        # if the total number of people deleted is lots then we're probably doing a replacement selection, which means
        # the 'remaining' file will be useless - remind the user of this!
        if total_num_deleted >= self.people.count / 2:
            msg.append(
                ">>> WARNING <<< That deleted MANY PEOPLE - are you doing a "
                "replacement? If so your REMAINING FILE WILL BE USELESS!!!"
            )
        return msg

    def select_person(self, person_key: str) -> list[str]:
        """
        Selecting a person means:
        - remove the person from our copy of People
        - update the `selected` and `remaining` counts of the FeatureCollection
        - if check_same_address is True, also remove household members (without adding to selected)

        Returns:
            List of additional people removed due to same address (empty if check_same_address is False)
        """
        # First, find household members if address checking is enabled (before removing the person)
        household_members_removed = []
        if self.check_same_address and self.check_same_address_columns:
            household_members_removed = list(self.people.matching_address(person_key, self.check_same_address_columns))

        # Handle the main person selection
        person = self.people.get_person_dict(person_key)
        for feature in self.features.feature_names:
            self.features.remove_remaining(feature, person[feature])
            self.features.add_selected(feature, person[feature])
        self.people.remove(person_key)

        # Then remove household members if any were found
        for household_member_key in household_members_removed:
            household_member = self.people.get_person_dict(household_member_key)
            for feature in self.features.feature_names:
                self.features.remove_remaining(feature, household_member[feature])
                # Note: we don't call add_selected() for household members
            self.people.remove(household_member_key)

        return household_members_removed

    def find_max_ratio_category(self) -> MaxRatioResult:
        """
        Find the feature/value combination with the highest selection ratio.

        The ratio is calculated as: (min - selected) / remaining
        This represents how urgently we need people from this category.
        Higher ratio = more urgent need (fewer people available relative to what we still need).

        Returns:
            MaxRatioResult containing the feature name, value, and a random person index

        Raises:
            SelectionError: If insufficient people remain to meet minimum requirements
        """
        max_ratio = -100.0
        result_feature_name = ""
        result_feature_value = ""
        random_person_index = -1

        for (
            feature_name,
            feature_value,
            fv_counts,
        ) in self.features.feature_values_counts():
            # Check if we have insufficient people to meet minimum requirements
            people_still_needed = fv_counts.min - fv_counts.selected
            if fv_counts.selected < fv_counts.min and fv_counts.remaining < people_still_needed:
                msg = (
                    f"SELECTION IMPOSSIBLE: Not enough people remaining in {feature_name}/{feature_value}. "
                    f"Need {people_still_needed} more, but only {fv_counts.remaining} remaining."
                )
                raise errors.SelectionError(msg)

            # Skip categories with no remaining people or max = 0
            if fv_counts.remaining == 0 or fv_counts.max == 0:
                continue

            # Calculate the priority ratio
            ratio = people_still_needed / float(fv_counts.remaining)

            # Track the highest ratio category
            if ratio > max_ratio:
                max_ratio = ratio
                result_feature_name = feature_name
                result_feature_value = feature_value
                # from 1 to remaining
                random_person_index = random_provider().randbelow(fv_counts.remaining) + 1

        # If no valid category found, all categories must be at their max or have max=0
        if not result_feature_name:
            msg = "No valid categories found - all may be at maximum or have max=0"
            raise errors.SelectionError(msg)

        return MaxRatioResult(
            feature_name=result_feature_name,
            feature_value=result_feature_value,
            random_person_index=random_person_index,
        )

    def handle_category_full_deletions(self, selected_person_data: dict[str, str]) -> list[str]:
        """
        Check if any categories are now full after a selection and delete remaining people.

        When a person is selected, some categories may reach their maximum quota.
        This method identifies such categories and removes all remaining people from them.

        Args:
            selected_person_data: Dictionary of the selected person's feature values

        Returns:
            List of output messages about categories that became full and people deleted

        Raises:
            SelectionError: If deletions would violate minimum constraints
        """
        output_messages = []

        for (
            feature_name,
            feature_value,
            fv_counts,
        ) in self.features.feature_values_counts():
            if feature_value == selected_person_data[feature_name] and fv_counts.selected == fv_counts.max:
                num_deleted, num_left = self.delete_all_with_feature_value(feature_name, feature_value)
                if num_deleted > 0:
                    output_messages.append(
                        f"Category {feature_name}/{feature_value} full - deleted {num_deleted} people, {num_left} left."
                    )

        return output_messages
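
The class is driven step by step by find_random_sample_legacy above. A condensed sketch of that workflow, assuming people and features are already loaded:

pf = PeopleFeatures(people, features, check_same_address=False)
pf.update_all_features_remaining()  # count who is still available per category
pf.prune_for_feature_max_0()        # drop values whose max is zero

ratio = pf.find_max_ratio_category()  # most urgently needed category
person_key = pf.people.find_person_by_position_in_category(
    ratio.feature_name, ratio.feature_value, ratio.random_person_index
)
removed = pf.select_person(person_key)  # also removes household members if enabled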

delete_all_with_feature_value(feature_name, feature_value)

When a feature/value is "full" we delete everyone else in it. "Full" means that the number selected equals the "max" amount; that is detected elsewhere and then this method is called. Returns the count of those deleted and the count of those left

Source code in src/sortition_algorithms/people_features.py, lines 54-79
def delete_all_with_feature_value(self, feature_name: str, feature_value: str) -> tuple[int, int]:
    """
    When a feature/value is "full" we delete everyone else in it.
    "Full" means that the number selected equals the "max" amount - that
    is detected elsewhere and then this method is called.
    Returns count of those deleted, and count of those left
    """
    # when a category is full we want to delete everyone in it
    people_to_delete: list[str] = []
    for pkey, person in self.people.items():
        if person[feature_name] == feature_value:
            people_to_delete.append(pkey)
            for feature in self.features.feature_names:
                try:
                    self.features.remove_remaining(feature, person[feature])
                except errors.SelectionError as e:
                    msg = (
                        f"SELECTION IMPOSSIBLE: FAIL in delete_all_in_feature_value() "
                        f"as after previous deletion no one/not enough left in {feature} "
                        f"{person[feature]}. Tried to delete: {len(people_to_delete)}"
                    )
                    raise errors.SelectionError(msg) from e

    self.people.remove_many(people_to_delete)
    # return the number of people deleted and the number of people left
    return len(people_to_delete), self.people.count

find_max_ratio_category()

Find the feature/value combination with the highest selection ratio.

The ratio is calculated as (min - selected) / remaining. This represents how urgently we need people from this category. Higher ratio = more urgent need (fewer people available relative to what we still need).

Returns:

Type Description
MaxRatioResult

MaxRatioResult containing the feature name, value, and a random person index

Raises:

Type Description
SelectionError

If insufficient people remain to meet minimum requirements

Source code in src/sortition_algorithms/people_features.py, lines 143-200
def find_max_ratio_category(self) -> MaxRatioResult:
    """
    Find the feature/value combination with the highest selection ratio.

    The ratio is calculated as: (min - selected) / remaining
    This represents how urgently we need people from this category.
    Higher ratio = more urgent need (fewer people available relative to what we still need).

    Returns:
        MaxRatioResult containing the feature name, value, and a random person index

    Raises:
        SelectionError: If insufficient people remain to meet minimum requirements
    """
    max_ratio = -100.0
    result_feature_name = ""
    result_feature_value = ""
    random_person_index = -1

    for (
        feature_name,
        feature_value,
        fv_counts,
    ) in self.features.feature_values_counts():
        # Check if we have insufficient people to meet minimum requirements
        people_still_needed = fv_counts.min - fv_counts.selected
        if fv_counts.selected < fv_counts.min and fv_counts.remaining < people_still_needed:
            msg = (
                f"SELECTION IMPOSSIBLE: Not enough people remaining in {feature_name}/{feature_value}. "
                f"Need {people_still_needed} more, but only {fv_counts.remaining} remaining."
            )
            raise errors.SelectionError(msg)

        # Skip categories with no remaining people or max = 0
        if fv_counts.remaining == 0 or fv_counts.max == 0:
            continue

        # Calculate the priority ratio
        ratio = people_still_needed / float(fv_counts.remaining)

        # Track the highest ratio category
        if ratio > max_ratio:
            max_ratio = ratio
            result_feature_name = feature_name
            result_feature_value = feature_value
            # from 1 to remaining
            random_person_index = random_provider().randbelow(fv_counts.remaining) + 1

    # If no valid category found, all categories must be at their max or have max=0
    if not result_feature_name:
        msg = "No valid categories found - all may be at maximum or have max=0"
        raise errors.SelectionError(msg)

    return MaxRatioResult(
        feature_name=result_feature_name,
        feature_value=result_feature_value,
        random_person_index=random_person_index,
    )
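
A worked example with invented numbers shows how the ratio orders categories:

ratio_a = (5 - 2) / 6  # min 5, selected 2, remaining 6 -> 0.5, still urgently needed
ratio_b = (3 - 3) / 9  # min 3, selected 3, remaining 9 -> 0.0, minimum already met
assert ratio_a > ratio_b  # category A wins; a random index between 1 and 6 is drawn from it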

handle_category_full_deletions(selected_person_data)

Check if any categories are now full after a selection and delete remaining people.

When a person is selected, some categories may reach their maximum quota. This method identifies such categories and removes all remaining people from them.

Parameters:

Name Type Description Default
selected_person_data dict[str, str]

Dictionary of the selected person's feature values

required

Returns:

Type Description
list[str]

List of output messages about categories that became full and people deleted

Raises:

Type Description
SelectionError

If deletions would violate minimum constraints

Source code in src/sortition_algorithms/people_features.py, lines 202-232
def handle_category_full_deletions(self, selected_person_data: dict[str, str]) -> list[str]:
    """
    Check if any categories are now full after a selection and delete remaining people.

    When a person is selected, some categories may reach their maximum quota.
    This method identifies such categories and removes all remaining people from them.

    Args:
        selected_person_data: Dictionary of the selected person's feature values

    Returns:
        List of output messages about categories that became full and people deleted

    Raises:
        SelectionError: If deletions would violate minimum constraints
    """
    output_messages = []

    for (
        feature_name,
        feature_value,
        fv_counts,
    ) in self.features.feature_values_counts():
        if feature_value == selected_person_data[feature_name] and fv_counts.selected == fv_counts.max:
            num_deleted, num_left = self.delete_all_with_feature_value(feature_name, feature_value)
            if num_deleted > 0:
                output_messages.append(
                    f"Category {feature_name}/{feature_value} full - deleted {num_deleted} people, {num_left} left."
                )

    return output_messages

prune_for_feature_max_0()

Check if any feature_value.max is set to zero; if so, delete everyone with that feature value. NOT DONE: could then check if anyone is left.

Source code in src/sortition_algorithms/people_features.py, lines 81-109
def prune_for_feature_max_0(self) -> list[str]:
    """
    Check if any feature_value.max is set to zero. if so delete everyone with that feature value
    NOT DONE: could then check if anyone is left.
    """
    # TODO: when do we want to do this?
    msg: list[str] = []
    msg.append(f"Number of people: {self.people.count}.")
    total_num_deleted = 0
    for (
        feature_name,
        feature_value,
        fv_counts,
    ) in self.features.feature_values_counts():
        if fv_counts.max == 0:  # we don't want any of these people
            # pass the message in as deleting them might throw an exception
            msg.append(f"Feature/value {feature_name}/{feature_value} full - deleting people...")
            num_deleted, num_left = self.delete_all_with_feature_value(feature_name, feature_value)
            # if no exception was thrown above add this bit to the end of the previous message
            msg[-1] += f" Deleted {num_deleted}, {num_left} left."
            total_num_deleted += num_deleted
    # if the total number of people deleted is lots then we're probably doing a replacement selection, which means
    # the 'remaining' file will be useless - remind the user of this!
    if total_num_deleted >= self.people.count / 2:
        msg.append(
            ">>> WARNING <<< That deleted MANY PEOPLE - are you doing a "
            "replacement? If so your REMAINING FILE WILL BE USELESS!!!"
        )
    return msg

select_person(person_key)

Selecting a person means:

  • remove the person from our copy of People
  • update the selected and remaining counts of the FeatureCollection
  • if check_same_address is True, also remove household members (without adding to selected)

Returns:

Type Description
list[str]

List of additional people removed due to same address (empty if check_same_address is False)

Source code in src/sortition_algorithms/people_features.py, lines 111-141
def select_person(self, person_key: str) -> list[str]:
    """
    Selecting a person means:
    - remove the person from our copy of People
    - update the `selected` and `remaining` counts of the FeatureCollection
    - if check_same_address is True, also remove household members (without adding to selected)

    Returns:
        List of additional people removed due to same address (empty if check_same_address is False)
    """
    # First, find household members if address checking is enabled (before removing the person)
    household_members_removed = []
    if self.check_same_address and self.check_same_address_columns:
        household_members_removed = list(self.people.matching_address(person_key, self.check_same_address_columns))

    # Handle the main person selection
    person = self.people.get_person_dict(person_key)
    for feature in self.features.feature_names:
        self.features.remove_remaining(feature, person[feature])
        self.features.add_selected(feature, person[feature])
    self.people.remove(person_key)

    # Then remove household members if any were found
    for household_member_key in household_members_removed:
        household_member = self.people.get_person_dict(household_member_key)
        for feature in self.features.feature_names:
            self.features.remove_remaining(feature, household_member[feature])
            # Note: we don't call add_selected() for household members
        self.people.remove(household_member_key)

    return household_members_removed

WeightedSample

Source code in src/sortition_algorithms/people_features.py, lines 246-265
class WeightedSample:
    def __init__(self, features: FeatureCollection) -> None:
        """
        This produces a set of lists of feature values for each feature.  Each value
        is in the list `fv_counts.max` times - so a random choice with represent the max.

        So if we had feature "ethnicity", value "white" w max 4, "asian" w max 3 and
        "black" with max 2 we'd get:

        ["white", "white", "white", "white", "asian", "asian", "asian", "black", "black"]

        Then making random choices from that list produces a weighted sample.
        """
        self.weighted: dict[str, list[str]] = defaultdict(list)
        for feature_name, value, fv_counts in features.feature_values_counts():
            self.weighted[feature_name] += [value] * fv_counts.max

    def value_for(self, feature_name: str) -> str:
        # S311 is random numbers for crypto - but this is just for a sample file
        return random_provider().choice(self.weighted[feature_name])

__init__(features)

This produces a list of feature values for each feature. Each value appears in the list fv_counts.max times, so a random choice is weighted by the max.

So if we had feature "ethnicity", with value "white" at max 4, "asian" at max 3 and "black" at max 2, we'd get:

["white", "white", "white", "white", "asian", "asian", "asian", "black", "black"]

Then making random choices from that list produces a weighted sample.

Source code in src/sortition_algorithms/people_features.py, lines 247-261
def __init__(self, features: FeatureCollection) -> None:
    """
    This produces a set of lists of feature values for each feature.  Each value
    is in the list `fv_counts.max` times - so a random choice with represent the max.

    So if we had feature "ethnicity", value "white" w max 4, "asian" w max 3 and
    "black" with max 2 we'd get:

    ["white", "white", "white", "white", "asian", "asian", "asian", "black", "black"]

    Then making random choices from that list produces a weighted sample.
    """
    self.weighted: dict[str, list[str]] = defaultdict(list)
    for feature_name, value, fv_counts in features.feature_values_counts():
        self.weighted[feature_name] += [value] * fv_counts.max
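
A minimal sketch of how it can be used, assuming features has already been loaded; one weighted value is drawn per feature, for example when generating a demonstration people file:

sampler = WeightedSample(features)
row = {name: sampler.value_for(name) for name in features.feature_names}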

simple_add_selected(person_keys, people, features)

Just add each person to the selected counts for that person's feature values. Don't do the more complex handling of the full PeopleFeatures.add_selected()

Source code in src/sortition_algorithms/people_features.py, lines 235-243
def simple_add_selected(person_keys: Iterable[str], people: People, features: FeatureCollection) -> None:
    """
    Just add the person to the selected counts for the feature values for that person.
    Don't do the more complex handling of the full PeopleFeatures.add_selected()
    """
    for person_key in person_keys:
        person = people.get_person_dict(person_key)
        for feature_name in features.feature_names:
            features.add_selected(feature_name, person[feature_name])
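
For example, given a committee produced elsewhere (such as the frozenset returned by find_random_sample_legacy), and assuming people still contains those keys, the selections can be recorded against the feature counts:

# committees[0] is a frozenset of person keys selected earlier
simple_add_selected(committees[0], people, features)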

People

Source code in src/sortition_algorithms/people.py, lines 11-115
class People:
    def __init__(self, columns_to_keep: list[str]) -> None:
        self._columns_to_keep = columns_to_keep
        self._full_data: dict[str, dict[str, str]] = {}

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, self.__class__):
            return False
        return self._full_data == other._full_data and self._columns_to_keep == other._columns_to_keep

    @property
    def count(self) -> int:
        return len(self._full_data)

    def __iter__(self) -> Iterator[str]:
        return iter(self._full_data)

    def items(self) -> ItemsView[str, dict[str, str]]:
        return self._full_data.items()

    def add(self, person_key: str, data: StrippedDict, features: FeatureCollection) -> None:
        person_full_data: dict[str, str] = {}
        # get the feature values: these are the most important and we must check them
        for feature_name, feature_values in features.feature_values():
            # check for input errors here - if it's not in the list of feature values...
            # allow for some unclean data - at least strip empty space, but only if a str!
            # (some values can be numbers)
            p_value = data[feature_name]
            if p_value not in feature_values:
                exc_msg = (
                    f"ERROR reading in people (read_in_people): "
                    f"Person (id = {person_key}) has value '{p_value}' not in feature {feature_name}"
                )
                raise errors.BadDataError(exc_msg)
            person_full_data[feature_name] = p_value
        # then get the other column values we need
        # this is address, name etc that we need to keep for output file
        # we don't check anything here - it's just for user convenience
        for col in self._columns_to_keep:
            person_full_data[col] = data[col]

        # add all the data to our people object
        self._full_data[person_key] = person_full_data

    def remove(self, person_key: str) -> None:
        del self._full_data[person_key]

    def remove_many(self, person_keys: Iterable[str]) -> None:
        for key in person_keys:
            self.remove(key)

    def get_person_dict(self, person_key: str) -> dict[str, str]:
        return self._full_data[person_key]

    def households(self, address_columns: list[str]) -> dict[tuple[str, ...], list[str]]:
        """
        Generates a dict with:
        - keys: a tuple containing the address strings
        - values: a list of person_key for each person at that address
        """
        households = defaultdict(list)
        for person_key, person in self._full_data.items():
            address = tuple(person[col] for col in address_columns)
            households[address].append(person_key)
        return households

    def matching_address(self, person_key: str, address_columns: list[str]) -> Iterable[str]:
        """
        Returns a list of person keys for all people who have an address matching
        the address of the person passed in.
        """
        person = self._full_data[person_key]
        person_address = tuple(person[col] for col in address_columns)
        for loop_key, loop_person in self._full_data.items():
            if loop_key == person_key:
                continue  # skip the person we've been given
            if person_address == tuple(loop_person[col] for col in address_columns):
                yield loop_key

    def find_person_by_position_in_category(self, feature_name: str, feature_value: str, position: int) -> str:
        """
        Find the nth person (1-indexed) in a specific feature category.

        Args:
            feature_name: Name of the feature (e.g., "gender")
            feature_value: Value of the feature (e.g., "male")
            position: 1-indexed position within the category

        Returns:
            Person key of the person at the specified position

        Raises:
            SelectionError: If no person is found at the specified position
        """
        current_position = 0

        for person_key, person_dict in self._full_data.items():
            if person_dict[feature_name] == feature_value:
                current_position += 1
                if current_position == position:
                    return person_key

        # Should always find someone if position is valid
        msg = f"Failed to find person at position {position} in {feature_name}/{feature_value}"
        raise errors.SelectionError(msg)

find_person_by_position_in_category(feature_name, feature_value, position)

Find the nth person (1-indexed) in a specific feature category.

Parameters:

Name Type Description Default
feature_name str

Name of the feature (e.g., "gender")

required
feature_value str

Value of the feature (e.g., "male")

required
position int

1-indexed position within the category

required

Returns:

Type Description
str

Person key of the person at the specified position

Raises:

Type Description
SelectionError

If no person is found at the specified position

Source code in src/sortition_algorithms/people.py, lines 90-115
def find_person_by_position_in_category(self, feature_name: str, feature_value: str, position: int) -> str:
    """
    Find the nth person (1-indexed) in a specific feature category.

    Args:
        feature_name: Name of the feature (e.g., "gender")
        feature_value: Value of the feature (e.g., "male")
        position: 1-indexed position within the category

    Returns:
        Person key of the person at the specified position

    Raises:
        SelectionError: If no person is found at the specified position
    """
    current_position = 0

    for person_key, person_dict in self._full_data.items():
        if person_dict[feature_name] == feature_value:
            current_position += 1
            if current_position == position:
                return person_key

    # Should always find someone if position is valid
    msg = f"Failed to find person at position {position} in {feature_name}/{feature_value}"
    raise errors.SelectionError(msg)

households(address_columns)

Generates a dict with:

  • keys: a tuple containing the address strings
  • values: a list of person_key for each person at that address

Source code in src/sortition_algorithms/people.py, lines 65-75
def households(self, address_columns: list[str]) -> dict[tuple[str, ...], list[str]]:
    """
    Generates a dict with:
    - keys: a tuple containing the address strings
    - values: a list of person_key for each person at that address
    """
    households = defaultdict(list)
    for person_key, person in self._full_data.items():
        address = tuple(person[col] for col in address_columns)
        households[address].append(person_key)
    return households
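
An illustrative call, assuming "address1" and "postcode" are the configured address columns; the result shape is sketched in the comment:

households = people.households(["address1", "postcode"])
# e.g. {("12 High St", "AB1 2CD"): ["p17", "p42"],
#       ("3 Mill Lane", "ZZ9 8YX"): ["p05"]}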

matching_address(person_key, address_columns)

Returns a list of person keys for all people who have an address matching the address of the person passed in.

Source code in src/sortition_algorithms/people.py, lines 77-88
def matching_address(self, person_key: str, address_columns: list[str]) -> Iterable[str]:
    """
    Returns a list of person keys for all people who have an address matching
    the address of the person passed in.
    """
    person = self._full_data[person_key]
    person_address = tuple(person[col] for col in address_columns)
    for loop_key, loop_person in self._full_data.items():
        if loop_key == person_key:
            continue  # skip the person we've been given
        if person_address == tuple(loop_person[col] for col in address_columns):
            yield loop_key

RandomProvider

Bases: ABC

This is something of a hack. Mostly we want to use the secrets module. But for repeatable testing we might want to set the random.seed sometimes.

So we have a global _random_provider which can be switched between an instance of this class that uses the secrets module and an instance that uses random with a seed. The switch is done by the set_random_provider() function.

Then every time we want some randomness, we call random_provider() to get the current version of the global.

Source code in src/sortition_algorithms/utils.py, lines 35-59
class RandomProvider(ABC):
    """
    This is something of a hack. Mostly we want to use the `secrets` module.
    But for repeatable testing we might want to set the random.seed sometimes.

    So we have a global `_random_provider` which can be switched between an
    instance of this class that uses the `secrets` module and an instance that
    uses `random` with a seed. The switch is done by the `set_random_provider()`
    function.

    Then every time we want some randomness, we call `random_provider()` to get
    the current version of the global.
    """

    @classmethod
    @abstractmethod
    def uniform(cls, lower: float, upper: float) -> float: ...

    @classmethod
    @abstractmethod
    def randbelow(cls, upper: int) -> int: ...

    @classmethod
    @abstractmethod
    def choice(cls, seq: "SupportsLenAndGetItem[str]") -> str: ...
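
A minimal sketch of a seeded provider for repeatable tests, implementing the three abstract methods with random.Random; the library ships its own implementations and registers them via set_random_provider(), so this only illustrates the interface:

import random
from collections.abc import Sequence


class SeededProvider(RandomProvider):
    """Deterministic provider for repeatable tests (illustrative only)."""

    _rng = random.Random(42)

    @classmethod
    def uniform(cls, lower: float, upper: float) -> float:
        return cls._rng.uniform(lower, upper)

    @classmethod
    def randbelow(cls, upper: int) -> int:
        return cls._rng.randrange(upper)

    @classmethod
    def choice(cls, seq: Sequence[str]) -> str:
        return cls._rng.choice(seq)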

StrippedDict

Wraps a dict, and whenever we get a value from it, we convert to str and strip() whitespace

Source code in src/sortition_algorithms/utils.py, lines 22-32
class StrippedDict:
    """
    Wraps a dict, and whenever we get a value from it, we convert to str and
    strip() whitespace
    """

    def __init__(self, raw_dict: Mapping[str, str] | Mapping[str, str | int]) -> None:
        self.raw_dict = raw_dict

    def __getitem__(self, key: str) -> str:
        return strip_str_int(self.raw_dict[key])
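
A small usage example: values come back as stripped strings regardless of their original type.

raw = {"gender": " female ", "age": 42}
stripped = StrippedDict(raw)
assert stripped["gender"] == "female"
assert stripped["age"] == "42"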

print_ret(message)

Print and return a message for output collection.

Source code in src/sortition_algorithms/utils.py, lines 11-15
def print_ret(message: str) -> str:
    """Print and return a message for output collection."""
    # TODO: should we replace this with logging or similar?
    print(message)
    return message