diff --git a/snapcraft/_store.py b/snapcraft/_store.py
index e9eda00cf8..ddcbfdf75a 100644
--- a/snapcraft/_store.py
+++ b/snapcraft/_store.py
@@ -26,7 +26,7 @@
import tempfile
from datetime import datetime
from subprocess import Popen
-from typing import Dict, Iterable, TextIO
+from typing import Dict, Iterable, Optional, TextIO
# Ideally we would move stuff into more logical components
from snapcraft.cli import echo
@@ -761,10 +761,19 @@ def close(snap_name, channel_names):
logger.info(msg)
-def download(snap_name, channel, download_path, arch, except_hash=""):
+def download(
+ snap_name,
+ *,
+ arch: str,
+ download_path: str,
+ risk: str,
+ track: Optional[str] = None,
+ except_hash=""
+):
"""Download snap from the store to download_path.
:param str snap_name: The snap name to download.
- :param str channel: the channel to get the snap from.
+ :param str risk: the channel risk get the snap from.
+ :param str track: the specific channel track get the snap from.
:param str download_path: the path to write the downloaded snap to.
:param str arch: the architecture of the download as a deb arch.
:param str except_hash: do not download if set to a sha3_384 hash that
@@ -775,7 +784,14 @@ def download(snap_name, channel, download_path, arch, except_hash=""):
:returns: A sha3_384 of the file that was or would have been downloaded.
"""
store = storeapi.StoreClient()
- return store.download(snap_name, channel, download_path, arch, except_hash)
+ return store.download(
+ snap_name,
+ risk=risk,
+ track=track,
+ download_path=download_path,
+ arch=arch,
+ except_hash=except_hash,
+ )
def status(snap_name, series, arch):
@@ -888,13 +904,13 @@ def validate(snap_name, validations, revoke=False, key=None):
for validation in validations:
gated_name, rev = validation.split("=", 1)
echo.info("Getting details for {}".format(gated_name))
- approved_data = store.cpi.get_package(gated_name, "stable")
+ approved_data = store.cpi.get_info(gated_name)
assertion = {
"type": "validation",
"authority-id": authority_id,
"series": release,
"snap-id": snap_id,
- "approved-snap-id": approved_data["snap_id"],
+ "approved-snap-id": approved_data.snap_id,
"approved-snap-revision": rev,
"timestamp": datetime.utcnow().isoformat() + "Z",
"revoked": "false",
diff --git a/snapcraft/file_utils.py b/snapcraft/file_utils.py
index e95024ce03..5094c09c2c 100644
--- a/snapcraft/file_utils.py
+++ b/snapcraft/file_utils.py
@@ -1,6 +1,6 @@
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
-# Copyright (C) 2016 Canonical Ltd
+# Copyright (C) 2016-2019 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
@@ -311,6 +311,14 @@ def requires_path_exists(path: str, error_fmt: str = None) -> Generator:
yield
+def _file_reader_iter(path: str, block_size=2 ** 20):
+ with open(path, "rb") as f:
+ block = f.read(block_size)
+ while len(block) > 0:
+ yield block
+ block = f.read(block_size)
+
+
def calculate_sha3_384(path: str) -> str:
"""Calculate sha3 384 hash, reading the file in 1MB chunks."""
return calculate_hash(path, algorithm="sha3_384")
@@ -321,13 +329,8 @@ def calculate_hash(path: str, *, algorithm: str) -> str:
# This will raise an AttributeError if algorithm is unsupported
hasher = getattr(hashlib, algorithm)()
- blocksize = 2 ** 20
- with open(path, "rb") as f:
- while True:
- buf = f.read(blocksize)
- if not buf:
- break
- hasher.update(buf)
+ for block in _file_reader_iter(path):
+ hasher.update(block)
return hasher.hexdigest()
diff --git a/snapcraft/internal/build_providers/_snap.py b/snapcraft/internal/build_providers/_snap.py
index c49940cba5..94546019f6 100644
--- a/snapcraft/internal/build_providers/_snap.py
+++ b/snapcraft/internal/build_providers/_snap.py
@@ -42,7 +42,37 @@ class _SnapOp(enum.Enum):
REFRESH = 3
-def _get_snap_channel(snap_name: str) -> str:
+_VALID_RISKS = ["stable", "candidate", "beta", "edge"]
+
+
+class _Channel:
+ def __str__(self) -> str:
+ return self._channel
+
+ def __init__(self, channel: str) -> None:
+ channel_parts = channel.split("/")
+ if len(channel_parts) == 1:
+ self.track = None
+ self.risk = channel_parts[0]
+ self.branch = None
+ elif len(channel_parts) == 3:
+ self.track = channel_parts[0]
+ self.risk = channel_parts[1]
+ self.branch = channel_parts[2]
+ elif len(channel_parts) == 2 and channel_parts[0] in _VALID_RISKS:
+ self.track = None
+ self.risk = channel_parts[0]
+ self.branch = channel_parts[1]
+ elif len(channel_parts) == 2 and channel_parts[1] in _VALID_RISKS:
+ self.track = channel_parts[0]
+ self.risk = channel_parts[1]
+ self.branch = None
+ else:
+ raise RuntimeError("Channel logic failed for: {!r]".format(channel))
+ self._channel = channel
+
+
+def _get_snap_channel(snap_name: str) -> _Channel:
"""Returns the channel to use for snap_name."""
env_channel = os.getenv("SNAPCRAFT_BUILD_ENVIRONMENT_CHANNEL_SNAPCRAFT", None)
if env_channel is not None and snap_name == "snapcraft":
@@ -54,7 +84,7 @@ def _get_snap_channel(snap_name: str) -> str:
else:
channel = "latest/stable"
- return channel
+ return _Channel(channel)
class _SnapManager:
@@ -144,7 +174,7 @@ def get_assertions(self) -> List[List[str]]:
if host_snap_info["revision"].startswith("x"):
return []
- assertions = [] # type: List[List[str]]
+ assertions = list() # type: List[List[str]]
assertions.append(
["snap-declaration", "snap-name={}".format(host_snap_repo.name)]
)
@@ -160,10 +190,12 @@ def get_assertions(self) -> List[List[str]]:
def _set_data(self) -> None:
op = self.get_op()
host_snap_repo = self._get_snap_repo()
- install_cmd = ["snap"]
+
+ install_cmd = [] # type: List[str]
+ snap_revision = None
if op == _SnapOp.INJECT:
- install_cmd.append("install")
+ install_cmd = ["snap", "install"]
host_snap_info = host_snap_repo.get_local_snap_info()
snap_revision = host_snap_info["revision"]
@@ -176,20 +208,17 @@ def _set_data(self) -> None:
snap_file_name = "{}_{}.snap".format(host_snap_repo.name, snap_revision)
install_cmd.append(os.path.join(self._snap_dir, snap_file_name))
elif op == _SnapOp.INSTALL or op == _SnapOp.REFRESH:
- install_cmd.append(op.name.lower())
+ install_cmd = ["snap", op.name.lower()]
snap_channel = _get_snap_channel(self.snap_name)
- store_snap_info = storeapi.StoreClient().cpi.get_package(
- self.snap_name, snap_channel, self._snap_arch
+ store_snap_info = storeapi.StoreClient().cpi.get_info(self.snap_name)
+ snap_channel_map = store_snap_info.get_channel_mapping(
+ risk=snap_channel.risk, track=snap_channel.track, arch=self._snap_arch
)
- snap_revision = store_snap_info["revision"]
- confinement = store_snap_info["confinement"]
- if confinement == "classic":
+ snap_revision = snap_channel_map.revision
+ if snap_channel_map.confinement == "classic":
install_cmd.append("--classic")
- install_cmd.extend(["--channel", snap_channel])
+ install_cmd.extend(["--channel", snap_channel_map.channel_details.name])
install_cmd.append(host_snap_repo.name)
- elif op == _SnapOp.NOP:
- install_cmd = []
- snap_revision = None
self.__install_cmd = install_cmd
self.__revision = snap_revision
diff --git a/snapcraft/plugins/kernel.py b/snapcraft/plugins/kernel.py
index 1bfdbded2d..4b09f3a588 100644
--- a/snapcraft/plugins/kernel.py
+++ b/snapcraft/plugins/kernel.py
@@ -507,7 +507,12 @@ def _do_check_initrd(self, builtin, modules):
def pull(self):
super().pull()
- snapcraft.download("core", "stable", self.os_snap, self.project.deb_arch)
+ snapcraft.download(
+ "core",
+ risk="stable",
+ download_path=self.os_snap,
+ arch=self.project.deb_arch,
+ )
def do_configure(self):
super().do_configure()
diff --git a/snapcraft/storeapi/_client.py b/snapcraft/storeapi/_client.py
index 88f84d3851..2cc6a1a89c 100644
--- a/snapcraft/storeapi/_client.py
+++ b/snapcraft/storeapi/_client.py
@@ -1,3 +1,19 @@
+# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
+#
+# Copyright 2016-2019 Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
import logging
import requests
from requests.adapters import HTTPAdapter
@@ -12,6 +28,7 @@
# Set urllib3's logger to only emit errors, not warnings. Otherwise even
# retries are printed, and they're nasty.
logging.getLogger(requests.packages.urllib3.__package__).setLevel(logging.ERROR)
+logger = logging.getLogger(__name__)
class Client:
@@ -60,6 +77,11 @@ def request(self, method, url, params=None, headers=None, **kwargs):
headers = self._snapcraft_headers
final_url = urllib.parse.urljoin(self.root_url, url)
+ logger.debug(
+ "Calling {} with params {} and headers {}".format(
+ final_url, params, headers
+ )
+ )
try:
response = self.session.request(
method, final_url, headers=headers, params=params, **kwargs
diff --git a/snapcraft/storeapi/_snap_index_client.py b/snapcraft/storeapi/_snap_index_client.py
index d87462677b..7131ceb4eb 100644
--- a/snapcraft/storeapi/_snap_index_client.py
+++ b/snapcraft/storeapi/_snap_index_client.py
@@ -1,8 +1,25 @@
+# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
+#
+# Copyright 2016-2019 Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
import contextlib
import os
from typing import Dict
from ._client import Client
+from .info import SnapInfo
from . import errors
from . import logger, _macaroon_auth
@@ -28,7 +45,7 @@ def __init__(self, conf):
),
)
- def get_default_headers(self):
+ def get_default_headers(self, api="v2"):
"""Return default headers for CPI requests.
Tries to build an 'Authorization' header with local credentials
if they are available.
@@ -42,44 +59,44 @@ def get_default_headers(self):
branded_store = os.getenv("SNAPCRAFT_UBUNTU_STORE")
if branded_store:
- headers["X-Ubuntu-Store"] = branded_store
+ if api == "v2":
+ headers["Snap-Device-Store"] = branded_store
+ elif api == "v1":
+ headers["X-Ubuntu-Store"] = branded_store
+ else:
+ logger.warning("Incorrect API version passed: {!r}.".format(api))
return headers
- def get_package(self, snap_name, channel=None, arch=None):
- """Get the details for the specified snap.
+ def get_info(self, snap_name: str, *, arch: str = None) -> SnapInfo:
+ """Get the information for the specified snap.
:param str snap_name: Name of the snap.
- :param str channel: Channel of the snap.
:param str arch: Architecture of the snap (none by default).
- :return Details for the snap.
- :rtype: dict
+ :return information for the snap.
+ :rtype: SnapInfo
"""
headers = self.get_default_headers()
headers.update(
{
- "Accept": "application/hal+json",
- "X-Ubuntu-Series": constants.DEFAULT_SERIES,
+ "Accept": "application/json",
+ "Snap-Device-Series": constants.DEFAULT_SERIES,
}
)
- if arch:
- headers["X-Ubuntu-Architecture"] = arch
-
- params = {
- # FIXME LP: #1662665
- "fields": "status,confinement,anon_download_url,download_url,"
- "download_sha3_384,download_sha512,snap_id,"
- "revision,release"
- }
- if channel is not None:
- params["channel"] = channel
- logger.debug("Getting details for {}".format(snap_name))
- url = "api/v1/snaps/details/{}".format(snap_name)
+
+ params = dict()
+ params["fields"] = "channel-map,snap-id,name,publisher,confinement,revision"
+ if arch is not None:
+ params["architecture"] = arch
+ logger.debug("Getting information for {}".format(snap_name))
+ url = "v2/snaps/info/{}".format(snap_name)
resp = self.get(url, headers=headers, params=params)
- if resp.status_code != 200:
- raise errors.SnapNotFoundError(snap_name, channel, arch)
- return resp.json()
+ if resp.status_code == 404:
+ raise errors.SnapNotFoundError(snap_name, arch)
+ resp.raise_for_status()
+
+ return SnapInfo(**resp.json())
def get_assertion(
self, assertion_type: str, snap_id: str
@@ -91,7 +108,7 @@ def get_assertion(
:return Assertion for the snap.
"""
- headers = self.get_default_headers()
+ headers = self.get_default_headers(api="v1")
logger.debug("Getting snap-declaration for {}".format(snap_id))
url = "/api/v1/snaps/assertions/{}/{}/{}".format(
assertion_type, constants.DEFAULT_SERIES, snap_id
diff --git a/snapcraft/storeapi/_store_client.py b/snapcraft/storeapi/_store_client.py
index b0552029f9..7ad7ef272e 100644
--- a/snapcraft/storeapi/_store_client.py
+++ b/snapcraft/storeapi/_store_client.py
@@ -1,4 +1,19 @@
-import hashlib
+# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
+#
+# Copyright 2016-2019 Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
import os
import urllib.parse
from time import sleep
@@ -236,39 +251,45 @@ def close_channels(self, snap_id, channel_names):
self.sca.close_channels, snap_id, channel_names
)
- def download(self, snap_name, channel, download_path, arch=None, except_hash=""):
+ def download(
+ self,
+ snap_name,
+ *,
+ risk: str,
+ download_path: str,
+ track: str = None,
+ arch: str = None,
+ except_hash=""
+ ):
if arch is None:
arch = snapcraft.ProjectOptions().deb_arch
- package = self.cpi.get_package(snap_name, channel, arch)
- if package["download_sha3_384"] != except_hash:
- self._download_snap(
- snap_name,
- channel,
- arch,
- download_path,
- # FIXME LP: #1662665
- package["anon_download_url"],
- package["download_sha512"],
- )
- return package["download_sha3_384"]
+ snap_info = self.cpi.get_info(snap_name)
+ channel_mapping = snap_info.get_channel_mapping(
+ risk=risk, track=track, arch=arch
+ )
+ if channel_mapping.download.sha3_384 == except_hash:
+ return channel_mapping.download.sha3_384
- def _download_snap(
- self, name, channel, arch, download_path, download_url, expected_sha512
- ):
- if self._is_downloaded(download_path, expected_sha512):
- logger.info("Already downloaded {} at {}".format(name, download_path))
- return
- logger.info("Downloading {}".format(name))
+ try:
+ channel_mapping.download.verify(download_path)
+ except errors.StoreDownloadError:
+ self._download_snap(channel_mapping.download, download_path)
+
+ channel_mapping.download.verify(download_path)
+ return channel_mapping.download.sha3_384
+ def _download_snap(self, download_details, download_path):
# we only resume when redirected to our CDN since we use internap's
# special sauce.
- resume_possible = False
total_read = 0
- probe_url = requests.head(download_url)
+ probe_url = requests.head(download_details.url)
if probe_url.is_redirect and "internap" in probe_url.headers["Location"]:
download_url = probe_url.headers["Location"]
resume_possible = True
+ else:
+ download_url = download_details.url
+ resume_possible = False
# HttpAdapter cannot help here as this is a stream.
# LP: #1617765
@@ -301,21 +322,6 @@ def _download_snap(
raise e
sleep(1)
- if self._is_downloaded(download_path, expected_sha512):
- logger.info("Successfully downloaded {} at {}".format(name, download_path))
- else:
- raise errors.SHAMismatchError(download_path, expected_sha512)
-
- def _is_downloaded(self, path, expected_sha512):
- if not os.path.exists(path):
- return False
-
- file_sum = hashlib.sha512()
- with open(path, "rb") as f:
- for file_chunk in iter(lambda: f.read(file_sum.block_size * 128), b""):
- file_sum.update(file_chunk)
- return expected_sha512 == file_sum.hexdigest()
-
def push_assertion(self, snap_id, assertion, endpoint, force=False):
return self.sca.push_assertion(snap_id, assertion, endpoint, force)
diff --git a/snapcraft/storeapi/errors.py b/snapcraft/storeapi/errors.py
index ca0ba60998..c471fc7b73 100644
--- a/snapcraft/storeapi/errors.py
+++ b/snapcraft/storeapi/errors.py
@@ -1,6 +1,6 @@
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
-# Copyright 2016-2017 Canonical Ltd
+# Copyright 2016-2019 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
@@ -116,12 +116,26 @@ def __init__(self, snap_name):
super().__init__(snap_name=snap_name, forum_url=_FORUM_URL)
-class SHAMismatchError(StoreError):
+class StoreDownloadError(StoreError):
+ pass
- fmt = "SHA512 checksum for {path} is not {expected_sha}."
- def __init__(self, path, expected_sha):
- super().__init__(path=path, expected_sha=expected_sha)
+class DownloadNotFoundError(StoreDownloadError):
+
+ fmt = "Downloaded file not found {path!r}."
+
+ def __init__(self, *, path: str) -> None:
+ super().__init__(path=path)
+
+
+class SHAMismatchError(StoreDownloadError):
+
+ fmt = (
+ "The SHA3-384 checksum for {path!r} was {calculated!r}: expected {expected!r}."
+ )
+
+ def __init__(self, *, path: str, expected: str, calculated: str) -> None:
+ super().__init__(path=path, expected=expected, calculated=calculated)
class StoreAuthenticationError(StoreError):
diff --git a/snapcraft/storeapi/info.py b/snapcraft/storeapi/info.py
new file mode 100644
index 0000000000..5ec889fd35
--- /dev/null
+++ b/snapcraft/storeapi/info.py
@@ -0,0 +1,228 @@
+# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
+#
+# Copyright (C) 2019 Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import os
+from typing import List, Optional
+
+from . import errors
+from snapcraft.file_utils import calculate_hash
+
+
+"""
+This module holds representations for results from the v2 info API provided by
+the Snap Store.
+
+The full API is documented on https://api.snapcraft.io/api-docs/info.html
+"""
+
+# FIXME: attributes are manually created to keep the version of mypy we are using happy.
+# This might be a good thing in the long run.
+
+
+class SnapChannelDetails:
+ def __repr__(self) -> str:
+ return "".format(
+ self.name, self.architecture, self.released_at
+ )
+
+ def __init__(self, **details) -> None:
+ self._payload = details
+
+ @property
+ def architecture(self) -> str:
+ return self._payload["architecture"]
+
+ @property
+ def name(self) -> str:
+ return self._payload["name"]
+
+ @property
+ def released_at(self) -> str:
+ return self._payload["released-at"]
+
+ @property
+ def risk(self) -> str:
+ return self._payload["risk"]
+
+ @property
+ def track(self) -> str:
+ return self._payload["track"]
+
+
+class SnapDownloadDetails:
+ def __repr__(self) -> str:
+ return "".format(self.url, self.size)
+
+ def __init__(self, **download) -> None:
+ self._payload = download
+
+ @property
+ def url(self) -> str:
+ return self._payload["url"]
+
+ @property
+ def size(self) -> int:
+ return self._payload["size"]
+
+ @property
+ def sha3_384(self) -> str:
+ return self._payload["sha3-384"]
+
+ def verify(self, path: str) -> None:
+ if not os.path.exists(path):
+ raise errors.DownloadNotFoundError(path=path)
+
+ calculated_hash = calculate_hash(path, algorithm="sha3_384")
+ if self.sha3_384 != calculated_hash:
+ raise errors.SHAMismatchError(
+ path=path, expected=self.sha3_384, calculated=calculated_hash
+ )
+
+
+class SnapPublisherDetails:
+ def __repr__(self) -> str:
+ return "".format(self.username)
+
+ def __init__(self, **publisher) -> None:
+ self._payload = publisher
+
+ @property
+ def id(self) -> str:
+ return self._payload["id"]
+
+ @property
+ def display_name(self) -> str:
+ return self._payload["display-name"]
+
+ @property
+ def username(self) -> str:
+ return self._payload["username"]
+
+ @property
+ def validation(self) -> str:
+ return self._payload["validation"]
+
+
+class SnapDetails:
+ def __repr__(self) -> str:
+ return "".format(self.name, self.snap_id)
+
+ def __init__(self, **snap) -> None:
+ self._payload = snap
+ self._publisher = None # type: Optional[SnapPublisherDetails]
+
+ @property
+ def publisher(self) -> SnapPublisherDetails:
+ if self._publisher is None:
+ self._publisher = SnapPublisherDetails(**self._payload.get("publisher"))
+ return self._publisher
+
+ @property
+ def name(self) -> str:
+ return self._payload["name"]
+
+ @property
+ def snap_id(self) -> str:
+ return self._payload["snap-id"]
+
+
+class SnapChannelMapping:
+ def __repr__(self) -> str:
+ return "".format(
+ self.channel_details, self.revision
+ )
+
+ def __init__(self, **channel) -> None:
+ self._payload = channel
+ self._channel_details = None # type: Optional[SnapChannelDetails]
+ self._download = None # type: Optional[SnapDownloadDetails]
+
+ @property
+ def channel_details(self) -> SnapChannelDetails:
+ if self._channel_details is None:
+ self._channel_details = SnapChannelDetails(**self._payload.get("channel"))
+ return self._channel_details
+
+ @property
+ def download(self) -> SnapDownloadDetails:
+ if self._download is None:
+ self._download = SnapDownloadDetails(**self._payload.get("download"))
+ return self._download
+
+ @property
+ def revision(self) -> int:
+ return self._payload["revision"]
+
+ @property
+ def confinement(self) -> str:
+ return self._payload["confinement"]
+
+ @property
+ def version(self) -> str:
+ return self._payload["version"]
+
+
+class SnapInfo:
+ def __repr__(self) -> str:
+ return "".format(self.name)
+
+ def __init__(self, **snap_info_resp) -> None:
+ self._payload = snap_info_resp
+ self._channel_map = None # type: Optional[List[SnapChannelMapping]]
+ self._snap = None # type: Optional[SnapDetails]
+
+ @property
+ def channel_map(self) -> List[SnapChannelMapping]:
+ if self._channel_map is None:
+ self._channel_map = [
+ SnapChannelMapping(**i) for i in self._payload.get("channel-map", [])
+ ]
+ return self._channel_map
+
+ @property
+ def snap(self) -> SnapDetails:
+ if self._snap is None:
+ self._snap = SnapDetails(**self._payload.get("snap"))
+ return self._snap
+
+ @property
+ def name(self) -> str:
+ return self._payload["name"]
+
+ @property
+ def snap_id(self) -> str:
+ return self._payload["snap-id"]
+
+ def get_channel_mapping(
+ self, *, risk: str, arch: str, track: Optional[str] = None
+ ) -> SnapChannelMapping:
+ if track is None:
+ track_filter = "latest"
+ else:
+ track_filter = track
+
+ arch_match = (
+ c for c in self.channel_map if c.channel_details.architecture == arch
+ )
+ track_match = (c for c in arch_match if c.channel_details.track == track_filter)
+ risk_match = [c for c in track_match if c.channel_details.risk == risk]
+
+ if not risk_match:
+ channel = "{}/{}".format(track, risk) if track else risk
+ raise errors.SnapNotFoundError(name=self.name, channel=channel)
+
+ # We assume the API will not return duplicate mappings
+ return risk_match[0]
diff --git a/tests/fake_servers/search.py b/tests/fake_servers/search.py
index e4953d20a8..f8ebfefe78 100644
--- a/tests/fake_servers/search.py
+++ b/tests/fake_servers/search.py
@@ -1,6 +1,6 @@
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
-# Copyright (C) 2016, 2017-2018 Canonical Ltd
+# Copyright (C) 2016-2019 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
@@ -33,22 +33,16 @@ class FakeStoreSearchServer(base.BaseFakeServer):
# XXX This fake server as reused as download server, to avoid passing a
# port as an argument. --elopio - 2016-05-01
- _API_PATH = "/api/v1/"
-
def configure(self, configurator):
- configurator.add_route(
- "details",
- urllib.parse.urljoin(self._API_PATH, "snaps/details/{snap}"),
- request_method="GET",
- )
- configurator.add_view(self.details, route_name="details")
+ configurator.add_route("info", "/v2/snaps/info/{snap}", request_method="GET")
+ configurator.add_view(self.info, route_name="info")
configurator.add_route(
"download", "/download-snap/{snap}", request_method="GET"
)
configurator.add_view(self.download, route_name="download")
- def details(self, request):
+ def info(self, request):
snap = request.matchdict["snap"]
logger.debug(
"Handling details request for package {}, with headers {}".format(
@@ -58,7 +52,7 @@ def details(self, request):
if "User-Agent" not in request.headers:
response_code = 500
return response.Response(None, response_code)
- payload = self._get_details_payload(request)
+ payload = self._get_info_payload(request)
if payload is None:
response_code = 404
return response.Response(json.dumps({}).encode(), response_code)
@@ -68,50 +62,72 @@ def details(self, request):
payload, response_code, [("Content-Type", content_type)]
)
- def _get_details_payload(self, request):
+ def _get_info_payload(self, request):
# core snap is used in integration tests with fake servers.
snap = request.matchdict["snap"]
- # sha512sum tests/data/test-snap.snap
- test_sha512 = (
- "69d57dcacf4f126592d4e6ff689ad8bb8a083c7b9fe44f6e738ef"
- "d22a956457f14146f7f067b47bd976cf0292f2993ad864ccb498b"
- "fda4128234e4c201f28fe9"
+ # tests/data/test-snap.snap
+ test_sha3_384 = (
+ "8c0118831680a22090503ee5db98c88dd90ef551d80fc816"
+ "dec968f60527216199dacc040cddfe5cec6870db836cb908"
)
revision = "10000"
confinement = "strict"
if snap in ("test-snap", "core"):
- sha512 = test_sha512
+ sha3_384 = test_sha3_384
elif snap == "snapcraft":
- sha512 = test_sha512
+ sha3_384 = test_sha3_384
revision = "25"
confinement = "classic"
elif snap == "test-snap-with-wrong-sha":
- sha512 = "wrong sha"
- elif snap == "test-snap-branded-store":
- # Branded-store snaps require Store pinning and authorization.
- if (
- request.headers.get("X-Ubuntu-Store") != "Test-Branded"
- or request.headers.get("Authorization") is None
- ):
- return None
- sha512 = test_sha512
+ sha3_384 = "wrong sha"
+ elif (
+ snap == "test-snap-branded-store"
+ and request.headers.get("Snap-Device-Store") == "Test-Branded"
+ ):
+ sha3_384 = test_sha3_384
else:
return None
+ channel_map = list()
+ for arch in ("amd64", "i386", "s390x", "arm64", "armhf"):
+ for risk in ("stable", "edge"):
+ channel_map.append(
+ {
+ "channel": {
+ "architecture": arch,
+ "name": risk,
+ "released-at": "019-01-17T15:01:26.537392+00:00",
+ "risk": risk,
+ "track": "latest",
+ },
+ "download": {
+ "deltas": [],
+ "sha3-384": sha3_384,
+ "url": urllib.parse.urljoin(
+ "http://localhost:{}".format(self.server.server_port),
+ "download-snap/test-snap.snap",
+ ),
+ },
+ "created-at": "2019-01-16T14:59:16.711111+00:00",
+ "confinement": confinement,
+ "revision": revision,
+ }
+ )
+
return json.dumps(
{
- "anon_download_url": urllib.parse.urljoin(
- "http://localhost:{}".format(self.server.server_port),
- "download-snap/test-snap.snap",
- ),
- "download_sha3_384": "1234567890",
- "download_sha512": sha512,
- "snap_id": "good",
- "developer_id": snap + "-developer-id",
- "release": ["16"],
- "revision": revision,
- "confinement": confinement,
+ "channel-map": channel_map,
+ "snap": {
+ "name": snap,
+ "snap-id": "good",
+ "publisher": {
+ "id": snap + "-developer-id",
+ "validation": "unproven",
+ },
+ },
+ "snap-id": "good",
+ "name": snap,
}
).encode()
diff --git a/tests/integration/snaps/kernel-download/snapcraft.yaml b/tests/integration/snaps/kernel-download/snapcraft.yaml
index d777d0f4ae..d3ade6deea 100644
--- a/tests/integration/snaps/kernel-download/snapcraft.yaml
+++ b/tests/integration/snaps/kernel-download/snapcraft.yaml
@@ -1,4 +1,5 @@
name: kernel-download
+base: core
version: "0.1"
summary: Simple kernel snap to test download
description: test
diff --git a/tests/integration/store/test_store_download.py b/tests/integration/store/test_store_download.py
index 1774db013c..6654d2752f 100644
--- a/tests/integration/store/test_store_download.py
+++ b/tests/integration/store/test_store_download.py
@@ -18,7 +18,7 @@
from testtools.matchers import FileExists
-from tests import integration
+from tests import integration, skip
class DownloadTestCase(integration.StoreTestCase):
@@ -28,6 +28,7 @@ def setUp(self):
self.skipTest("There is no core snap in the staging server")
super().setUp()
+ @skip.skip_unless_codename("xenial", "test tailored for xenial")
def test_download_os_snap(self):
self.run_snapcraft("pull", "kernel-download")
self.assertThat(
diff --git a/tests/unit/build_providers/test_snap.py b/tests/unit/build_providers/test_snap.py
index 2ec23297df..b77b3e71f6 100644
--- a/tests/unit/build_providers/test_snap.py
+++ b/tests/unit/build_providers/test_snap.py
@@ -1,6 +1,6 @@
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
-# Copyright (C) 2018 Canonical Ltd
+# Copyright (C) 2018-2019 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
@@ -155,16 +155,9 @@ def test_snapcraft_installed_on_host_from_store_but_injection_disabled(self):
[
call(["snap", "set", "core", ANY]),
call(["snap", "watch", "--last=auto-refresh"]),
- call(["snap", "install", "--channel", "latest/stable", "core"]),
+ call(["snap", "install", "--channel", "stable", "core"]),
call(
- [
- "snap",
- "install",
- "--classic",
- "--channel",
- "latest/stable",
- "snapcraft",
- ]
+ ["snap", "install", "--classic", "--channel", "stable", "snapcraft"]
),
]
)
@@ -287,16 +280,9 @@ def test_snapcraft_not_installed_on_host(self):
[
call(["snap", "set", "core", ANY]),
call(["snap", "watch", "--last=auto-refresh"]),
- call(["snap", "install", "--channel", "latest/stable", "core"]),
+ call(["snap", "install", "--channel", "stable", "core"]),
call(
- [
- "snap",
- "install",
- "--classic",
- "--channel",
- "latest/stable",
- "snapcraft",
- ]
+ ["snap", "install", "--classic", "--channel", "stable", "snapcraft"]
),
]
)
@@ -343,16 +329,9 @@ def test_snapcraft_not_installed_on_host_with_channel_from_environment(self):
[
call(["snap", "set", "core", ANY]),
call(["snap", "watch", "--last=auto-refresh"]),
- call(["snap", "install", "--channel", "latest/stable", "core"]),
+ call(["snap", "install", "--channel", "stable", "core"]),
call(
- [
- "snap",
- "install",
- "--classic",
- "--channel",
- "latest/edge",
- "snapcraft",
- ]
+ ["snap", "install", "--classic", "--channel", "edge", "snapcraft"]
),
]
)
@@ -393,16 +372,9 @@ def test_no_registry(self):
[
call(["snap", "set", "core", ANY]),
call(["snap", "watch", "--last=auto-refresh"]),
- call(["snap", "install", "--channel", "latest/stable", "core"]),
+ call(["snap", "install", "--channel", "stable", "core"]),
call(
- [
- "snap",
- "install",
- "--classic",
- "--channel",
- "latest/stable",
- "snapcraft",
- ]
+ ["snap", "install", "--classic", "--channel", "stable", "snapcraft"]
),
]
)
@@ -425,16 +397,9 @@ def test_no_registry(self):
[
call(["snap", "set", "core", ANY]),
call(["snap", "watch", "--last=auto-refresh"]),
- call(["snap", "install", "--channel", "latest/stable", "core"]),
+ call(["snap", "install", "--channel", "stable", "core"]),
call(
- [
- "snap",
- "install",
- "--classic",
- "--channel",
- "latest/stable",
- "snapcraft",
- ]
+ ["snap", "install", "--classic", "--channel", "stable", "snapcraft"]
),
]
)
@@ -527,16 +492,9 @@ def test_snapcraft_installed_on_host_from_store_rerun_refreshes(self):
[
call(["snap", "set", "core", ANY]),
call(["snap", "watch", "--last=auto-refresh"]),
- call(["snap", "refresh", "--channel", "latest/stable", "core"]),
+ call(["snap", "refresh", "--channel", "stable", "core"]),
call(
- [
- "snap",
- "refresh",
- "--classic",
- "--channel",
- "latest/stable",
- "snapcraft",
- ]
+ ["snap", "refresh", "--classic", "--channel", "stable", "snapcraft"]
),
]
)
@@ -567,16 +525,9 @@ def test_snapd_not_on_host_installs_from_store(self):
[
call(["snap", "set", "core", ANY]),
call(["snap", "watch", "--last=auto-refresh"]),
- call(["snap", "install", "--channel", "latest/stable", "core"]),
+ call(["snap", "install", "--channel", "stable", "core"]),
call(
- [
- "snap",
- "install",
- "--classic",
- "--channel",
- "latest/stable",
- "snapcraft",
- ]
+ ["snap", "install", "--classic", "--channel", "stable", "snapcraft"]
),
]
)
@@ -606,7 +557,7 @@ def setUp(self):
self.useFixture(self.fake_logger)
def test_default_channel_for_snapcraft(self):
- self.assertThat(_get_snap_channel("snapcraft"), Equals("latest/stable"))
+ self.assertThat(str(_get_snap_channel("snapcraft")), Equals("latest/stable"))
self.assertThat(
self.fake_logger.output,
Not(
@@ -623,7 +574,7 @@ def test_channel_for_snapcraft_from_env(self):
"SNAPCRAFT_BUILD_ENVIRONMENT_CHANNEL_SNAPCRAFT", "latest/edge"
)
)
- self.assertThat(_get_snap_channel("snapcraft"), Equals("latest/edge"))
+ self.assertThat(str(_get_snap_channel("snapcraft")), Equals("latest/edge"))
self.assertThat(
self.fake_logger.output,
Contains(
@@ -633,7 +584,7 @@ def test_channel_for_snapcraft_from_env(self):
)
def test_default_channel_for_other_snap(self):
- self.assertThat(_get_snap_channel("core"), Equals("latest/stable"))
+ self.assertThat(str(_get_snap_channel("core")), Equals("latest/stable"))
self.assertThat(
self.fake_logger.output,
Not(
@@ -650,7 +601,7 @@ def test_channel_for_other_snap_not_affected_by_env(self):
"SNAPCRAFT_BUILD_ENVIRONMENT_CHANNEL_SNAPCRAFT", "latest/edge"
)
)
- self.assertThat(_get_snap_channel("core"), Equals("latest/stable"))
+ self.assertThat(str(_get_snap_channel("core")), Equals("latest/stable"))
self.assertThat(
self.fake_logger.output,
Not(
diff --git a/tests/unit/plugins/test_kernel.py b/tests/unit/plugins/test_kernel.py
index 5f8a5e0d50..def90443d8 100644
--- a/tests/unit/plugins/test_kernel.py
+++ b/tests/unit/plugins/test_kernel.py
@@ -1279,7 +1279,12 @@ def test_pull(self, download_mock):
plugin.pull()
download_mock.assert_called_once_with(
- "core", "stable", plugin.os_snap, self.project.deb_arch, ""
+ "core",
+ risk="stable",
+ track=None,
+ download_path=plugin.os_snap,
+ arch=self.project.deb_arch,
+ except_hash="",
)
def test_unsupported_base(self):
diff --git a/tests/unit/store/test_store_client.py b/tests/unit/store/test_store_client.py
index 689dc0d0bd..146f23abb5 100644
--- a/tests/unit/store/test_store_client.py
+++ b/tests/unit/store/test_store_client.py
@@ -1,6 +1,6 @@
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
-# Copyright 2016-2018 Canonical Ltd
+# Copyright 2016-2019 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
@@ -23,9 +23,9 @@
import fixtures
import pymacaroons
-from testtools.matchers import Contains, Equals
+from testtools.matchers import Contains, Equals, FileExists, Not
-from snapcraft import config, storeapi, ProjectOptions
+from snapcraft import config, storeapi
from snapcraft.storeapi import errors, constants
import tests
from tests import fixture_setup, unit
@@ -166,58 +166,43 @@ def test_failed_login_with_unregistered_snap(self):
class DownloadTestCase(StoreTestCase):
- # sha512 of tests/data/test-snap.snap
- EXPECTED_SHA512 = (
- "69D57DCACF4F126592D4E6FF689AD8BB8A083C7B9FE44F6E738EF"
- "d22a956457f14146f7f067b47bd976cf0292f2993ad864ccb498b"
- "fda4128234e4c201f28fe9"
- )
+ # sha3-384 of tests/data/test-snap.snap
+ EXPECTED_SHA3_384 = ""
- def test_download_unexisting_snap_raises_exception(self):
+ def test_download_nonexistent_snap_raises_exception(self):
self.client.login("dummy", "test correct password")
e = self.assertRaises(
errors.SnapNotFoundError,
self.client.download,
- "unexisting-snap",
- "test-channel",
- "dummy",
- "test-arch",
- )
- self.assertThat(
- str(e),
- Equals(
- "Snap 'unexisting-snap' for 'test-arch' cannot be found in "
- "the 'test-channel' channel."
- ),
+ "nonexistent-snap",
+ risk="stable",
+ download_path="dummy.snap",
+ arch="test-arch",
)
+ self.assertThat(str(e), Equals("Snap 'nonexistent-snap' was not found."))
def test_download_snap(self):
- self.fake_logger = fixtures.FakeLogger(level=logging.INFO)
- self.useFixture(self.fake_logger)
+ fake_logger = fixtures.FakeLogger(level=logging.INFO)
+ self.useFixture(fake_logger)
self.client.login("dummy", "test correct password")
download_path = os.path.join(self.path, "test-snap.snap")
- self.client.download("test-snap", "test-channel", download_path)
- self.assertIn(
- "Successfully downloaded test-snap at {}".format(download_path),
- self.fake_logger.output,
- )
+ self.client.download("test-snap", risk="stable", download_path=download_path)
+ self.assertThat(download_path, FileExists())
- def test_download_from_branded_store_requires_login(self):
- err = self.assertRaises(
+ def test_download_snap_missing_risk(self):
+ fake_logger = fixtures.FakeLogger(level=logging.INFO)
+ self.useFixture(fake_logger)
+ self.client.login("dummy", "test correct password")
+
+ e = self.assertRaises(
errors.SnapNotFoundError,
self.client.download,
- "test-snap-branded-store",
- "test-channel",
- "dummy",
+ "test-snap",
+ risk="beta",
+ download_path="dummy.snap",
)
-
- arch = ProjectOptions().deb_arch
self.assertThat(
- str(err),
- Equals(
- "Snap 'test-snap-branded-store' for '{}' cannot be found in "
- "the 'test-channel' channel.".format(arch)
- ),
+ str(e), Equals("Snap 'test-snap' was not found in the 'beta' channel.")
)
def test_download_from_branded_store_requires_store(self):
@@ -226,25 +211,20 @@ def test_download_from_branded_store_requires_store(self):
errors.SnapNotFoundError,
self.client.download,
"test-snap-branded-store",
- "test-channel",
- "dummy",
+ risk="stable",
+ download_path="dummy.snap",
)
- arch = ProjectOptions().deb_arch
self.assertThat(
- str(err),
- Equals(
- "Snap 'test-snap-branded-store' for '{}' cannot be found in "
- "the 'test-channel' channel.".format(arch)
- ),
+ str(err), Equals("Snap 'test-snap-branded-store' was not found.")
)
def test_download_from_branded_store(self):
- # Downloading from a branded-store requires login (authorization)
- # and setting 'SNAPCRAFT_UBUNTU_STORE' environment variable to the
+ # Downloading from a branded-store requires setting the
+ # 'SNAPCRAFT_UBUNTU_STORE' environment variable to the
# correct store 'slug' (the branded store identifier).
- self.fake_logger = fixtures.FakeLogger(level=logging.INFO)
- self.useFixture(self.fake_logger)
+ fake_logger = fixtures.FakeLogger(level=logging.INFO)
+ self.useFixture(fake_logger)
self.useFixture(
fixtures.EnvironmentVariable("SNAPCRAFT_UBUNTU_STORE", "Test-Branded")
@@ -252,41 +232,35 @@ def test_download_from_branded_store(self):
self.client.login("dummy", "test correct password")
download_path = os.path.join(self.path, "brand.snap")
- self.client.download("test-snap-branded-store", "test-channel", download_path)
-
- self.assertIn(
- "Successfully downloaded test-snap-branded-store at {}".format(
- download_path
- ),
- self.fake_logger.output,
+ self.client.download(
+ "test-snap-branded-store", risk="stable", download_path=download_path
)
+ self.assertThat(download_path, FileExists())
def test_download_already_downloaded_snap(self):
- self.fake_logger = fixtures.FakeLogger(level=logging.INFO)
- self.useFixture(self.fake_logger)
self.client.login("dummy", "test correct password")
download_path = os.path.join(self.path, "test-snap.snap")
# download first time.
- self.client.download("test-snap", "test-channel", download_path)
+ self.client.download("test-snap", risk="stable", download_path=download_path)
+ first_stat = os.stat(download_path)
# download again.
- self.client.download("test-snap", "test-channel", download_path)
- self.assertIn(
- "Already downloaded test-snap at {}".format(download_path),
- self.fake_logger.output,
- )
+ self.client.download("test-snap", risk="stable", download_path=download_path)
+ second_stat = os.stat(download_path)
+ # If these are equal it means a second download did not happen.
+ self.assertThat(second_stat, Equals(first_stat))
def test_download_on_sha_mismatch(self):
- self.fake_logger = fixtures.FakeLogger(level=logging.INFO)
- self.useFixture(self.fake_logger)
+ fake_logger = fixtures.FakeLogger(level=logging.INFO)
+ self.useFixture(fake_logger)
self.client.login("dummy", "test correct password")
download_path = os.path.join(self.path, "test-snap.snap")
# Write a wrong file in the download path.
open(download_path, "w").close()
- self.client.download("test-snap", "test-channel", download_path)
- self.assertIn(
- "Successfully downloaded test-snap at {}".format(download_path),
- self.fake_logger.output,
- )
+ first_stat = os.stat(download_path)
+ self.client.download("test-snap", risk="stable", download_path=download_path)
+ second_stat = os.stat(download_path)
+ # If these are different it means that the download did happen.
+ self.assertThat(second_stat, Not(Equals(first_stat)))
def test_download_with_hash_mismatch_raises_exception(self):
self.client.login("dummy", "test correct password")
@@ -295,8 +269,8 @@ def test_download_with_hash_mismatch_raises_exception(self):
errors.SHAMismatchError,
self.client.download,
"test-snap-with-wrong-sha",
- "test-channel",
- download_path,
+ risk="stable",
+ download_path=download_path,
)