Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search Methods Enhancements to Avoid Duplicate Evaluated Pipelines 🥈 #211

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions gama/gama.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def __init__(
eliminate=eliminate_from_pareto,
evaluate_callback=self._on_evaluation_completed,
completed_evaluations=self._evaluation_library.lookup,
is_evaluated=self._evaluation_library.is_evaluated,
)

def cleanup(self, which="evaluations") -> None:
Expand Down
2 changes: 2 additions & 0 deletions gama/genetic_programming/operator_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
evaluate_callback: Callable[[Evaluation], None],
max_retry: int = 50,
completed_evaluations: Optional[Dict[str, Evaluation]] = None,
is_evaluated: Optional[Callable[[Individual], bool]] = None,
):
self._mutate = mutate
self._mate = mate
Expand All @@ -37,6 +38,7 @@ def __init__(
self._evaluate = None
self._evaluate_callback = evaluate_callback
self.evaluate: Optional[Callable[..., Evaluation]] = None
self.is_evaluated = is_evaluated

self._completed_evaluations = completed_evaluations

Expand Down
17 changes: 16 additions & 1 deletion gama/search_methods/asha.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def asha(
maximum_resource: Union[int, float] = 1.0,
minimum_early_stopping_rate: int = 0,
max_full_evaluations: Optional[int] = None,
max_attempts: int = 100000,
) -> List[Individual]:
"""Asynchronous Halving Algorithm by Li et al.

Expand All @@ -115,6 +116,9 @@ def asha(
max_full_evaluations: Optional[int] (default=None)
Maximum number of individuals to evaluate on the max rung (i.e. on all data).
If None, the algorithm will be run indefinitely.
max_attempts: int (default=100000)
Maximum number of attempts to generate a unique individual otherwise raise
an error.

Returns
-------
Expand Down Expand Up @@ -163,7 +167,18 @@ def get_job():

if start_candidates:
return start_candidates.pop(), minimum_early_stopping_rate
return operations.individual(), minimum_early_stopping_rate

attempts = 0
while (new_individual := operations.individual()) and operations.is_evaluated(
new_individual
):
if attempts >= max_attempts:
raise ValueError(
"Maximum attempts reached while trying to generate a"
"unique individual."
)
attempts += 1
return new_individual, minimum_early_stopping_rate

try:
with AsyncEvaluator() as async_:
Expand Down
32 changes: 30 additions & 2 deletions gama/search_methods/async_ea.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,31 @@ def search(
)


def generate_unique_individual(
    ops: OperatorSet, generator_function: Callable, max_attempts: int
) -> Individual:
    """Generate an individual that has not been evaluated before.

    Repeatedly calls `generator_function` until it yields a candidate for
    which `ops.is_evaluated` returns a falsy value.

    Parameters
    ----------
    ops: OperatorSet
        Operator set whose `is_evaluated` callable is used to detect
        candidates that were already evaluated. If `ops.is_evaluated` is
        None, no duplicate detection is possible and the first generated
        candidate is returned as is.
    generator_function: Callable
        Zero-argument callable that produces a new candidate on each call.
    max_attempts: int
        Number of duplicate candidates that are tolerated (and regenerated)
        before giving up; the next duplicate after that raises.

    Returns
    -------
    Individual
        The first generated candidate that is falsy or not yet evaluated.

    Raises
    ------
    ValueError
        If more than `max_attempts` duplicates were generated in a row.
    """
    is_evaluated = ops.is_evaluated
    if is_evaluated is None:
        # `is_evaluated` is optional on the operator set; without it there
        # is nothing to compare against, so fall back to plain generation
        # instead of failing with "'NoneType' object is not callable".
        return generator_function()

    attempts = 0
    while True:
        new_individual = generator_function()
        # A falsy candidate is handed back to the caller unchecked.
        if not new_individual or not is_evaluated(new_individual):
            return new_individual
        if attempts >= max_attempts:
            # The trailing space matters: adjacent literals are joined
            # verbatim, and without it the message read "aunique".
            raise ValueError(
                "Maximum attempts reached while trying to generate a "
                "unique individual."
            )
        attempts += 1


def async_ea(
ops: OperatorSet,
output: List[Individual],
start_candidates: List[Individual],
restart_callback: Optional[Callable[[], bool]] = None,
max_n_evaluations: Optional[int] = None,
population_size: int = 50,
max_attempts: int = 100000,
) -> List[Individual]:
"""Perform asynchronous evolutionary optimization with given operators.

Expand All @@ -97,6 +115,9 @@ def async_ea(
If None, the algorithm will be run indefinitely.
population_size: int (default=50)
Maximum number of individuals in the population at any time.
max_attempts: int (default=100000)
Maximum number of attempts to generate a unique individual otherwise raise
an error.

Returns
-------
Expand Down Expand Up @@ -139,14 +160,21 @@ def async_ea(
# Increasing the number decreases the risk of lost compute time,
# but also increases information lag. An offspring created too
# early might miss out on a better parent.
new_individual = ops.create(current_population, 1)[0]
new_individual = generate_unique_individual(
ops, lambda: ops.create(current_population, 1)[0], max_attempts
)
async_.submit(ops.evaluate, new_individual)

should_restart = restart_callback is not None and restart_callback()
n_evaluated_individuals += 1
if should_restart:
log.info("Restart criterion met. Creating new random population.")
start_candidates = [ops.individual() for _ in range(max_pop_size)]
start_candidates = [
generate_unique_individual(
ops, lambda: ops.individual(), max_attempts
)
for _ in range(max_pop_size)
]
break

return current_population
22 changes: 20 additions & 2 deletions gama/search_methods/random_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ def random_search(
output: List[Individual],
start_candidates: List[Individual],
max_evaluations: Optional[int] = None,
max_attempts: int = 100000,
) -> List[Individual]:
"""Perform random search over all possible pipelines.
"""Perform random search over all possible pipelines

Parameters
----------
Expand All @@ -47,6 +48,9 @@ def random_search(
max_evaluations: int, optional (default=None)
If specified, only a maximum of `max_evaluations` individuals are evaluated.
If None, the algorithm will be run indefinitely.
max_attempts: int (default=100000)
Maximum number of attempts to generate a unique individual otherwise raise
an error.

Returns
-------
Expand All @@ -63,6 +67,20 @@ def random_search(
future = operations.wait_next(async_)
if future.result is not None:
output.append(future.result.individual)
async_.submit(operations.evaluate, operations.individual())

attempts = 0
while (
new_individual := operations.individual()
) and operations.is_evaluated(
new_individual
): # type: ignore
if attempts >= max_attempts:
raise ValueError(
"Maximum attempts reached while trying to generate a"
"unique individual."
)
attempts += 1

async_.submit(operations.evaluate, new_individual)

return output
10 changes: 10 additions & 0 deletions gama/utilities/evaluation_library.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,13 @@ def n_best(self, n: int = 5, with_pipelines=True) -> List[Evaluation]:
return heapq.nlargest(n, self.top_evaluations)
else:
return list(reversed(sorted(self.evaluations)))[:n]

def is_evaluated(self, candidate: Union[Individual, None]) -> bool:
    """Check if a candidate pipeline has already been evaluated.

    The candidate is compared to every evaluation stored in `self.lookup`
    by the string representation of its pipeline.

    Parameters
    ----------
    candidate: Individual or None
        The individual to look for. None is accepted and reported as
        not evaluated (a warning is logged).

    Returns
    -------
    bool
        True if any stored evaluation has an individual whose pipeline has
        the same string representation as the candidate's, False otherwise.
    """
    if candidate is None:
        log.warning("Candidate to check is None. Returning False.")
        return False
    # The candidate's string form does not change during the scan, so
    # build it once instead of once per stored evaluation.
    candidate_pipeline = str(candidate.pipeline)
    return any(
        candidate_pipeline == str(evaluation.individual.pipeline)
        for evaluation in self.lookup.values()
    )