Skip to content

Commit

Permalink
Add Beaker.workspace.iter_* methods (#179)
Browse files Browse the repository at this point in the history
* Add `Beaker.workspace.iter_*` methods

* update clean

* Add `older_than` parameter to `Beaker.workspace.clear()`

* update comment
  • Loading branch information
epwalsh authored Nov 23, 2022
1 parent 4972c99 commit 4152916
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 33 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ use patch releases for compatibility fixes instead.

## Unreleased

### Added

- Added `Beaker.workspace.iter_(images|experiments|datasets)` methods.
- Added `older_than` parameter to `Beaker.workspace.clear()` method.

## [v1.11.6](https://github.com/allenai/beaker-py/releases/tag/v1.11.6) - 2022-11-22

### Added
Expand Down
167 changes: 134 additions & 33 deletions beaker/services/workspace.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections import defaultdict
from typing import Dict, List, Optional, Union
from datetime import datetime
from typing import Dict, Generator, List, Optional, Union

from ..data_model import *
from ..exceptions import *
Expand Down Expand Up @@ -269,14 +270,14 @@ def list(

return workspaces

def images(
def iter_images(
self,
workspace: Optional[Union[str, Workspace]] = None,
match: Optional[str] = None,
limit: Optional[int] = None,
) -> List[Image]:
) -> Generator[Image, None, None]:
"""
List the images in a workspace.
Iterate over the images in a workspace.
:param workspace: The Beaker workspace name or object. If not specified,
:data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` is used.
Expand All @@ -291,12 +292,12 @@ def images(
Beaker server.
"""
workspace_name = self.resolve_workspace(workspace, read_only_ok=True).full_name
images: List[Image] = []
cursor: Optional[str] = None
query: Dict[str, str] = {}
if match is not None:
query["q"] = match

count = 0
while True:
query["cursor"] = cursor or ""
page = ImagesPage.from_json(
Expand All @@ -309,24 +310,48 @@ def images(
},
).json()
)
images.extend(page.data)

for image in page.data:
count += 1
yield image
if limit is not None and count >= limit:
return

cursor = page.next_cursor or page.next
if not cursor:
break
if limit is not None and len(images) >= limit:
images = images[:limit]
break

return images
def images(
    self,
    workspace: Optional[Union[str, Workspace]] = None,
    match: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[Image]:
    """
    List the images in a workspace.

    This is the eager counterpart of :meth:`iter_images()`: it forwards all
    arguments to that method and collects everything it yields into a list.

    :param workspace: The Beaker workspace name or object. If not specified,
        :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` is used.
    :param match: Only include images matching the text.
    :param limit: Limit the number of images returned.

    :raises WorkspaceNotFound: If the workspace doesn't exist.
    :raises WorkspaceNotSet: If neither ``workspace`` nor
        :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` are set.
    :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
    :raises RequestException: Any other exception that can occur when contacting the
        Beaker server.
    """
    return list(self.iter_images(workspace=workspace, match=match, limit=limit))

def iter_experiments(
self,
workspace: Optional[Union[str, Workspace]] = None,
match: Optional[str] = None,
limit: Optional[int] = None,
) -> List[Experiment]:
) -> Generator[Experiment, None, None]:
"""
List the experiments in a workspace.
Iterate over the experiments in a workspace.
:param workspace: The Beaker workspace name or object. If not specified,
:data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` is used.
Expand All @@ -341,12 +366,12 @@ def experiments(
Beaker server.
"""
workspace_name = self.resolve_workspace(workspace, read_only_ok=True).full_name
experiments: List[Experiment] = []
cursor: Optional[str] = None
query: Dict[str, str] = {}
if match is not None:
query["q"] = match

count = 0
while True:
query["cursor"] = cursor or ""
page = ExperimentsPage.from_json(
Expand All @@ -359,26 +384,50 @@ def experiments(
},
).json()
)
experiments.extend(page.data)

for experiment in page.data:
count += 1
yield experiment
if limit is not None and count >= limit:
return

cursor = page.next_cursor or page.next
if not cursor:
break
if limit is not None and len(experiments) >= limit:
experiments = experiments[:limit]
break

return experiments
def experiments(
    self,
    workspace: Optional[Union[str, Workspace]] = None,
    match: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[Experiment]:
    """
    List the experiments in a workspace.

    This is the eager counterpart of :meth:`iter_experiments()`: it forwards all
    arguments to that method and collects everything it yields into a list.

    :param workspace: The Beaker workspace name or object. If not specified,
        :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` is used.
    :param match: Only include experiments matching the text.
    :param limit: Limit the number of experiments returned.

    :raises WorkspaceNotFound: If the workspace doesn't exist.
    :raises WorkspaceNotSet: If neither ``workspace`` nor
        :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` are set.
    :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
    :raises RequestException: Any other exception that can occur when contacting the
        Beaker server.
    """
    return list(self.iter_experiments(workspace=workspace, match=match, limit=limit))

def iter_datasets(
self,
workspace: Optional[Union[str, Workspace]] = None,
match: Optional[str] = None,
results: Optional[bool] = None,
uncommitted: Optional[bool] = None,
limit: Optional[int] = None,
) -> List[Dataset]:
) -> Generator[Dataset, None, None]:
"""
List the datasets in a workspace.
Iterate over the datasets in a workspace.
:param workspace: The Beaker workspace name, or object. If not specified,
:data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` is used.
Expand All @@ -395,7 +444,6 @@ def datasets(
Beaker server.
"""
workspace_name = self.resolve_workspace(workspace, read_only_ok=True).full_name
datasets: List[Dataset] = []
cursor: Optional[str] = None
query: Dict[str, str] = {}
if match is not None:
Expand All @@ -405,6 +453,7 @@ def datasets(
if uncommitted is not None:
query["committed"] = str(not uncommitted).lower()

count = 0
while True:
query["cursor"] = cursor or ""
page = DatasetsPage.from_json(
Expand All @@ -417,15 +466,51 @@ def datasets(
},
).json()
)
datasets.extend(page.data)

for dataset in page.data:
count += 1
yield dataset
if limit is not None and count >= limit:
return

cursor = page.next_cursor or page.next
if not cursor:
break
if limit is not None and len(datasets) >= limit:
datasets = datasets[:limit]
break

return datasets
def datasets(
    self,
    workspace: Optional[Union[str, Workspace]] = None,
    match: Optional[str] = None,
    results: Optional[bool] = None,
    uncommitted: Optional[bool] = None,
    limit: Optional[int] = None,
) -> List[Dataset]:
    """
    List the datasets in a workspace.

    This is the eager counterpart of :meth:`iter_datasets()`: it forwards all
    arguments to that method and collects everything it yields into a list.

    :param workspace: The Beaker workspace name, or object. If not specified,
        :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` is used.
    :param match: Only include datasets matching the text.
    :param results: Only include/exclude experiment result datasets.
    :param uncommitted: Only include/exclude uncommitted datasets.
    :param limit: Limit the number of datasets returned.

    :raises WorkspaceNotFound: If the workspace doesn't exist.
    :raises WorkspaceNotSet: If neither ``workspace`` nor
        :data:`Beaker.config.default_workspace <beaker.Config.default_workspace>` are set.
    :raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
    :raises RequestException: Any other exception that can occur when contacting the
        Beaker server.
    """
    return list(
        self.iter_datasets(
            workspace=workspace,
            match=match,
            results=results,
            uncommitted=uncommitted,
            limit=limit,
        )
    )

def secrets(self, workspace: Optional[Union[str, Workspace]] = None) -> List[Secret]:
"""
Expand Down Expand Up @@ -626,6 +711,7 @@ def clear(
images: bool = True,
datasets: bool = True,
secrets: bool = True,
older_than: Optional[datetime] = None,
):
"""
Remove groups, experiments, images, datasets, and secrets from a workspace.
Expand All @@ -637,6 +723,7 @@ def clear(
:param images: Whether to delete images.
:param datasets: Whether to delete datasets.
:param secrets: Whether to delete secrets.
:param older_than: Only delete objects created before this date.
:raises WorkspaceNotFound: If the workspace doesn't exist.
:raises WorkspaceNotSet: If neither ``workspace`` nor
Expand All @@ -647,36 +734,50 @@ def clear(
"""
import concurrent.futures

def should_delete(created: Optional[datetime]) -> bool:
    """
    Decide whether an object with the given timestamp passes the ``older_than`` filter.

    :param created: The object's timestamp, or ``None`` if it has none.

    :returns: ``True`` when no ``older_than`` cutoff is set, when ``created`` is
        ``None``, or when ``created`` is strictly earlier than ``older_than``.
    """
    # Without a cutoff or a timestamp there is nothing to filter on, so delete.
    if older_than is None or created is None:
        return True
    if created.tzinfo is None or older_than.tzinfo is None:
        # Python raises TypeError when ordering a naive datetime against an aware
        # one, so strip the time zone from both. No conversion is applied: the
        # wall-clock values are compared as-is.
        return created.replace(tzinfo=None) < older_than.replace(tzinfo=None)
    return created < older_than

deletion_counts: Dict[str, int] = defaultdict(int)
with concurrent.futures.ThreadPoolExecutor() as executor:
deletion_futures = []

if groups:
for group in self.groups(workspace):
for group in filter(lambda x: should_delete(x.created), self.groups(workspace)):
future = executor.submit(self.beaker.group.delete, group)
deletion_futures.append(future)
deletion_counts["groups_deleted"] += 1

if experiments:
for experiment in self.experiments(workspace):
for experiment in filter(
lambda x: should_delete(x.created), self.iter_experiments(workspace)
):
future = executor.submit(self.beaker.experiment.delete, experiment)
deletion_futures.append(future)
deletion_counts["experiments_deleted"] += 1

if images:
for image in self.images(workspace):
for image in filter(
lambda x: should_delete(x.committed), self.iter_images(workspace)
):
future = executor.submit(self.beaker.image.delete, image)
deletion_futures.append(future)
deletion_counts["images_deleted"] += 1

if datasets:
for dataset in self.datasets(workspace):
for dataset in filter(
lambda x: should_delete(x.created), self.iter_datasets(workspace)
):
future = executor.submit(self.beaker.dataset.delete, dataset)
deletion_futures.append(future)
deletion_counts["datasets_deleted"] += 1

if secrets:
for secret in self.secrets(workspace):
for secret in filter(lambda x: should_delete(x.created), self.secrets(workspace)):
future = executor.submit(self.beaker.secret.delete, secret, workspace)
deletion_futures.append(future)
deletion_counts["secrets_deleted"] += 1
Expand Down

0 comments on commit 4152916

Please sign in to comment.