Skip to content

Commit

Permalink
Index remote or local files via fsspec (#130)
Browse files Browse the repository at this point in the history
* add fsspec support for separate input, database and output folders, where the input could be anything supported by fsspec, including https or s3
* improve coverage
* support Python 3.8+
  • Loading branch information
dholth authored Apr 5, 2024
1 parent a464362 commit e9b7f88
Show file tree
Hide file tree
Showing 16 changed files with 420 additions and 126 deletions.
2 changes: 1 addition & 1 deletion conda_index/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
conda index. Create repodata.json for collections of conda packages.
"""

__version__ = "0.4.0"
__version__ = "0.5.0"
75 changes: 41 additions & 34 deletions conda_index/index/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import time
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime, timezone
from numbers import Number
from os.path import abspath, basename, getmtime, getsize, isfile, join
from typing import NamedTuple
from os.path import basename, getmtime, getsize, isfile, join
from pathlib import Path
from typing import Iterable
from uuid import uuid4

import zstandard
Expand All @@ -35,6 +35,7 @@
CONDA_PACKAGE_EXTENSIONS,
)
from . import rss, sqlitecache
from .fs import FileInfo, MinimalFS

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -86,10 +87,6 @@ def result(self):
LOCK_TIMEOUT_SECS = 3 * 3600
LOCKFILE_NAME = ".lock"

# XXX conda-build calls its version of get_build_index. Appears to combine
# remote and local packages, updating the local index based on mtime. Standalone
# conda-index removes get_build_index() for now.


def _ensure_valid_channel(local_folder, subdir):
for folder in {subdir, "noarch"}:
Expand All @@ -98,23 +95,13 @@ def _ensure_valid_channel(local_folder, subdir):
os.makedirs(path)


class FileInfo(NamedTuple):
"""
Filename and a bit of stat information.
"""

fn: str
st_mtime: Number
st_size: Number


def update_index(
dir_path,
output_dir=None,
check_md5=False,
channel_name=None,
patch_generator=None,
threads: (int | None) = MAX_THREADS_DEFAULT,
threads: int | None = MAX_THREADS_DEFAULT,
verbose=False,
progress=False,
subdirs=None,
Expand Down Expand Up @@ -486,14 +473,24 @@ class ChannelIndex:
output.
See the implementation of ``conda_index.cli`` for usage.
:param channel_root: Path to channel, or just the channel cache if channel_url is provided.
:param channel_name: Name of channel; defaults to last path segment of channel_root.
:param subdirs: subdirs to index.
:param output_root: Path to write repodata.json etc; defaults to channel_root.
:param channel_url: fsspec URL where package files live. If provided, channel_root will only be used for cache and index output.
:param fs: ``MinimalFS`` instance to be used with channel_url. Wrap fsspec AbstractFileSystem with ``conda_index.index.fs.FsspecFS(fs)``.
"""

fs: MinimalFS | None = None
channel_url: str | None = None

def __init__(
self,
channel_root,
channel_name,
subdirs=None,
threads: (int | None) = MAX_THREADS_DEFAULT,
channel_root: Path,
channel_name: str | None,
subdirs: Iterable[str] | None = None,
threads: int | None = MAX_THREADS_DEFAULT,
deep_integrity_check=False,
debug=False,
output_root=None, # write repodata.json etc. to separate folder?
Expand All @@ -502,14 +499,22 @@ def __init__(
write_zst=False,
write_run_exports=False,
compact_json=True,
channel_url: str | None = None,
fs: MinimalFS | None = None,
):
if threads is None:
threads = MAX_THREADS_DEFAULT

if (fs or channel_url) and not (fs and channel_url):
raise TypeError("Both or none of fs, channel_url must be provided.")

self.fs = fs
self.channel_url = channel_url

self.channel_root = Path(channel_root)
self.cache_class = cache_class
self.channel_root = abspath(channel_root)
self.output_root = abspath(output_root) if output_root else self.channel_root
self.channel_name = channel_name or basename(channel_root.rstrip("/"))
self.output_root = Path(output_root) if output_root else self.channel_root
self.channel_name = channel_name or basename(str(channel_root).rstrip("/"))
self._subdirs = subdirs
# no lambdas in pickleable
self.thread_executor_factory = functools.partial(
Expand Down Expand Up @@ -627,7 +632,6 @@ def index_prepared_subdir(

log.info("%s Writing patched repodata", subdir)

log.debug("%s write patched repodata", subdir)
self._write_repodata(subdir, patched_repodata, REPODATA_JSON_FN)

log.info("%s Building current_repodata subset", subdir)
Expand Down Expand Up @@ -659,7 +663,6 @@ def index_prepared_subdir(

log.info("%s Writing index HTML", subdir)

log.debug("%s write index.html", subdir)
self._write_subdir_index_html(subdir, patched_repodata)

log.debug("%s finish", subdir)
Expand Down Expand Up @@ -728,11 +731,10 @@ def index_subdir(self, subdir, verbose=False, progress=False):
Must call `extract_subdir_to_cache()` first or will be outdated.
"""
subdir_path = join(self.channel_root, subdir)

cache = self.cache_for_subdir(subdir)

log.debug("Building repodata for %s", subdir_path)
log.debug("Building repodata for %s/%s", self.channel_name, subdir)

new_repodata_packages, new_repodata_conda_packages = cache.indexed_packages()

Expand All @@ -750,7 +752,10 @@ def index_subdir(self, subdir, verbose=False, progress=False):

def cache_for_subdir(self, subdir):
cache: sqlitecache.CondaIndexCache = self.cache_class(
channel_root=self.channel_root, subdir=subdir
channel_root=self.channel_root,
subdir=subdir,
fs=self.fs,
channel_url=self.channel_url,
)
if cache.cache_is_brand_new:
# guaranteed to be only thread doing this?
Expand Down Expand Up @@ -795,6 +800,9 @@ def extract_subdir_to_cache(
for fn, mtime, size, index_json in thread_executor.map(
extract_func, extract
):
# XXX allow size to be None or get from "bytes sent through
# checksum algorithm" e.g. for fsspec where size may not be
# known in advance
size_processed += size # even if processed incorrectly
# fn can be None if the file was corrupt or no longer there
if fn and mtime:
Expand Down Expand Up @@ -1135,10 +1143,9 @@ def _create_patch_instructions(self, subdir, repodata, patch_generator=None):
else:
if patch_generator:
raise ValueError(
"Specified metadata patch file '{}' does not exist. Please try an absolute "
"path, or examine your relative path carefully with respect to your cwd.".format(
patch_generator
)
f"Specified metadata patch file '{patch_generator}' does not exist. "
"Please try an absolute path, or examine your relative path carefully "
"with respect to your cwd."
)
return {}

Expand Down
14 changes: 6 additions & 8 deletions conda_index/index/convert_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,14 @@ def remove_prefix(conn: sqlite3.Connection):
log.info("Migrate database")

def basename(path):
if not isinstance(path, str):
if not isinstance(path, str): # pragma: no cover
# if our custom sqlite function is passed a non-str field
return path
return path.rsplit("/")[-1]

try:
conn.create_function(
"migrate_basename", narg=1, func=basename, deterministic=True
)
except TypeError: # Python < 3.8
conn.create_function("migrate_basename", narg=1, func=basename)
conn.create_function(
"migrate_basename", narg=1, func=basename, deterministic=True
)

for table in TABLE_NAMES + ["stat"]:
conn.execute(
Expand Down Expand Up @@ -298,7 +296,7 @@ def merge_index_cache(channel_root, output_db="merged.db"):
try:
with combined_db:
combined_db.execute(query, (channel_prefix,))
except sqlite3.OperationalError:
except sqlite3.OperationalError: # pragma: no cover
log.error("OperationalError on %s", query)
raise

Expand Down
90 changes: 90 additions & 0 deletions conda_index/index/fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""
Minimal (just what conda-index uses) filesystem abstraction.
Allows `fsspec <https://filesystem-spec.readthedocs.io/>`_ to be used to index
remote repositories, without making it a required dependency.
"""

from __future__ import annotations

import os
import os.path
import typing
from dataclasses import dataclass
from numbers import Number
from pathlib import Path

if typing.TYPE_CHECKING: # pragma: no cover
from fsspec import AbstractFileSystem


# Note fsspec uses / as a path separator on all platforms


@dataclass
class FileInfo:
    """
    Filename and a bit of stat information.

    Carries a package file's name together with the two stat fields
    (modification time and size) that conda-index records for it.
    """

    # File name — presumably relative to its subdir, not a full path;
    # confirm at call sites.
    fn: str
    # Modification time, as in ``os.stat_result.st_mtime``.
    st_mtime: Number
    # Size in bytes, as in ``os.stat_result.st_size``.
    st_size: Number


class MinimalFS:
    """
    Filesystem API as needed by conda-index, for fsspec compatibility.

    This default implementation works against the local filesystem; remote
    backends subclass it and delegate to fsspec.
    """

    def open(self, path: str, mode: str = "rb"):
        """Open *path*; binary read by default."""
        return open(path, mode)

    def stat(self, path: str):
        """Return ``{"size": ..., "mtime": ...}`` for *path*."""
        info = os.stat(path)
        return {"size": info.st_size, "mtime": info.st_mtime}

    def join(self, *paths):
        """Join path components with the local separator."""
        return os.path.join(*paths)

    def listdir(self, path) -> typing.Iterable[dict]:
        """Yield one ``{"name", "mtime", "size"}`` dict per entry in *path*."""
        for entry in os.listdir(path):
            info = os.stat(os.path.join(path, entry))
            yield {
                "name": entry,
                "mtime": info.st_mtime,
                "size": info.st_size,
            }

    def basename(self, path) -> str:
        """Return the final component of *path*."""
        return os.path.basename(path)


class FsspecFS(MinimalFS):
    """
    Wrap a fsspec filesystem to pass to :class:`ChannelIndex`
    """

    fsspec_fs: AbstractFileSystem

    def __init__(self, fsspec_fs):
        self.fsspec_fs = fsspec_fs

    def open(self, path: str, mode: str = "rb"):
        """Delegate to the wrapped fsspec filesystem."""
        return self.fsspec_fs.open(path, mode)

    def stat(self, path: str):
        # NOTE(review): assumed to return a dict containing "size"/"mtime"
        # like MinimalFS.stat — confirm for each fsspec backend used.
        return self.fsspec_fs.stat(path)

    def join(self, *paths):
        # XXX fsspec uses "/" on all platforms; strip trailing separators so
        # components concatenate without doubled slashes.
        trimmed = [part.rstrip("/") for part in paths]
        return "/".join(trimmed)

    def listdir(self, path: str) -> list[dict]:
        """Directory entries with details, as the fsspec filesystem reports them."""
        return self.fsspec_fs.listdir(path, details=True)

    def basename(self, path: str) -> str:
        """Final ``/``-separated component of *path*."""
        return path.rsplit("/", 1)[-1]
Loading

0 comments on commit e9b7f88

Please sign in to comment.