Skip to content

Commit

Permalink
Sign raw STAC objects (#40)
Browse files Browse the repository at this point in the history
Adds support for

* Signing item-like, asset-like, and feature-collection-like mappings
* Zero-copy signing
  • Loading branch information
Tom Augspurger authored Jul 13, 2022
1 parent 14285bd commit b6a01c7
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 72 deletions.
147 changes: 95 additions & 52 deletions planetary_computer/sas.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import re
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Mapping
from typing import Any, Dict, Optional, Mapping, TypeVar, cast
import collections.abc
import copy
from copy import deepcopy
import warnings

from functools import singledispatch
from urllib.parse import urlparse, parse_qs
import requests
from pydantic import BaseModel, Field
from pystac import Asset, Item, ItemCollection
from pystac import Asset, Item, ItemCollection, STACObjectType
from pystac.utils import datetime_to_str
from pystac.serialization.identify import identify_stac_object_type
from pystac_client import ItemSearch

from planetary_computer.settings import Settings
Expand All @@ -24,6 +25,7 @@


BLOB_STORAGE_DOMAIN = ".blob.core.windows.net"
AssetLike = TypeVar("AssetLike", Asset, Dict[str, Any])


class SASBase(BaseModel):
Expand Down Expand Up @@ -66,13 +68,15 @@ def ttl(self) -> float:


@singledispatch
def sign(obj: Any) -> Any:
def sign(obj: Any, copy: bool = True) -> Any:
"""Sign the relevant URLs belonging to any supported object with a
Shared Access (SAS) Token, which allows for read access.
Args:
obj (Any): The object to sign. Must be one of:
str (URL), Asset, Item, ItemCollection, or ItemSearch, or a mapping.
copy (bool): Whether to sign the object in place, or make a copy.
Has no effect for immutable objects like strings.
Returns:
Any: A copy of the object where all relevant URLs have been signed
"""
Expand All @@ -83,7 +87,7 @@ def sign(obj: Any) -> Any:


@sign.register(str)
def sign_string(url: str) -> str:
def sign_string(url: str, copy: bool = True) -> str:
"""Sign a URL or VRT-like string containing URLs with a Shared Access (SAS) Token
Signing with a SAS token allows read access to files in blob storage.
Expand All @@ -98,6 +102,7 @@ def sign_string(url: str) -> str:
built quickly from the GDAL STACIT driver
https://gdal.org/drivers/raster/stacit.html. Each URL to Azure Blob Storage
within the VRT is signed.
copy (bool): No effect.
Returns:
str: The signed HREF or VRT
Expand All @@ -108,7 +113,7 @@ def sign_string(url: str) -> str:
return sign_url(url)


def sign_url(url: str) -> str:
def sign_url(url: str, copy: bool = True) -> str:
"""Sign a URL or with a Shared Access (SAS) Token
Signing with a SAS token allows read access to files in blob storage.
Expand All @@ -118,6 +123,7 @@ def sign_url(url: str) -> str:
Single URLs can be found on a STAC Item's Asset ``href`` value. Only URLs to
assets in Azure Blob Storage are signed, other URLs are returned unmodified.
copy (bool): No effect.
Returns:
str: The signed HREF
Expand All @@ -141,7 +147,7 @@ def _repl_vrt(m: re.Match) -> str:
return sign_url(m.string[slice(*m.span())])


def sign_vrt_string(vrt: str) -> str:
def sign_vrt_string(vrt: str, copy: bool = True) -> str:
"""Sign a VRT-like string containing URLs with a Shared Access (SAS) Token
Signing with a SAS token allows read access to files in blob storage.
Expand All @@ -153,6 +159,7 @@ def sign_vrt_string(vrt: str) -> str:
built quickly from the GDAL STACIT driver
https://gdal.org/drivers/raster/stacit.html. Each URL to Azure Blob Storage
within the VRT is signed.
copy (bool): No effect.
Returns:
str: The signed VRT
Expand All @@ -177,36 +184,41 @@ def sign_vrt_string(vrt: str) -> str:


@sign.register(Item)
def sign_item(item: Item) -> Item:
def sign_item(item: Item, copy: bool = True) -> Item:
"""Sign all assets within a PySTAC item
Args:
item (Item): The Item whose assets that will be signed
copy (bool): Whether to copy (clone) the item or mutate it inplace.
Returns:
Item: A new copy of the Item where all assets' HREFs have
Item: An Item where all assets' HREFs have
been replaced with a signed version. In addition, a "msft:expiry"
property is added to the Item properties indicating the earliest
expiry time for any assets that were signed.
"""
signed_item = item.clone()
for key in signed_item.assets:
_sign_asset_in_place(signed_item.assets[key])
return signed_item
if copy:
item = item.clone()
for key in item.assets:
_sign_asset_in_place(item.assets[key])
return item


@sign.register(Asset)
def sign_asset(asset: Asset) -> Asset:
def sign_asset(asset: Asset, copy: bool = True) -> Asset:
"""Sign a PySTAC asset
Args:
asset (Asset): The Asset to sign
copy (bool): Whether to copy (clone) the asset or mutate it inplace.
Returns:
Asset: A new copy of the Asset where the HREF is replaced with a
Asset: An asset where the HREF is replaced with a
signed version.
"""
return _sign_asset_in_place(asset.clone())
if copy:
asset = asset.clone()
return _sign_asset_in_place(asset)


def _sign_asset_in_place(asset: Asset) -> Asset:
Expand All @@ -220,36 +232,47 @@ def _sign_asset_in_place(asset: Asset) -> Asset:
with a signed version.
"""
asset.href = sign(asset.href)
if is_fsspec_asset(asset):
_sign_fsspec_asset_in_place(asset)
return asset


def _sign_fsspec_asset_in_place(asset: AssetLike) -> None:
if isinstance(asset, Asset):
extra_d = asset.extra_fields
href = asset.href
else:
extra_d = asset
href = asset["href"]

if is_fsspec_asset(extra_d):
key: Optional[str]

storage_options = None
for key in ["table:storage_options", "xarray:storage_options"]:
if key in asset.extra_fields:
storage_options = asset.extra_fields[key]

if key in extra_d:
storage_options = extra_d[key]
break
if storage_options is None:
storage_options = asset.extra_fields.get("xarray:open_kwargs", {}).get(
storage_options = extra_d.get("xarray:open_kwargs", {}).get(
"storage_options", None
)

if storage_options is None:
storage_options = (
asset.extra_fields.get("xarray:open_kwargs", {})
extra_d.get("xarray:open_kwargs", {})
.get("backend_kwargs", {})
.get("storage_options", None)
)

if storage_options is None:
return asset
return
account = storage_options.get("account_name")
container = parse_adlfs_url(asset.href)
container = parse_adlfs_url(href)
if account and container:
token = get_token(account, container)
storage_options["credential"] = token.token

return asset


def sign_assets(item: Item) -> Item:
warnings.warn(
Expand All @@ -262,31 +285,36 @@ def sign_assets(item: Item) -> Item:


@sign.register(ItemCollection)
def sign_item_collection(item_collection: ItemCollection) -> ItemCollection:
def sign_item_collection(
item_collection: ItemCollection, copy: bool = True
) -> ItemCollection:
"""Sign a PySTAC item collection
Args:
item_collection (ItemCollection): The ItemCollection whose assets will be signed
copy (bool): Whether to copy (clone) the ItemCollection or mutate it inplace.
Returns:
ItemCollection: A new copy of the ItemCollection where all assets'
ItemCollection: An ItemCollection where all assets'
HREFs for each item have been replaced with a signed version. In addition,
a "msft:expiry" property is added to the Item properties indicating the
earliest expiry time for any assets that were signed.
"""
new = item_collection.clone()
for item in new:
if copy:
item_collection = item_collection.clone()
for item in item_collection:
for key in item.assets:
_sign_asset_in_place(item.assets[key])
return new
return item_collection


@sign.register(ItemSearch)
def _search_and_sign(search: ItemSearch) -> ItemCollection:
def _search_and_sign(search: ItemSearch, copy: bool = True) -> ItemCollection:
"""Perform a PySTAC Client search, and sign the resulting item collection
Args:
search (ItemSearch): The ItemSearch whose resulting item assets will be signed
copy (bool): No effect.
Returns:
ItemCollection: The resulting ItemCollection of the search where all assets'
Expand All @@ -298,35 +326,50 @@ def _search_and_sign(search: ItemSearch) -> ItemCollection:


@sign.register(collections.abc.Mapping)
def sign_reference_file(references: Mapping) -> Mapping:
def sign_mapping(mapping: Mapping, copy: bool = True) -> Mapping:
"""
Sign a Kerchunk-style dictionary of references.
Sign a mapping.
Args:
references (Mapping): The dictionary containing the references.
mapping (Mapping):
Returns:
references (Mapping): The dictionary, now with signed URLs.
The mapping (e.g. dictionary) to sign. This method can sign
This method will sign all of the URLs under the ``"templates"`` key. Mappings
with other keys will be returned unchanged. See https://fsspec.github.io/kerchunk/
for more.
* Kerchunk-style references, which signs all URLs under the
``templates`` key. See https://fsspec.github.io/kerchunk/
for more.
* STAC items
* STAC collections
* STAC ItemCollections
copy: Whether to copy (clone) the mapping or mutate it inplace.
Returns:
signed (Mapping): The dictionary, now with signed URLs.
"""
# TODO(python3.8): Use TypedDict instead of Mapping to limit the kinds we accept.
# If we accept other mapping types in the future, we'll need to dispatch within
# the function.
if set(references) == {"version", "templates", "refs"}:
references = copy.deepcopy(references)
if copy:
mapping = deepcopy(mapping)

for k, v in references["templates"].items():
references["templates"][k] = sign_url(v)
if all(k in mapping for k in ["version", "templates", "refs"]):
for k, v in mapping["templates"].items():
mapping["templates"][k] = sign_url(v)

else:
raise TypeError(
"When providing a mapping, the keys should be 'version', 'templates', "
"and 'refs'."
)
return references
elif (
identify_stac_object_type(cast(Dict[str, Any], mapping)) == STACObjectType.ITEM
):
for k, v in mapping["assets"].items():
v["href"] = sign_url(v["href"])
_sign_fsspec_asset_in_place(v)

elif mapping.get("type") == "FeatureCollection" and mapping.get("features"):
for feature in mapping["features"]:
for k, v in feature.get("assets", {}).items():
v["href"] = sign_url(v["href"])
_sign_fsspec_asset_in_place(v)

return mapping


sign_reference_file = sign_mapping


def get_token(account_name: str, container_name: str) -> SASToken:
Expand Down
20 changes: 12 additions & 8 deletions planetary_computer/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re

from typing import Tuple, Optional
from typing import Any, Dict, Tuple, Optional, Union
from urllib.parse import ParseResult, urlunparse, urlparse

import pystac
Expand Down Expand Up @@ -49,7 +49,7 @@ def parse_adlfs_url(url: str) -> Optional[str]:
return None


def is_fsspec_asset(asset: pystac.Asset) -> bool:
def is_fsspec_asset(asset: Union[pystac.Asset, Dict[str, Any]]) -> bool:
"""
Determine if an Asset points to an fsspec URL.
Expand All @@ -60,18 +60,22 @@ def is_fsspec_asset(asset: pystac.Asset) -> bool:
* "xarray:open_kwargs.storage_options"
* "xarray:open_kwargs.backend_kwargs.storage_options"
"""
if isinstance(asset, pystac.Asset):
# backwards compat
extra_fields = asset.extra_fields
else:
extra_fields = asset

result = (
("account_name" in asset.extra_fields.get("table:storage_options", {}))
or ("account_name" in asset.extra_fields.get("xarray:storage_options", {}))
("account_name" in extra_fields.get("table:storage_options", {}))
or ("account_name" in extra_fields.get("xarray:storage_options", {}))
or (
"account_name"
in asset.extra_fields.get("xarray:open_kwargs", {}).get(
"storage_options", {}
)
in extra_fields.get("xarray:open_kwargs", {}).get("storage_options", {})
)
or (
"account_name"
in asset.extra_fields.get("xarray:open_kwargs", {})
in extra_fields.get("xarray:open_kwargs", {})
.get("backend_kwargs", {})
.get("storage_options", {})
)
Expand Down
Loading

0 comments on commit b6a01c7

Please sign in to comment.