Skip to content

Commit

Permalink
chore: add transparent location strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
smotornyuk committed Jun 3, 2024
1 parent ea35b86 commit 03ac04b
Show file tree
Hide file tree
Showing 19 changed files with 387 additions and 419 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,9 @@ plugin.
ckan config declaration files -d
```
Because redis storage adapter is enabled, you'll see all the options
Because the Redis storage adapter is enabled, you'll see all the options
registered by the Redis adapter alongside the global options:


```ini
## ckanext-files ###############################################################
## ...
Expand Down
189 changes: 71 additions & 118 deletions ckanext/files/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
import abc
import copy
import dataclasses
import hashlib
import os
import uuid
from datetime import datetime
from typing import IO, Any, Iterable, Literal
from typing import IO, Any, Generic, Iterable, Literal, Protocol, TypeVar

import pytz

Expand All @@ -29,64 +28,66 @@

from ckanext.files import config, exceptions, model, utils

CHUNK_SIZE = 16 * 1024

adapters: utils.Registry[type[Storage]] = utils.Registry({})
storages: utils.Registry[Storage] = utils.Registry({})


@dataclasses.dataclass
class FileData:
class PFileModel(Protocol):
location: str
size: int = 0
content_type: str = "application/octet-stream"
hash: str = ""
storage_data: dict[str, Any] = dataclasses.field(default_factory=dict)
size: int
content_type: str
hash: str
storage_data: dict[str, Any]

@classmethod
def from_file(cls, file: model.File):
return cls(
file.location,
file.size,
file.content_type,
file.hash,
copy.deepcopy(file.storage_data),
)

def into_file(self, file: model.File):
file.location = self.location
file.size = self.size
file.content_type = self.content_type
file.hash = self.hash
file.storage_data = copy.deepcopy(self.storage_data)
return file
TFileModel = TypeVar("TFileModel", bound=PFileModel)


@dataclasses.dataclass
class MultipartData:
location: str = ""
class BaseData(Generic[TFileModel]):
location: str
size: int = 0
content_type: str = ""
hash: str = ""
storage_data: dict[str, Any] = dataclasses.field(default_factory=dict)

@classmethod
def from_mulltipart(cls, item: model.Multipart):
def from_dict(cls, record: dict[str, Any]):
return cls(
record["location"],
record["size"],
record["content_type"],
record["hash"],
copy.deepcopy(record["storage_data"]),
)

@classmethod
def from_model(cls, record: TFileModel):
return cls(
item.location,
item.size,
item.content_type,
item.hash,
copy.deepcopy(item.storage_data),
record.location,
record.size,
record.content_type,
record.hash,
copy.deepcopy(record.storage_data),
)

def into_multipart(self, item: model.Multipart):
item.location = self.location
item.size = self.size
item.content_type = self.content_type
item.hash = self.hash
item.storage_data = copy.deepcopy(self.storage_data)
return item
def into_model(self, record: TFileModel):
record.location = self.location
record.size = self.size
record.content_type = self.content_type
record.hash = self.hash
record.storage_data = copy.deepcopy(self.storage_data)
return record


@dataclasses.dataclass
class FileData(BaseData[model.File]):
content_type: str = "application/octet-stream"


@dataclasses.dataclass
class MultipartData(BaseData[model.Multipart]):
location: str = ""


def make_storage(name: str, settings: dict[str, Any]) -> Storage:
Expand Down Expand Up @@ -133,61 +134,6 @@ def get_storage(name: str | None = None) -> Storage:
return storage


class HashingReader:
"""IO stream wrapper that computes content hash while stream is consumed.
Example:
>>> reader = HashingReader(readable_stream)
>>> for chunk in reader:
>>> ...
>>> print(f"Hash: {reader.get_hash()}")
"""

def __init__(
self,
stream: IO[bytes],
chunk_size: int = CHUNK_SIZE,
algorithm: str = "md5",
) -> None:
self.stream = stream
self.chunk_size = chunk_size
self.algorithm = algorithm
self.hashsum = hashlib.new(algorithm)
self.position = 0

def __iter__(self):
return self

def __next__(self):
chunk = self.stream.read(self.chunk_size)
if not chunk:
raise StopIteration

self.position += len(chunk)
self.hashsum.update(chunk)
return chunk

next = __next__

def reset(self):
"""Rewind underlying stream and reset hash to initial state."""
self.position = 0
self.hashsum = hashlib.new(self.algorithm)
self.stream.seek(0)

def get_hash(self):
"""Get content hash as a string."""
return self.hashsum.hexdigest()

def exhaust(self):
"""Exhaust internal stream to compute final version of content hash."""

for _ in self:
pass


class OptionChecker(object):
"""Mixin for standard access to required settings.
Expand Down Expand Up @@ -258,21 +204,21 @@ def initialize_multipart_upload(
raise NotImplementedError

# TODO: rename to refresh or something
def show_multipart_upload(self, upload_data: MultipartData) -> MultipartData:
def show_multipart_upload(self, data: MultipartData) -> MultipartData:
"""Show details of the incomplete upload."""
raise NotImplementedError

def update_multipart_upload(
self,
upload_data: MultipartData,
data: MultipartData,
extras: dict[str, Any],
) -> MultipartData:
"""Add data to the incomplete upload."""
raise NotImplementedError

def complete_multipart_upload(
self,
upload_data: MultipartData,
data: MultipartData,
extras: dict[str, Any],
) -> FileData:
"""Verify file integrity and finalize incomplete upload."""
Expand All @@ -287,7 +233,6 @@ def remove(self, data: FileData) -> bool:

def exists(self, data: FileData) -> bool:
"""Check if file exists in the storage."""

raise NotImplementedError

def compose(
Expand Down Expand Up @@ -340,33 +285,26 @@ def analyze(self, filename: str) -> FileData:
class Reader(StorageService):
def stream(self, data: FileData) -> IO[bytes]:
"""Return byte-stream of the file content."""

raise NotImplementedError

def content(self, data: FileData) -> bytes:
"""Return file content as a single byte object."""

return self.stream(data).read()

def permanent_link(self, data: FileData) -> str:
"""Return permanent download link."""

raise NotImplementedError

def temporal_link(self, data: FileData) -> str:
"""Return temporal download link."""

raise NotImplementedError

def one_time_link(self, data: FileData) -> str:
"""Return one-time download link."""

raise NotImplementedError


class Storage(OptionChecker):
__metaclass__ = abc.ABCMeta

class Storage(OptionChecker, abc.ABC):
def __init__(self, **settings: Any) -> None:
self.settings = settings

Expand Down Expand Up @@ -424,26 +362,30 @@ def unsupported_operations(self):

def compute_location(
self,
name: str,
location: str,
extras: dict[str, Any],
upload: utils.Upload | None = None,
) -> str:
strategy = self.settings.get("location_strategy", "uuid")

if strategy == "transparent":
return location

if strategy == "uuid":
return str(uuid.uuid4())

if strategy == "uuid_prefix":
return str(uuid.uuid4()) + name

if strategy == "datetime_prefix":
return datetime.now(pytz.utc).isoformat() + name
return str(uuid.uuid4()) + location

if strategy == "uuid_with_extension":
_path, ext = os.path.splitext(name)
_path, ext = os.path.splitext(location)
return str(uuid.uuid4()) + ext

if strategy == "datetime_prefix":
return datetime.now(pytz.utc).isoformat() + location

if strategy == "datetime_with_extension":
_path, ext = os.path.splitext(name)
_path, ext = os.path.splitext(location)
return datetime.now(pytz.utc).isoformat() + ext

raise exceptions.NameStrategyError(strategy)
Expand Down Expand Up @@ -553,19 +495,30 @@ def compose(
if storage is self and self.supports(utils.Capability.COMPOSE):
return self.manager.compose(datas, location, extras)

if self.supports(utils.Capability.STREAM) and storage.supports(
utils.Capability.combine(utils.Capability.CREATE, utils.Capability.APPEND),
):
dest_data = storage.upload(location, utils.make_upload(""), extras)
for data in datas:
dest_data = storage.append(
dest_data,
utils.make_upload(self.stream(data)),
extras,
)
return dest_data

raise exceptions.UnsupportedOperationError("compose", type(self))

def append(
self,
data: FileData,
storage: Storage,
upload: utils.Upload,
extras: dict[str, Any],
) -> FileData:
if storage is self and self.supports(utils.Capability.APPEND):
if self.supports(utils.Capability.APPEND):
return self.manager.append(data, upload, extras)

raise exceptions.UnsupportedOperationError("compose", type(self))
raise exceptions.UnsupportedOperationError("append", type(self))

def move(
self,
Expand Down
4 changes: 2 additions & 2 deletions ckanext/files/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def stream(file_id: str):
raise click.Abort() from err

try:
content_stream = storage.stream(shared.FileData.from_file(file))
content_stream = storage.stream(shared.FileData.from_model(file))
except exceptions.UnsupportedOperationError as err:
tk.error_shout(err)
raise click.Abort() from err
Expand Down Expand Up @@ -138,7 +138,7 @@ def scan(
name=os.path.basename(name),
storage=storage_name,
)
data.into_file(fileobj)
data.into_model(fileobj)

model.Session.add(fileobj)

Expand Down
Loading

0 comments on commit 03ac04b

Please sign in to comment.