diff --git a/docs/notebooks/Optimization.ipynb b/docs/notebooks/Optimization.ipynb new file mode 100644 index 00000000..75960782 --- /dev/null +++ b/docs/notebooks/Optimization.ipynb @@ -0,0 +1,392 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Introduction to Optimization in GerryChain\n", + "\n", + "This notebook walks through how to use the SingleMetricOptimizer class (as well as its Gingleator subclass) in order to perfrom heuristic optimization runs in GerryChain.\n", + "\n", + "There are 3 heuristic optimization methods whose use is shown in this notebook:\n", + "* **Short Bursts**: chaining together small neutral explorers ([More reading about short bursts here](https://arxiv.org/abs/2011.02288))\n", + "* **Simulated Annealing**: vary the probability of accepting worse plans over time wrt a temperature parameter $\\beta$.\n", + "* **Tilted runs**: accept worse plans with a fixed probability $p$\n", + "\n", + "## When do we want to use Heuristic Optimization?\n", + "\n", + "While sampling naively with GerryChain can give us an understanding of the neutral baseline for a state, there are often cases where we want to find plans with properties that are rare to encounter in a neutral run. Many states have laws/guidelines that state that plans should be as compact as feasibly possible, maximize preservation of political boundaries and/or communities of interest, some even look to minimize double bunking of incumbents or seek proportionality/competitiveness in contests. Heuristic optimization methods can be used to find example plans with these properties and to explore the trade-offs between them." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from gerrychain import (GeographicPartition, Partition, Graph, MarkovChain,\n", + " proposals, updaters, constraints, accept, Election)\n", + "# from gerrychain.proposals import ReCom\n", + "from gerrychain.optimization import SingleMetricOptimizer, Gingleator\n", + "from gerrychain.tree import recursive_seed_part\n", + "# from gerrychain.updaters import Tally\n", + "from functools import partial\n", + "import pandas as pd\n", + "import json\n", + "import requests\n", + "from networkx.readwrite import json_graph\n", + "import matplotlib.pyplot as plt\n", + "from tqdm import tqdm\n", + "import numpy as np" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "response = json.loads(requests.get(\"https://people.csail.mit.edu/ddeford/BG/BG_05.json\").text)\n", + "graph = Graph(json_graph.adjacency_graph(response))" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "POPCOL = \"TOTPOP\"\n", + "SEN_DISTS = 35\n", + "EPS = 0.02\n", + "TOTPOP = sum(graph.nodes()[n][POPCOL] for n in graph.nodes())" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "sddict = recursive_seed_part(graph, range(SEN_DISTS), TOTPOP/SEN_DISTS, POPCOL, EPS)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "chain_updaters = {\n", + " \"population\": updaters.Tally(POPCOL, alias=\"population\"),\n", + " \"VAP\": updaters.Tally(\"VAP\"),\n", + " \"BVAP\": updaters.Tally(\"BVAP\")\n", + " }\n", + "part = Partition(graph, sddict, chain_updaters)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "proposal = partial(proposals.recom,\n", + " pop_col=POPCOL,\n", + " pop_target=TOTPOP/SEN_DISTS,\n", + " epsilon=EPS,\n", + " node_repeats=1)\n", + "cons = constraints.within_percent_of_ideal_population(part, EPS)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Optimizing for Gingles Districts\n", + "\n", + "Named for the Supreme Court case _Thornburg v. Gingles_, which created their precedent as one of the litmus tests in bringing forth a VRA court case, *Gingles' Districts* are districts that are 50% + 1 of a minority population subgroup (more colloquially called majority-minority districts). It is common to seek plans with greater/maximal numbers of gingles districts to understand the landscape of the state space." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "gingles = Gingleator(proposal, cons, part, \n", + " minority_pop_col=\"BVAP\", total_pop_col=\"VAP\",\n", + " score_function=Gingleator.reward_partial_dist)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "total_steps = 10000" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|█████████████████████████████████████| 10000/10000 [06:16<00:00, 26.56it/s]\n" + ] + } + ], + "source": [ + "max_scores_sb = np.zeros(total_steps)\n", + "scores_sb = np.zeros(total_steps)\n", + "for i, part in enumerate(gingles.short_bursts(10, 1000, with_progress_bar=True)):\n", + " max_scores_sb[i] = gingles.best_score\n", + " scores_sb[i] = gingles.score(part)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|█████████████████████████████████████| 10000/10000 [07:48<00:00, 21.33it/s]\n" + ] + } + ], + "source": [ + "max_scores_anneal = np.zeros(total_steps)\n", + "scores_anneal = np.zeros(total_steps)\n", + "for i, part in enumerate(gingles.simulated_annealing(total_steps, gingles.jumpcycle_beta_function(1000, 4000),\n", + " beta_magnitude=500, with_progress_bar=True)):\n", + " max_scores_anneal[i] = gingles.best_score\n", + " scores_anneal[i] = gingles.score(part)" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|█████████████████████████████████████| 10000/10000 [07:41<00:00, 21.65it/s]\n" + ] + } + ], + "source": [ + "max_scores_tilt = np.zeros(total_steps)\n", + "scores_tilt = np.zeros(total_steps)\n", + "for i, part in enumerate(gingles.tilted_run(total_steps, 0.125, with_progress_bar=True)):\n", + " max_scores_tilt[i] = gingles.best_score\n", + " scores_tilt[i] = gingles.score(part)" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "needs_background": "light" + }, + "output_type": "display_data" + } + ], + "source": [ + "fig, ax = plt.subplots(figsize=(12,6))\n", + "plt.plot(max_scores_sb, label=\"Short Bursts\")\n", + "plt.plot(max_scores_anneal, label=\"Simulated Annealing\")\n", + "plt.plot(max_scores_tilt, label=\"Tilted Run\")\n", + "plt.xlabel(\"Steps\", fontsize=20)\n", + "plt.ylabel(\"Max Score Observered\", fontsize=20)\n", + "plt.legend()\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "needs_background": "light" + }, + "output_type": "display_data" + } + ], + "source": [ + "fig, ax = plt.subplots(figsize=(12,6))\n", + "plt.plot(scores_sb, label=\"Short Bursts\")\n", + "plt.plot(scores_anneal, label=\"Simulated Annealing\")\n", + "plt.plot(scores_tilt, label=\"Tilted Run\")\n", + "plt.xlabel(\"Steps\", fontsize=20)\n", + "plt.ylabel(\"Score Observered\", fontsize=20)\n", + "plt.legend()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Optimizing for Compactness\n", + "\n", + "Another metric we may seek to optimize over is compactness. Below we look at minimizing the number of cut edges in a plan." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "num_cut_edges = lambda p: len(p[\"cut_edges\"])\n", + "optimizer = SingleMetricOptimizer(proposal, cons, part, num_cut_edges, maximize=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|█████████████████████████████████████| 10000/10000 [05:20<00:00, 31.20it/s]\n" + ] + } + ], + "source": [ + "total_steps = 10000\n", + "min_scores_sb = np.zeros(total_steps)\n", + "for i, part in enumerate(optimizer.short_bursts(5, 2000, with_progress_bar=True)):\n", + " min_scores_sb[i] = optimizer.best_score" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|█████████████████████████████████████| 10000/10000 [06:31<00:00, 25.57it/s]\n" + ] + } + ], + "source": [ + "min_scores_anneal = np.zeros(total_steps)\n", + "for i, part in enumerate(optimizer.simulated_annealing(total_steps, optimizer.jumpcycle_beta_function(200, 800),\n", + " beta_magnitude=1, with_progress_bar=True)):\n", + " min_scores_anneal[i] = optimizer.best_score" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|█████████████████████████████████████| 10000/10000 [06:05<00:00, 27.34it/s]\n" + ] + } + ], + "source": [ + "min_scores_tilt = np.zeros(total_steps)\n", + "for i, part in enumerate(optimizer.tilted_run(total_steps, p=0.125, with_progress_bar=True)):\n", + " min_scores_tilt[i] = optimizer.best_score" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "needs_background": "light" + }, + "output_type": "display_data" + } + ], + "source": [ + "fig, ax = plt.subplots(figsize=(12,6))\n", + "plt.plot(min_scores_sb, label=\"Short Bursts\")\n", + "plt.plot(min_scores_anneal, label=\"Simulated Annealing\")\n", + "plt.plot(min_scores_tilt, label=\"Tilted Run\")\n", + "plt.xlabel(\"Steps\", fontsize=20)\n", + "plt.ylabel(\"Min #CutEdges Observered\", fontsize=20)\n", + "plt.legend()\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "interpreter": { + "hash": "de0b2e8f3822c864e297dc34a9420a5549c338b59d7a373e92a79db0315964f9" + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/gerrychain/optimization/__init__.py b/gerrychain/optimization/__init__.py new file mode 100644 index 00000000..831dd950 --- /dev/null +++ b/gerrychain/optimization/__init__.py @@ -0,0 +1,4 @@ +from .optimization import SingleMetricOptimizer +from .gingleator import Gingleator + +__all__ = ['SingleMetricOptimizer', 'Gingleator'] \ No newline at end of file diff --git a/gerrychain/optimization/gingleator.py b/gerrychain/optimization/gingleator.py new file mode 100755 index 00000000..6b461321 --- /dev/null +++ b/gerrychain/optimization/gingleator.py @@ -0,0 +1,168 @@ +from .optimization import SingleMetricOptimizer + +from functools import partial +import numpy as np +import warnings + + +class Gingleator(SingleMetricOptimizer): + """ + `Gingleator` is a child class of `SingleMetricOptimizer` which can be used to search for plans + with increased numbers of Gingles' districts. + + A gingles district (named for the Supreme Court case Thornburg v. Gingles) is a district that is + majority-minority. aka 50% + 1 of some population subgroup. Demonstrating additional Gingles + districts is one of the litmus tests used in bringing forth a VRA case. + """ + + def __init__(self, proposal, constraints, initial_state, + minority_perc_col=None, threshold=0.5, score_function=None, + minority_pop_col=None, total_pop_col="TOTPOP", + min_perc_column_name="_gingleator_auxiliary_helper_updater_min_perc_col"): + """ + :param `proposal`: Function proposing the next state from the current state. + :param `constraints`: A function with signature ``Partition -> bool`` determining whether + the proposed next state is valid (passes all binary constraints). Usually this is a + :class:`~gerrychain.constraints.Validator` class instance. + :param `initial_state`: Initial :class:`gerrychain.partition.Partition` class. + :param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of + minority population within that district. + :param `threshold`: Beyond which fraction to consider something a "Gingles" + (or opportunity) district. + :param `score_function`: The function to using doing optimization. Should have the + signature ``Partition * str (minority_perc_col) * float (threshold) -> + 'a where 'a is Comparable``. This class implements a few potential choices as class + methods. + :param `minority_pop_col`: If minority_perc_col is defined, the minority population column + with which to compute percentage. + :param `total_pop_col`: If minority_perc_col is defined, the total population column with + which to compute percentage. + :param `min_perc_column_name`: If minority_perc_col is defined, the name to give the created + percentage updater. + """ + if minority_perc_col is None and minority_pop_col is None: + raise ValueError("`minority_perc_col` and `minority_pop_col` cannot both be `None`. \ + Unclear how to compute gingles district.") + elif minority_perc_col is not None and minority_pop_col is not None: + warnings.warn("`minority_perc_col` and `minority_pop_col` are both specified. By \ + default `minority_perc_col` will be used.") + score_function = self.num_opportunity_dists if score_function is None else score_function + + if minority_perc_col is None: + perc_up = {min_perc_column_name: + lambda part: {k: part[minority_pop_col][k] / part[total_pop_col][k] + for k in part.parts.keys()}} + initial_state.updaters.update(perc_up) + minority_perc_col = min_perc_column_name + + score = partial(score_function, minority_perc_col=minority_perc_col, threshold=threshold) + + super().__init__(proposal, constraints, initial_state, score, maximize=True) + + """ + Score Functions + """ + + @classmethod + def num_opportunity_dists(cls, part, minority_perc_col, threshold): + """ + Given a partition, returns the number of opportunity districts. + + :param `part`: Partition to score. + :param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of + minority population within that district. + :param `threshold`: Beyond which fraction to consider something a "Gingles" + (or opportunity) district. + + :rtype int + """ + dist_percs = part[minority_perc_col].values() + return sum(list(map(lambda v: v >= threshold, dist_percs))) + + @classmethod + def reward_partial_dist(cls, part, minority_perc_col, threshold): + """ + Given a partition, returns the number of opportunity districts + the percentage of the next + highest district. + + :param `part`: Partition to score. + :param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of + minority population within that district. + :param `threshold`: Beyond which fraction to consider something a "Gingles" + (or opportunity) district. + + :rtype float + """ + dist_percs = part[minority_perc_col].values() + num_opport_dists = sum(list(map(lambda v: v >= threshold, dist_percs))) + next_dist = max(i for i in dist_percs if i < threshold) + return num_opport_dists + next_dist + + @classmethod + def reward_next_highest_close(cls, part, minority_perc_col, threshold): + """ + Given a partition, returns the number of opportunity districts, if no additional district + is within 10% of reaching the threshold. If one is, the distance that district is from the + threshold is scaled between 0 and 1 and added to the count of opportunity districts. + + :param `part`: Partition to score. + :param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of + minority population within that district. + :param `threshold`: Beyond which fraction to consider something a "Gingles" + (or opportunity) district. + + :rtype float + """ + dist_percs = part[minority_perc_col].values() + num_opport_dists = sum(list(map(lambda v: v >= threshold, dist_percs))) + next_dist = max(i for i in dist_percs if i < threshold) + + if next_dist < threshold - 0.1: + return num_opport_dists + else: + return num_opport_dists + (next_dist - threshold + 0.1) * 10 + + @classmethod + def penalize_maximum_over(cls, part, minority_perc_col, threshold): + """ + Given a partition, returns the number of opportunity districts + (1 - the maximum excess) + scaled to between 0 and 1. + + :param `part`: Partition to score. + :param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of + minority population within that district. + :param `threshold`: Beyond which fraction to consider something a "Gingles" + (or opportunity) district. + + :rtype float + """ + dist_percs = part[minority_perc_col].values() + num_opportunity_dists = sum(list(map(lambda v: v >= threshold, dist_percs))) + if num_opportunity_dists == 0: + return 0 + else: + max_dist = max(dist_percs) + return num_opportunity_dists + (1 - max_dist) / (1 - threshold) + + @classmethod + def penalize_avg_over(cls, part, minority_perc_col, threshold): + """ + Given a partition, returns the number of opportunity districts + (1 - the average excess) + scaled to between 0 and 1. + + :param `part`: Partition to score. + :param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of + minority population within that district. + :param `threshold`: Beyond which fraction to consider something a "Gingles" + (or opportunity) district. + + :rtype float + """ + dist_percs = part[minority_perc_col].values() + opport_dists = list(filter(lambda v: v >= threshold, dist_percs)) + if opport_dists == []: + return 0 + else: + num_opportunity_dists = len(opport_dists) + avg_opportunity_dist = np.mean(opport_dists) + return num_opportunity_dists + (1 - avg_opportunity_dist) / (1 - threshold) diff --git a/gerrychain/optimization/optimization.py b/gerrychain/optimization/optimization.py new file mode 100644 index 00000000..611d585c --- /dev/null +++ b/gerrychain/optimization/optimization.py @@ -0,0 +1,381 @@ +from ..chain import MarkovChain +from .. partition import Partition +from ..accept import always_accept +import random +from typing import Union, Callable, List, Any +from tqdm import tqdm +import math + + +class SingleMetricOptimizer: + """ + SingleMetricOptimizer represents the class of algorithms / chains that optimize plans with + respect to a single metric. An instance of this class encapsulates the following state + information: + * the dualgraph and updaters via the initial partition, + * the constraints new proposals are subject to, + * the metric over which to optimize, + * and whether or not to seek maximal or minimal values of the metric. + + The SingleMetricOptimizer class implements the following common methods of optimization: + * Short Bursts + * Simulated Annealing + * Tilted Runs + + Both during and after a optimization run, the class properties `best_part` and `best_score` + represent the optimal partition / corresponding score value observed. Note that these + properties do NOT persist across multiple optimization runs, as they are reset each time an + optimization run is invoked. + """ + + def __init__(self, proposal: Callable[[Partition], Partition], + constraints: Union[Callable[[Partition], bool], List[Callable[[Partition], bool]]], + initial_state: Partition, optimization_metric: Callable[[Partition], Any], + maximize: bool = True, step_indexer: str = "step"): + """ + Args: + proposal (Callable[[Partition], Partition]): Function proposing the next state from the + current state. + constraints (Union[Callable[[Partition], bool], List[Callable[[Partition], bool]]]): + A function, or lists of functions, determining whether the proposed next state is + valid (passes all binary constraints). Usually this is a + `~gerrychain.constraints.Validator` class instance. + initial_state (Partition): Initial state of the optimizer. + optimization_metric (Callable[[Partition], Any]): The score function with which to + optimize over. This should have the signature: + `Partition -> 'a where 'a is Comparable` + maximize (bool, optional): Whether to minimize or maximize the function? Defaults to + maximize. + step_indexer (str, optional): Name of the updater tracking the partitions step in the + chain. If not implemented on the partition the constructor creates and adds it. + Defaults to `"step"`. + + Returns: + A SingleMetricOptimizer object + """ + self._initial_part = initial_state + self._proposal = proposal + self._constraints = constraints + self._score = optimization_metric + self._maximize = maximize + self._best_part = None + self._best_score = None + self._step_indexer = step_indexer + + if self._step_indexer not in self._initial_part.updaters: + step_updater = lambda p: 0 if p.parent is None else p.parent[self._step_indexer] + 1 + self._initial_part.updaters[self._step_indexer] = step_updater + + @property + def best_part(self) -> Partition: + """ + Partition object corresponding to best scoring plan observed over the current (or most + recent) optimization run. + """ + return self._best_part + + @property + def best_score(self) -> Any: + """ + Value of score metric corresponding to best scoring plan observed over the current (or most + recent) optimization run. + """ + return self._best_score + + @property + def score(self) -> Callable[[Partition], Any]: + """ + The score function which is being optimized over. + """ + return self._score + + def _is_improvement(self, new_score, old_score) -> bool: + """ + Helper function defining improvement comparison between scores. Scores can be any + comparable type. + + Args: + new_score (Any): Score of proposed partition. + old_score (Any): Score of previous partition. + + Returns: + Whether the new score is an improvement over the old score. + + """ + if self._maximize: + return new_score >= old_score + else: + return new_score <= old_score + + def _tilted_acceptance_function(self, p: float) -> Callable[[Partition], bool]: + """ + Function factory that binds and returns a tilted acceptance function. + + Args: + p (float): The probability of accepting a worse score. + + Returns: + A acceptance function for tilted chains. + """ + def tilted_acceptance_function(part): + if part.parent is None: + return True + + part_score = self.score(part) + prev_score = self.score(part.parent) + + if self._is_improvement(part_score, prev_score): + return True + else: + return random.random() < p + + return tilted_acceptance_function + + def _simulated_annealing_acceptance_function(self, beta_function: Callable[[int], float], + beta_magnitude: float): + """ + Function factory that binds and returns a simulated annealing acceptance function. + + Args: + beta_functions (Callable[[int], float]): Function (f: t -> beta, where beta is in [0,1]) + defining temperature over time. f(t) = 0 the chain is hot and every proposal is + accepted. At f(t) = 1 the chain is cold and worse proposal have a low probability + of being accepted relative to the magnitude of change in score. + beta_magnitude (float): Scaling parameter for how much to weight changes in score. + + Returns: + A acceptance function for simulated annealing runs. + """ + def simulated_annealing_acceptance_function(part): + if part.parent is None: + return True + score_delta = self.score(part) - self.score(part.parent) + beta = beta_function(part[self._step_indexer]) + if self._maximize: + score_delta *= -1 + return random.random() < math.exp(-beta * beta_magnitude * score_delta) + + return simulated_annealing_acceptance_function + + @classmethod + def jumpcycle_beta_function(cls, duration_hot: int, + duration_cold: int) -> Callable[[int], float]: + """ + Class method that binds and return simple hot-cold cycle beta temperature function, where + the chain runs hot for some given duration and then cold for some duration, and repeats that + cycle. + + Args: + duration_hot: Number of steps to run chain hot. + duration_cold: Number of steps to run chain cold. + + Returns: + Beta function defining hot-cold cycle. + """ + cycle_length = duration_hot + duration_cold + + def beta_function(step: int): + time_in_cycle = step % cycle_length + return float(time_in_cycle >= duration_hot) + + return beta_function + + @classmethod + def linearcycle_beta_function(cls, duration_hot: int, duration_cooldown: int, + duration_cold: int) -> Callable[[int], float]: + cycle_length = duration_hot + duration_cooldown + duration_cold + + def beta_function(step: int): + pass + + return beta_function + + @classmethod + def logitcycle_beta_function(cls, duration_hot: int, duration_cooldown: int, + duration_cold: int) -> Callable[[int], float]: + cycle_length = duration_hot + duration_cooldown + duration_cold + + def beta_function(step: int): + pass + + return beta_function + + def short_bursts(self, burst_length: int, num_bursts: int, + accept: Callable[[Partition], bool] = always_accept, + with_progress_bar: bool = False): + """ + Performs a short burst run using the instance's score function. Each burst starts at the + best performing plan of the previous burst. If there's a tie, the later observed one is + selected. + + Args + burst_length (int): How many steps to run within each burst? + num_bursts (int): How many bursts to perform? + accept (Callable[[Partition], bool], optional): Function accepting or rejecting the + proposed state. In the most basic use case, this always returns True. + with_progress_bar (bool, optional): Whether or not to draw tqdm progress bar. Defaults + to False. + + Returns: + Partition generator. + """ + if with_progress_bar: + for part in tqdm(self.short_bursts(burst_length, num_bursts, accept, + with_progress_bar=False), + total=burst_length * num_bursts): + yield part + return + + self._best_part = self._initial_part + self._best_score = self.score(self._best_part) + + for _ in range(num_bursts): + chain = MarkovChain(self._proposal, self._constraints, accept, self._best_part, + burst_length) + + for part in chain: + yield part + part_score = self.score(part) + + if self._is_improvement(part_score, self._best_score): + self._best_part = part + self._best_score = part_score + + def simulated_annealing(self, num_steps: int, beta_function: Callable[[int], float], + beta_magnitude: float = 1, with_progress_bar: bool = False): + """ + Performs simulated annealing with respect to the class instance's score function. + + Args: + num_steps (int): How many steps to run for. + beta_function (Callable[[int], float]): Function (f: t -> beta, where beta is in [0,1]) + defining temperature over time. f(t) = 0 the chain is hot and every proposal is + accepted. At f(t) = 1 the chain is cold and worse proposal have a low probability + of being accepted relative to the magnitude of change in score. + beta_magnitude (float): Scaling parameter for how much to weight changes in score. + with_progress_bar (bool, optional): Whether or not to draw tqdm progress bar. Defaults + to False. + + Returns: + Partition generator. + """ + chain = MarkovChain(self._proposal, self._constraints, + self._simulated_annealing_acceptance_function(beta_function, + beta_magnitude), + self._initial_part, num_steps) + + self._best_part = self._initial_part + self._best_score = self.score(self._best_part) + + chain_generator = tqdm(chain) if with_progress_bar else chain + + for part in chain_generator: + yield part + part_score = self.score(part) + if self._is_improvement(part_score, self._best_score): + self._best_part = part + self._best_score = part_score + + def tilted_short_bursts(self, burst_length: int, num_bursts: int, p: float, + with_progress_bar: bool = False): + """ + Performs a short burst run using the instance's score function. Each burst starts at the + best performing plan of the previous burst. If there's a tie, the later observed one is + selected. Within each burst a tilted acceptance function is used where better scoring plans + are always accepted and worse scoring plans are accepted with probability `p`. + + Args + burst_length (int): How many steps to run within each burst? + num_bursts (int): How many bursts to perform? + p (float): The probability of accepting a plan with a worse score. + with_progress_bar (bool, optional): Whether or not to draw tqdm progress bar. Defaults + to False. + + Returns: + Partition generator. + """ + return self.short_bursts(burst_length, num_bursts, + accept=self._tilted_acceptance_function(p), + with_progress_bar=with_progress_bar) + + def variable_length_short_bursts(self, num_steps: int, stuck_buffer: int, + accept: Callable[[Partition], bool] = always_accept, + with_progress_bar: bool = False): + """ + Performs a short burst where the burst length is allowed to increase as it gets harder to + find high scoring plans. The initial burst length is set to 2, and it is doubled each time + there is no improvement over the passed number (`stuck_buffer`) of runs. + + Args: + num_steps (int): How many steps to run for. + stuck_buffer (int): How many bursts of a given length with no improvement to allow + before increasing the burst length. + accept (Callable[[Partition], bool], optional): Function accepting or rejecting the + proposed state. In the most basic use case, this always returns True. + with_progress_bar (bool, optional): Whether or not to draw tqdm progress bar. Defaults + to False. + + Returns: + Partition generator. + """ + if with_progress_bar: + for part in tqdm(self.variable_length_short_bursts(num_steps, stuck_buffer, accept, + with_progress_bar=False), total=num_steps): + yield part + return + + self._best_part = self._initial_part + self._best_score = self.score(self._best_part) + time_stuck = 0 + burst_length = 2 + i = 0 + + while(i < num_steps): + chain = MarkovChain(self._proposal, self._constraints, accept, self._best_part, + burst_length) + for part in chain: + yield part + part_score = self.score(part) + if self._is_improvement(part_score, self._best_score): + self._best_part = part + self._best_score = part_score + time_stuck = 0 + else: + time_stuck += 1 + + i += 1 + if i >= num_steps: + break + + if time_stuck >= stuck_buffer * burst_length: + burst_length *= 2 + + def tilted_run(self, num_steps: int, p: float, with_progress_bar: bool = False): + """ + Performs a tilted run. A chain where the acceptance function always accepts better plans + and accepts worse plans with some probability `p`. + + Args: + num_steps (int): How many steps to run for. + p (float): The probability of accepting a plan with a worse score. + with_progress_bar (bool, optional): Whether or not to draw tqdm progress bar. Defaults + to False. + + Returns: + Partition generator. + """ + chain = MarkovChain(self._proposal, self._constraints, self._tilted_acceptance_function(p), + self._initial_part, num_steps) + + self._best_part = self._initial_part + self._best_score = self.score(self._best_part) + + chain_generator = tqdm(chain) if with_progress_bar else chain + + for part in chain_generator: + yield part + part_score = self.score(part) + + if self._is_improvement(part_score, self._best_score): + self._best_part = part + self._best_score = part_score