Skip to content

Commit

Permalink
RESTJob.download_result(s) finetuning
Browse files Browse the repository at this point in the history
- based on Open-EO#128
- more flexible file/folder handling
- automatically create parent folder
- support multiple result files: `download_results` vs `download_result`
- add unit tests
  • Loading branch information
soxofaan committed Apr 3, 2020
1 parent c3f5bf5 commit 0976e2c
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 20 deletions.
1 change: 1 addition & 0 deletions openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ def list_processgraphs(self, process_graph):

@property
def _api_version(self) -> ComparableVersion:
# TODO make this a public property (it's also useful outside the Connection class)
return self.capabilities().api_version_check

def load_collection(self, collection_id: str, **kwargs) -> Union[ImageCollectionClient, DataCube]:
Expand Down
92 changes: 73 additions & 19 deletions openeo/rest/job.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import datetime
import logging
import pathlib
import time
import typing
import urllib.request
from typing import List, Union
from pathlib import Path
from requests import ConnectionError
from typing import List, Union, Dict
from deprecated import deprecated

from openeo.job import Job, JobResult, JobLogEntry
from openeo.rest import OpenEoClientException, JobFailedException
from requests import ConnectionError
from openeo.util import ensure_dir

if hasattr(typing, 'TYPE_CHECKING') and typing.TYPE_CHECKING:
# Only import this for type hinting purposes. Runtime import causes circular dependency issues.
Expand Down Expand Up @@ -75,52 +77,103 @@ def list_results(self, type=None):
# GET /jobs/{job_id}/results
raise NotImplementedError

def download_results(self, target: Union[str, pathlib.Path]) -> pathlib.Path:
""" Download job results."""
def _download_get_assets(self) -> Dict[str, dict]:
results_url = "/jobs/{}/results".format(self.job_id)
r = self.connection.get(results_url, expected_status=200)
links = r.json()["links"]
if len(links) != 1:
# TODO handle download of multiple files?
raise OpenEoClientException("Expected 1 result file to download, but got {c}".format(c=len(links)))
file_url = links[0]["href"]

target = pathlib.Path(target)
with target.open('wb') as handle:
response = self.connection.get(file_url, stream=True)
response = self.connection.get(results_url, expected_status=200).json()
if "assets" in response:
# API 1.0 style: dictionary mapping filenames to metadata dict (with at least a "href" field)
return response["assets"]
else:
# Best effort translation of on old style to "assets" style (#134)
return {a["href"].split("/")[-1]: a for a in response["links"]}

def _download_url(self, url: str, path: Path):
ensure_dir(path.parent)
with path.open('wb') as handle:
# TODO: finetune download parameters/chunking?
response = self.connection.get(url, stream=True)
for block in response.iter_content(1024):
if not block:
break
handle.write(block)
return path

def download_result(self, target: Union[str, Path] = None) -> Path:
"""
Download single job result to the target file path or into folder (current working dir by default).
Fails if there are multiple result files.
:param target: String or path where the file should be downloaded to.
"""
assets = self._download_get_assets()
if len(assets) != 1:
raise OpenEoClientException(
"Expected one result file to download, but got {c}: {u!r}".format(c=len(assets), u=assets))
filename, metadata = assets.popitem()
url = metadata["href"]

target = Path(target or Path.cwd())
if target.is_dir():
target = target / filename

self._download_url(url, target)
return target

def download_results(self, target: Union[str, Path] = None) -> Dict[Path, dict]:
"""
Download job results into given folder (current working dir by default).
The names of the files are taken directly from the backend.
:param target: String/path, folder where to put the result files.
:return: file_list: Dict containing the downloaded file path as value and asset metadata
"""
target = Path(target or Path.cwd())
if target.exists() and not target.is_dir():
raise OpenEoClientException("The target argument must be a folder. Got {t!r}".format(t=str(target)))

assets = {target / f: m for (f, m) in self._download_get_assets().items()}
if len(assets) == 0:
raise OpenEoClientException("Expected at least one result file to download, but got 0.")

for path, metadata in assets.items():
self._download_url(metadata["href"], path)

return assets

# TODO: All below methods are deprecated (at least not specified in the coreAPI)
@deprecated
def download(self, outputfile: str, outputformat=None):
""" Download the result as a raster."""
try:
return self.connection.download_job(self.job_id, outputfile, outputformat)
except ConnectionAbortedError as e:
return print(str(e))

@deprecated
def status(self):
""" Returns the status of the job."""
return self.connection.job_info(self.job_id)['status']

@deprecated
def queue(self):
""" Queues the job. """
return self.connection.queue_job(self.job_id)

@deprecated
def results(self) -> List[RESTJobResult]:
""" Returns this job's results. """
return [RESTJobResult(link['href']) for link in self.connection.job_results(self.job_id)['links']]

""" Retrieve job logs."""

def logs(self, offset=None) -> List[JobLogEntry]:
return[JobLogEntry(log_entry['id'], log_entry['level'], log_entry['message'])
for log_entry in self.connection.job_logs(self.job_id, offset)['logs']]
return [JobLogEntry(log_entry['id'], log_entry['level'], log_entry['message'])
for log_entry in self.connection.job_logs(self.job_id, offset)['logs']]

@classmethod
def run_synchronous(cls, job, outputfile: Union[str, pathlib.Path],
def run_synchronous(cls, job, outputfile: Union[str, Path],
print=print, max_poll_interval=60, connection_retry_interval=30):
job.start_job()

Expand Down Expand Up @@ -151,7 +204,8 @@ def run_synchronous(cls, job, outputfile: Union[str, pathlib.Path],

elapsed = str(datetime.timedelta(seconds=time.time() - start_time))
if status == 'finished':
job.download_results(outputfile)
# TODO: support downloading multiple results
job.download_result(outputfile)
else:
raise JobFailedException("Batch job {i} didn't finish properly. Status: {s} (after {t}).".format(
i=job_id, s=status, t=elapsed
Expand Down
99 changes: 98 additions & 1 deletion tests/rest/test_job.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import re

import openeo
from openeo.rest import JobFailedException
from openeo.rest import JobFailedException, OpenEoClientException
import pytest

from openeo.rest.job import RESTJob

API_URL = "https://oeo.net"

TIFF_CONTENT = b'T1f7D6t6l0l' * 1000


@pytest.fixture
def session040(requests_mock):
Expand Down Expand Up @@ -130,3 +134,96 @@ def check_request(request):

requests_mock.post(API_URL + "/jobs", headers={"OpenEO-Identifier": "f00ba5"}, additional_matcher=check_request)
con100.create_job({"foo1": {"process_id": "foo"}}, title="Foo")


def test_download_result_040(session040, requests_mock, tmp_path):
requests_mock.get(API_URL + "/jobs/jj/results", json={"links": [
{"href": API_URL + "/dl/jjr1.tiff"},
]})
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
job = RESTJob("jj", connection=session040)
target = tmp_path / "result.tiff"
res = job.download_result(target)
assert res == target
with target.open("rb") as f:
assert f.read() == TIFF_CONTENT


def test_download_result(con100, requests_mock, tmp_path):
requests_mock.get(API_URL + "/jobs/jj/results", json={"assets": {
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff"},
}})
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
job = RESTJob("jj", connection=con100)
target = tmp_path / "result.tiff"
res = job.download_result(target)
assert res == target
with target.open("rb") as f:
assert f.read() == TIFF_CONTENT


def test_download_result_folder(con100, requests_mock, tmp_path):
requests_mock.get(API_URL + "/jobs/jj/results", json={"assets": {
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff"},
}})
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
job = RESTJob("jj", connection=con100)
target = tmp_path / "folder"
target.mkdir()
res = job.download_result(target)
assert res == target / "1.tiff"
assert list(p.name for p in target.iterdir()) == ["1.tiff"]
with (target / "1.tiff").open("rb") as f:
assert f.read() == TIFF_CONTENT


def test_download_result_multiple(con100, requests_mock, tmp_path):
requests_mock.get(API_URL + "/jobs/jj/results", json={"assets": {
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff"},
"2.tiff": {"href": API_URL + "/dl/jjr2.tiff"},
}})
job = RESTJob("jj", connection=con100)
with pytest.raises(OpenEoClientException, match="Expected one result file to download, but got 2"):
job.download_result(tmp_path / "res.tiff")


def test_download_results_040(session040, requests_mock, tmp_path):
requests_mock.get(API_URL + "/jobs/jj/results", json={"links": [
{"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff"},
{"href": API_URL + "/dl/jjr2.tiff", "type": "image/tiff"},
]})
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
requests_mock.get(API_URL + "/dl/jjr2.tiff", content=TIFF_CONTENT)
job = RESTJob("jj", connection=session040)
target = tmp_path / "folder"
target.mkdir()
downloads = job.download_results(target)
assert downloads == {
target / "jjr1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff"},
target / "jjr2.tiff": {"href": API_URL + "/dl/jjr2.tiff", "type": "image/tiff"},
}
assert set(p.name for p in target.iterdir()) == {"jjr1.tiff", "jjr2.tiff"}
with (target / "jjr1.tiff").open("rb") as f:
assert f.read() == TIFF_CONTENT
with (target / "jjr2.tiff").open("rb") as f:
assert f.read() == TIFF_CONTENT


def test_download_results(con100, requests_mock, tmp_path):
requests_mock.get(API_URL + "/jobs/jj/results", json={"assets": {
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
"2.tiff": {"href": API_URL + "/dl/jjr2.tiff", "type": "image/tiff; application=geotiff"},
}})
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
requests_mock.get(API_URL + "/dl/jjr2.tiff", content=TIFF_CONTENT)
job = RESTJob("jj", connection=con100)
target = tmp_path / "folder"
target.mkdir()
downloads = job.download_results(target)
assert downloads == {
target / "1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
target / "2.tiff": {"href": API_URL + "/dl/jjr2.tiff", "type": "image/tiff; application=geotiff"},
}
assert set(p.name for p in target.iterdir()) == {"1.tiff", "2.tiff"}
with (target / "1.tiff").open("rb") as f:
assert f.read() == TIFF_CONTENT

0 comments on commit 0976e2c

Please sign in to comment.