Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add getting and setting assertions #6

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions snap_http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
list,
get_conf,
set_conf,
get_assertion_types,
get_assertions,
add_assertion,
)

from .http import SnapdHttpException
Expand All @@ -35,5 +38,6 @@
SnapdResponse,
FormData,
JsonData,
AssertionData,
FileUpload,
)
35 changes: 34 additions & 1 deletion snap_http/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Any, Dict, List, Literal, Optional, Union

from . import http
from .types import FileUpload, FormData, SnapdResponse
from .types import AssertionData, FileUpload, FormData, SnapdResponse


def check_change(cid: str) -> SnapdResponse:
Expand Down Expand Up @@ -286,3 +286,36 @@ def set_conf(name: str, config: Dict[str, Any]) -> SnapdResponse:
Keys can be dotted, `None` can be used to unset config options.
"""
return http.put(f"/snaps/{name}/conf", config)


# Assertions: list and add assertions


def get_assertion_types() -> SnapdResponse:
"""GETs the list of assertion types."""
return http.get("/assertions")


def get_assertions(
assertion_type: str, filters: Optional[Dict[str, Any]] = None
) -> SnapdResponse:
"""GETs all the assertions of the given type.

The response is a stream of assertions separated by double newlines.

:param assertion_type: The type of the assertion.
:param filters: A (assertion-header, filter-value) mapping to filter
assertions with. Examples of headers are: username, authority-id,
account-id, series, publisher, snap-name, and publisher-id.
"""
return http.get(f"/assertions/{assertion_type}", query_params=filters)


def add_assertion(assertion: str) -> SnapdResponse:
"""Add an assertion to the system assertion database.

:param assertion: The assertion to add. It may also be a newer revision
of a pre-existing assertion that it will replace.
"""
body = AssertionData(assertion)
return http.post("/assertions", body)
15 changes: 13 additions & 2 deletions snap_http/http.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Lower-level functions for making actual HTTP requests to snapd's REST API."""
import json
import socket
from http.client import HTTPResponse
from http.client import HTTPResponse, responses
from io import BytesIO
from typing import Any, Dict, Optional
from urllib.parse import urlencode
Expand Down Expand Up @@ -86,4 +86,15 @@ def _make_request(
if response.status >= 400:
raise SnapdHttpException(response_body)

return json.loads(response_body)
response_type = response.getheader("Content-Type")
if response_type == "application/json":
return json.loads(response_body)
else: # other types like application/x.ubuntu.assertion
response_code = response.getcode()
is_async = response_code == 202
return {
"type": "async" if is_async else "sync",
"status_code": response_code,
"status": responses[response_code],
"result": response_body,
}
22 changes: 21 additions & 1 deletion snap_http/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
SUCCESS_STATUSES = {"Done"}
ERROR_STATUSES = {"Error", "Hold", "Unknown"}

SnapdRequestBody = Union[Dict[str, Any], "JsonData", "FormData"]
SnapdRequestBody = Union[
Dict[str, Any],
"JsonData",
"FormData",
"AssertionData",
]


@dataclass
Expand Down Expand Up @@ -123,6 +128,21 @@ def content_type_header(self) -> str:
return f"{self.content_type}; boundary={self.boundary}"


class AssertionData(AbstractRequestBody):
"""A `SnapdRequestBody` for an assertion payload."""

content_type = "application/x.ubuntu.assertion"

def __init__(self, assertion: str):
"""Initialize the class with `data`."""
self.assertion = assertion

@cached_property
def serialized(self) -> bytes:
"""Serialize the assertion to bytes."""
return self.assertion.encode()


@dataclass
class FileUpload:
"""A file to upload to snapd's REST API."""
Expand Down
20 changes: 20 additions & 0 deletions tests/integration/assets/hello_world_snap_declaration.assert
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
type: snap-declaration
authority-id: canonical
revision: 4
series: 16
snap-id: buPKUD3TKqCOgLEjjHx5kSiCpIs5cMuQ
publisher-id: canonical
snap-name: hello-world
timestamp: 2016-09-05T18:40:07.614230Z
sign-key-sha3-384: BWDEoaqyr25nF5SNCvEv2v7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0hqUel3m8ul

AcLBUgQAAQoABgUCV828BwAA4ZYQAL1VYQbmY1WDPPuVDISGzDDQUxxf37gYVDcoU8lj23/WvcBw
CPmU5UyoljcJQ/EZaU3dFuVRDfSn7BmtjiNxiHazPLKZIqh1vxY8lUaR5ms+jgai/CAGJI5pa4pA
vEZnVWbS4YcjiL5woA4GdIOFyFliwfJcRW+1QUJlEuziGZ3Bdozm5TuuD+eI1jJnEVomWOiaXv3+
QM4SEjVFqBLWfMCtIRa15Htlwr73DDh5XjmcaP/aHFLU94FrgQ9pKjlmJSdnMk6/b9GLzwcN++x7
4+OoKvBhheszNa0uQk79yEntRq2QU1HYlUNhtngqF2i1Fonw8mZG9xey9sarXLDiNNn/P2iC5A5D
JySRmSvD4Kx1fzLkA7iiB5vF+SHzJL+T7cyeOvpV0wIBKj0lmgwEmcMd3rffVYhtVWcF5gdPujb6
wgHsz6QNrv6Ecp5z/D1GcZ1INbW9fSTw2NjlESVf6xOFRDvCBDw3DktTBft4CCy39ubqdCS8mE4h
HU2AIP7umxJeltwwvAPpgHKIZGJyd9MP97b3lIfTgE3XT84K3pxjMEta1JKIOToY3VojrvvkJ/ot
msNPWZaHtP0IyuVk6badmA8rsu1lAG62sE+GYU+s87YEG/ErYeYAPO9MG9jd6pFzm/DmTkTLP6Ix
GV35tEK+iG5vXq2axxZ9y2LlMwSw
26 changes: 24 additions & 2 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from typing import Dict

import pytest

import snap_http

from tests.utils import is_snap_installed, wait_for
from tests.utils import is_snap_installed, remove_assertion, wait_for

TEST_SNAPS = ["test-snap", "hello-world"]

HELLO_WORLD_SNAP_DECLARATION_ASSERTION = {
"assertion_type": "snap-declaration",
"snap_id": "buPKUD3TKqCOgLEjjHx5kSiCpIs5cMuQ",
"series": "16",
}
TEST_ASSERTIONS = [HELLO_WORLD_SNAP_DECLARATION_ASSERTION]


def pytest_configure():
"""Make sure the test environment is clean before running tests."""
Expand All @@ -15,6 +23,10 @@ def pytest_configure():
if snap in installed:
wait_for(snap_http.remove)(snap)

# remove test assertions if they exist
for assertion in TEST_ASSERTIONS:
remove_assertion(**assertion)


@pytest.fixture
def local_test_snap_path():
Expand All @@ -37,3 +49,13 @@ def test_snap(local_test_snap_path):
# teardown
if is_snap_installed("test-snap"):
wait_for(snap_http.remove)("test-snap")


@pytest.fixture
def hello_world_snap_declaration_assertion() -> (str, Dict[str, str]):
path = "tests/integration/assets/hello_world_snap_declaration.assert"
with open(path, "r") as f:
yield (f.read(), HELLO_WORLD_SNAP_DECLARATION_ASSERTION)

# teardown
remove_assertion(**HELLO_WORLD_SNAP_DECLARATION_ASSERTION)
83 changes: 79 additions & 4 deletions tests/integration/test_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import snap_http

from tests.utils import get_snap_details, is_snap_installed, wait_for
from tests.utils import (
assertion_exists,
get_snap_details,
is_snap_installed,
remove_assertion,
wait_for,
)


# configuration: get and set snap options
Expand Down Expand Up @@ -134,7 +140,7 @@ def test_list_snaps():
assert "snapd" in installed_snaps


def test_install_snap_from_the_store():
def test_install_snap_from_the_store(hello_world_snap_declaration_assertion):
"""Test installing a snap from the store."""
assert is_snap_installed("hello-world") is False

Expand All @@ -143,6 +149,7 @@ def test_install_snap_from_the_store():
assert is_snap_installed("hello-world") is True

wait_for(snap_http.remove)("hello-world")
remove_assertion(**hello_world_snap_declaration_assertion[1])


def test_remove_snap(test_snap):
Expand All @@ -154,10 +161,16 @@ def test_remove_snap(test_snap):
assert is_snap_installed("test-snap") is False


def test_sideload_snap_no_flags(local_hello_world_snap_path):
def test_sideload_snap_no_flags(
local_hello_world_snap_path,
hello_world_snap_declaration_assertion,
):
"""Test sideloading a snap with no flags specified."""
assert is_snap_installed("hello-world") is False

# ack the assertion
snap_http.add_assertion(hello_world_snap_declaration_assertion[0])
# sideload
response = wait_for(snap_http.sideload)(
file_paths=[local_hello_world_snap_path],
)
Expand Down Expand Up @@ -210,10 +223,16 @@ def test_sideload_dangerous_snap(local_hello_world_snap_path):
wait_for(snap_http.remove)("hello-world")


def test_sideload_snap_with_enforced_confinement(local_hello_world_snap_path):
def test_sideload_snap_with_enforced_confinement(
local_hello_world_snap_path,
hello_world_snap_declaration_assertion,
):
"""Test sideloading a snap with enforced confinement."""
assert is_snap_installed("hello-world") is False

# ack the assertion
snap_http.add_assertion(hello_world_snap_declaration_assertion[0])
# sideload
response = wait_for(snap_http.sideload)(
file_paths=[local_hello_world_snap_path],
jailmode=True,
Expand All @@ -232,11 +251,15 @@ def test_sideload_snap_with_enforced_confinement(local_hello_world_snap_path):
def test_sideload_multiple_snaps(
local_test_snap_path,
local_hello_world_snap_path,
hello_world_snap_declaration_assertion,
):
"""Test sideloading multiple snaps."""
assert is_snap_installed("test-snap") is False
assert is_snap_installed("hello-world") is False

# ack the assertion
snap_http.add_assertion(hello_world_snap_declaration_assertion[0])
# sideload
response = wait_for(snap_http.sideload)(
file_paths=[local_test_snap_path, local_hello_world_snap_path],
devmode=True,
Expand All @@ -247,3 +270,55 @@ def test_sideload_multiple_snaps(
assert is_snap_installed("hello-world") is True

wait_for(snap_http.remove_all)(["test-snap", "hello-world"])


# Assertions: list and add assertions


def test_get_assertion_types():
"""Test getting assertion types."""
response = snap_http.get_assertion_types()
assert response.status_code == 200
types = response.result["types"]
assert len(types) > 0
assert "account" in types
assert "model" in types
assert "snap-declaration" in types
assert "store" in types


def test_get_assertions():
"""Test getting assertions."""
response = snap_http.get_assertions("snap-declaration")
assert response.status_code == 200
assert len(response.result) > 0
assert b"type: snap-declaration" in response.result


def test_get_assertions_with_filters(hello_world_snap_declaration_assertion):
"""Test getting assertions with filters."""
assertion, metadata = hello_world_snap_declaration_assertion

before = snap_http.get_assertions(
"snap-declaration", filters={"snap-id": metadata["snap_id"]}
)
assert before.result == b""

response = snap_http.add_assertion(assertion)
assert response.status_code == 200

after = snap_http.get_assertions(
"snap-declaration",
filters={"snap-id": metadata["snap_id"], "series": metadata["series"]},
)
assert after.result.decode() == assertion


def test_add_an_assertion(hello_world_snap_declaration_assertion):
"""Test adding an assertion."""
assertion, metadata = hello_world_snap_declaration_assertion
assert assertion_exists(**metadata) is False

response = snap_http.add_assertion(assertion)
assert response.status_code == 200
assert assertion_exists(**metadata) is True
Loading