Merge pull request #37 from allenai/datasets
add Beaker.create_dataset() method
epwalsh authored Mar 31, 2022
2 parents 786a2a5 + 5456651 commit 0f3783f
Showing 4 changed files with 103 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
+### Added
+
+- Added `Beaker.create_dataset()` method.
+
 ## [v0.2.6](https://github.com/allenai/beaker-py/releases/tag/v0.2.6) - 2022-01-19
 
 ### Added
103 changes: 94 additions & 9 deletions beaker/client.py
@@ -1,8 +1,10 @@
 import json
+import os
 import urllib.parse
 from collections import OrderedDict
 from contextlib import contextmanager
-from typing import Any, Dict, Generator, List, Optional
+from pathlib import Path
+from typing import Any, Dict, Generator, List, Optional, Union
 
 import docker
 import requests
@@ -12,10 +14,14 @@
 
 from .config import Config
 from .exceptions import *
+from .version import VERSION
 
 __all__ = ["Beaker"]
 
 
+PathOrStr = Union[os.PathLike, Path]
+
+
 class Beaker:
     """
     A client for interacting with `Beaker <https://beaker.org>`_.
@@ -67,20 +73,26 @@ def request(
         resource: str,
         method: str = "GET",
         query: Optional[Dict[str, Any]] = None,
-        data: Optional[Dict[str, Any]] = None,
+        data: Optional[Any] = None,
         exceptions_for_status: Optional[Dict[int, BeakerError]] = None,
+        headers: Optional[Dict[str, str]] = None,
+        token: Optional[str] = None,
+        base_url: Optional[str] = None,
     ) -> requests.Response:
         with self._session_with_backoff() as session:
-            url = f"{self.base_url}/{resource}"
+            url = f"{base_url or self.base_url}/{resource}"
             if query is not None:
                 url = url + "?" + urllib.parse.urlencode(query)
+            default_headers = {
+                "Authorization": f"Bearer {token or self.config.user_token}",
+                "Content-Type": "application/json",
+            }
+            if headers is not None:
+                default_headers.update(headers)
             response = getattr(session, method.lower())(
                 url,
-                headers={
-                    "Authorization": f"Bearer {self.config.user_token}",
-                    "Content-Type": "application/json",
-                },
-                data=None if data is None else json.dumps(data),
+                headers=default_headers,
+                data=json.dumps(data) if isinstance(data, dict) else data,
             )
             if exceptions_for_status is not None and response.status_code in exceptions_for_status:
                 raise exceptions_for_status[response.status_code]
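Note: the new `headers`, `token`, and `base_url` parameters let a single `request()` call target a service other than the main Beaker API, and non-dict `data` (such as an open file) is now passed through to `requests` unserialized. A minimal sketch of a call using the overrides — the resource path and the `storage_token`/`storage_address` variables are illustrative, not part of the diff:

    # Hypothetical upload to an alternate storage endpoint, authenticated
    # with that service's own token rather than the configured user token.
    with open("data.bin", "rb") as f:
        beaker.request(
            "some/storage/resource",   # illustrative resource path
            method="PUT",
            data=f,                    # not a dict, so streamed as-is by requests
            token=storage_token,       # replaces self.config.user_token in the header
            base_url=storage_address,  # replaces self.base_url
        )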
@@ -178,7 +190,80 @@ def get_dataset(self, dataset_id: str) -> Dict[str, Any]:
         """
         return self.request(
-            f"datasets/{dataset_id}", exceptions_for_status={404: DatasetNotFound(dataset_id)}
+            f"datasets/{urllib.parse.quote(dataset_id, safe='')}",
+            exceptions_for_status={404: DatasetNotFound(dataset_id)},
         ).json()
 
+    def delete_dataset(self, dataset_id: str):
+        self.request(
+            f"datasets/{urllib.parse.quote(dataset_id, safe='')}",
+            method="DELETE",
+            exceptions_for_status={404: DatasetNotFound(dataset_id)},
+        )
+
+    def create_dataset(
+        self,
+        name: str,
+        source: PathOrStr,
+        target: Optional[str] = None,
+        workspace: Optional[str] = None,
+        force: bool = False,
+    ) -> Dict[str, Any]:
+        """
+        Create a dataset with the source file(s).
+        """
+        workspace_name = workspace or self.config.default_workspace
+        if workspace_name is None:
+            raise ValueError("'workspace' argument required")
+
+        # Ensure workspace exists.
+        self.ensure_workspace(workspace_name)
+
+        # Ensure source exists.
+        source: Path = Path(source)
+        if not source.exists():
+            raise FileNotFoundError(source)
+
+        if not source.is_file():
+            raise NotImplementedError("'create_dataset()' only works for single files so far")
+
+        # Create the dataset.
+        def make_dataset() -> Dict[str, Any]:
+            return self.request(
+                "datasets",
+                method="POST",
+                query={"name": name},
+                data={"workspace": workspace_name, "fileheap": True},
+                exceptions_for_status={409: DatasetConflict(name)},
+            ).json()
+
+        try:
+            dataset_info = make_dataset()
+        except DatasetConflict:
+            if force:
+                self.delete_dataset(f"{self.user}/{name}")
+                dataset_info = make_dataset()
+            else:
+                raise
+
+        # Upload the file.
+        with source.open("rb") as source_file:
+            self.request(
+                f"datasets/{dataset_info['storage']['id']}/files/{target or source.name}",
+                method="PUT",
+                data=source_file,
+                token=dataset_info["storage"]["token"],
+                base_url=dataset_info["storage"]["address"],
+                headers={
+                    "User-Agent": f"beaker-py v{VERSION}",
+                },
+            )
+
+        # Commit the dataset.
+        return self.request(
+            f"datasets/{dataset_info['id']}",
+            method="PATCH",
+            data={"commit": True},
+        ).json()
+
     def get_logs(self, job_id: str) -> Generator[bytes, None, None]:
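Taken together, `create_dataset()` creates the dataset record through the Beaker API, uploads the source file to the dataset's fileheap storage address, and commits the dataset. A minimal usage sketch — the client construction via `Beaker.from_env()` and all names here are illustrative assumptions, not part of the diff:

    from beaker import Beaker

    beaker = Beaker.from_env()  # assumes a configured user token and default workspace

    # Upload ./results.json as a single-file dataset named "my-results".
    # force=True deletes any existing dataset with that name and retries.
    dataset = beaker.create_dataset(
        "my-results",
        "results.json",
        target="metrics.json",  # optional: rename the file inside the dataset
        force=True,
    )
    print(dataset["id"])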
4 changes: 4 additions & 0 deletions beaker/exceptions.py
@@ -34,6 +34,10 @@ class ExperimentConflict(BeakerError):
     pass
 
 
+class DatasetConflict(BeakerError):
+    pass
+
+
 class DatasetNotFound(BeakerError):
     pass
 
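`DatasetConflict` is raised on a 409 response from the dataset-creation endpoint, so callers that do not pass `force=True` can handle name collisions themselves. A hedged sketch, with an illustrative fallback policy:

    from beaker.exceptions import DatasetConflict

    try:
        beaker.create_dataset("my-results", "results.json")
    except DatasetConflict:
        # The name is already taken; fall back to a different name.
        beaker.create_dataset("my-results-2", "results.json")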
1 change: 1 addition & 0 deletions mypy.ini
@@ -1,6 +1,7 @@
 [mypy]
 ignore_missing_imports = true
 no_site_packages = true
+allow_redefinition = true
 
 [mypy-tests.*]
 strict_optional = false
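The new `allow_redefinition` flag is what lets `create_dataset()` re-bind `source` from `PathOrStr` to a concrete `Path` (`source: Path = Path(source)`) without a mypy "already defined" error. The pattern in isolation — a sketch, not code from the diff:

    # With allow_redefinition = true, mypy permits re-binding a name
    # with a different type in the same block:
    def normalize(source: PathOrStr) -> Path:
        source: Path = Path(source)  # re-binds the parameter as a Path
        return source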
