fixup! [u r] Eliminate RepositoryPlugin.list_partitions (#6531)
nadove-ucsc committed Sep 17, 2024
1 parent d0a89fd commit a6390ce
Showing 6 changed files with 73 additions and 70 deletions.
4 changes: 2 additions & 2 deletions deployments/dev/environment.py
@@ -171,8 +171,8 @@ def mkdict(previous_catalog: dict[str, str],
     ]))

 lungmap_sources = mkdict({}, 2, mkdelta([
-    mksrc('bigquery', 'datarepo-dev-5d9526e0', 'lungmap_dev_1bdcecde16be420888f478cd2133d11d__20220401_20220404', 1),
-    mksrc('bigquery', 'datarepo-dev-8de6d66b', 'lungmap_dev_2620497955a349b28d2b53e0bdfcb176__20220404_20220404', 1)
+    mksrc('bigquery', 'datarepo-dev-5d9526e0', 'lungmap_dev_1bdcecde16be420888f478cd2133d11d__20220401_20220404'),
+    mksrc('bigquery', 'datarepo-dev-8de6d66b', 'lungmap_dev_2620497955a349b28d2b53e0bdfcb176__20220404_20220404')
 ]))

 lm2_sources = mkdict(lungmap_sources, 5, mkdelta([
5 changes: 3 additions & 2 deletions scripts/post_deploy_tdr.py
@@ -58,7 +58,8 @@ def catalogs(self) -> Collection[CatalogName]:
         result = [
             catalog.name
             for catalog in config.catalogs.values()
-            if config.is_tdr_enabled(catalog.name)
+            if (config.is_tdr_enabled(catalog.name)
+                and catalog.name not in config.integration_test_catalogs)
         ]
         assert result, config.catalogs
         return result
@@ -93,7 +94,7 @@ def verify_source(self,
         plugin = self.repository_plugin(catalog)
         ref = plugin.resolve_source(str(source_spec))
         log.info('TDR client is authorized for API access to %s.', source_spec)
-        ref = plugin.partition_source(ref)
+        ref = plugin.partition_source(catalog, ref)
         prefix = ref.spec.prefix
         if config.deployment.is_main:
             require(prefix.common == '', source_spec)
2 changes: 1 addition & 1 deletion src/azul/azulclient.py
@@ -245,7 +245,7 @@ def remote_reindex(self,
         plugin = self.repository_plugin(catalog)
         for source in sources:
             source = plugin.resolve_source(source)
-            source = plugin.partition_source(source)
+            source = plugin.partition_source(catalog, source)

             def message(partition_prefix: str) -> JSON:
                 log.info('Remotely reindexing prefix %r of source %r into catalog %r',
29 changes: 18 additions & 11 deletions src/azul/indexer/__init__.py
@@ -80,7 +80,6 @@ class Prefix:
     of_everything: ClassVar['Prefix']

     digits = '0123456789abcdef'
-    max_partition_size = 8192

     def __attrs_post_init__(self):
         validate_uuid_prefix(self.common)
@@ -147,6 +146,8 @@ def parse(cls, prefix: str) -> Self:
     @classmethod
     def for_main_source(cls, subgraphs: int) -> Self:
         """
+        A prefix that is expected to rarely exceed 8192 subgraphs per partition
+
         >>> str(Prefix.for_main_source(0))
         Traceback (most recent call last):
         ...
@@ -155,26 +156,28 @@ def for_main_source(cls, subgraphs: int) -> Self:
         >>> str(Prefix.for_main_source(1))
         '/0'

-        >>> m = Prefix.max_partition_size
+        >>> n = 8192
         >>> cases = [-1, 0, 1, 2]
-        >>> [str(Prefix.for_main_source(m + i)) for i in cases]
+        >>> [str(Prefix.for_main_source(n + i)) for i in cases]
         ['/0', '/0', '/1', '/1']

         >>> # Sources with this many bundles are very rare, so we have a
         >>> # generous margin of error surrounding this cutoff point
-        >>> [str(Prefix.for_main_source(m * 16 + i)) for i in cases]
+        >>> n *= 16  # 131072
+        >>> [str(Prefix.for_main_source(n + i)) for i in cases]
         ['/1', '/1', '/2', '/2']
         """
-        partition = max(0, math.ceil(math.log(subgraphs / cls.max_partition_size, 16)))
+        partition = cls._prefix_length(subgraphs, 8192)
         return cls(common='', partition=partition)

     @classmethod
     def for_other_source(cls, subgraphs: int) -> Self:
         """
-        An experimentally derived formula designed to minimize manual adjustment
-        of the computed common prefixes, yielding an average of 24 subgraphs per
-        source. The partition prefix is always 1, even though some partitions
-        may be empty, to ensure test coverage for handling multiple partitions.
+        A prefix that yields an average of approximately 24 subgraphs per
+        source, using an experimentally derived heuristic formula designed to
+        minimize manual adjustment of the computed common prefixes. The
+        partition prefix length is always 1, even though some partitions may be
+        empty, to provide test coverage for handling multiple partitions.

         >>> str(Prefix.for_other_source(0))
         Traceback (most recent call last):
@@ -189,15 +192,19 @@ def for_other_source(cls, subgraphs: int) -> Self:
         >>> [str(Prefix.for_other_source(n + i)) for i in cases]
         ['/1', '/1', '0/1', '1/1']

-        >>> n = 1024
+        >>> n *= 16  # 1024
         >>> [str(Prefix.for_other_source(n + i)) for i in cases]
         ['e/1', 'f/1', '00/1', '10/1']
         """
         digits = f'{subgraphs - 1:x}'[::-1]
-        length = max(0, math.ceil(math.log(subgraphs, 16) - 1.5))
+        length = cls._prefix_length(subgraphs, 64)
         assert length < len(digits), subgraphs
         return cls(common=digits[:length], partition=1)

+    @classmethod
+    def _prefix_length(cls, n, m) -> int:
+        return max(0, math.ceil(math.log(n / m, len(cls.digits))))
+
     def partition_prefixes(self) -> Iterator[str]:
         """
         >>> list(Prefix.parse('/0').partition_prefixes())
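Both heuristics now route through the shared _prefix_length helper. Below is a minimal standalone sketch of that arithmetic, assuming the constants from this diff (8192 subgraphs per partition for main deployments, 64 otherwise); the function names are illustrative stand-ins for the actual Prefix methods, and the returned strings mimic str(Prefix) output:

import math

def prefix_length(n: int, m: int, base: int = 16) -> int:
    # Fewest prefix digits such that the expected number of subgraphs
    # per partition stays at or below m: max(0, ceil(log_16(n / m)))
    return max(0, math.ceil(math.log(n / m, base)))

def for_main_source(subgraphs: int) -> str:
    # Empty common prefix; only the partition prefix length varies
    return '/' + str(prefix_length(subgraphs, 8192))

def for_other_source(subgraphs: int) -> str:
    # Common prefix drawn from the reversed hex spelling of subgraphs - 1;
    # partition length pinned to 1 to exercise multi-partition handling
    digits = f'{subgraphs - 1:x}'[::-1]
    length = prefix_length(subgraphs, 64)
    assert length < len(digits), subgraphs
    return digits[:length] + '/1'

# Reproduces the doctests in the diff above
assert [for_main_source(8192 + i) for i in (-1, 0, 1, 2)] == ['/0', '/0', '/1', '/1']
assert [for_main_source(8192 * 16 + i) for i in (-1, 0, 1, 2)] == ['/1', '/1', '/2', '/2']
assert [for_other_source(1024 + i) for i in (-1, 0, 1, 2)] == ['e/1', 'f/1', '00/1', '10/1']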
24 changes: 18 additions & 6 deletions src/azul/plugins/__init__.py
@@ -511,10 +511,14 @@ def sources(self) -> AbstractSet[SOURCE_SPEC]:
         raise NotImplementedError

     def _assert_source(self, source: SOURCE_REF):
+        """
+        Assert that the given source is present in the plugin configuration.
+        """
         assert source.spec.prefix is not None, source
         for configured_spec in self.sources:
             if configured_spec == source.spec:
                 break
+            # Most configured sources lack an explicit prefix
             elif configured_spec.eq_ignoring_prefix(source.spec):
                 assert configured_spec.prefix is None, (configured_spec, source)
                 break
@@ -614,17 +618,25 @@ def _count_subgraphs(self, source: SOURCE_SPEC) -> int:
         """
         raise NotImplementedError

-    def partition_source(self, source: SOURCE_REF) -> SOURCE_REF:
+    def partition_source(self,
+                         catalog: CatalogName,
+                         source: SOURCE_REF
+                         ) -> SOURCE_REF:
         """
-        If the source already has a prefix, return it. Otherwise, return an
-        updated copy of the source with a heuristically computed prefix that
-        should be appropriate for indexing.
+        If the source already has a prefix, return the source. Otherwise, return
+        an updated copy of the source with a heuristically computed prefix that
+        should be appropriate for indexing in the given catalog.
         """
         if source.spec.prefix is None:
             count = self._count_subgraphs(source.spec)
             is_main = config.deployment.is_main
-            method = Prefix.for_main_source if is_main else Prefix.for_other_source
-            prefix = method(count)
+            is_it = catalog in config.integration_test_catalogs
+            # We use the "other" heuristic during IT to avoid indexing an
+            # excessive number of bundles
+            if is_main and not is_it:
+                prefix = Prefix.for_main_source(count)
+            else:
+                prefix = Prefix.for_other_source(count)
             source = attr.evolve(source, spec=attr.evolve(source.spec, prefix=prefix))
         return source
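The new catalog parameter exists so that partition_source can pick between the two heuristics. A sketch of that dispatch, reusing the stand-in functions from the previous sketch; IS_MAIN_DEPLOYMENT and INTEGRATION_TEST_CATALOGS are invented placeholders for config.deployment.is_main and config.integration_test_catalogs, not the actual config API:

IS_MAIN_DEPLOYMENT = True
INTEGRATION_TEST_CATALOGS = {'it2', 'it3'}

def choose_prefix(catalog: str, subgraphs: int) -> str:
    # IT catalogs get the small-partition "other" heuristic even on main
    # deployments, so integration tests don't index enormous sources
    if IS_MAIN_DEPLOYMENT and catalog not in INTEGRATION_TEST_CATALOGS:
        return for_main_source(subgraphs)
    else:
        return for_other_source(subgraphs)

assert choose_prefix('dcp', 100_000) == '/1'
assert choose_prefix('it2', 100_000) == 'f96/1'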
79 changes: 31 additions & 48 deletions test/integration_test.py
@@ -20,6 +20,7 @@
     BytesIO,
     TextIOWrapper,
 )
+import itertools
 from itertools import (
     count,
     starmap,
@@ -286,41 +287,17 @@ def managed_access_sources_by_catalog(self
                 managed_access_sources[catalog].add(ref)
         return managed_access_sources

-    def _list_partition_bundles(self,
-                                catalog: CatalogName,
-                                source: str
-                                ) -> tuple[SourceRef, str, list[SourcedBundleFQID]]:
+    def _list_partitions(self,
+                         catalog: CatalogName,
+                         *,
+                         min_bundles: int,
+                         public_1st: bool
+                         ) -> Iterator[tuple[SourceRef, str, list[SourcedBundleFQID]]]:
         """
-        Randomly select a partition of bundles from the specified source and
-        return the FQIDs of the bundles in that partition. The partition is
-        guaranteed to contain at least one bundle.
+        Iterate through the sources in the given catalog and yield partitions of
+        bundle FQIDs until a desired minimum number of bundles is found. For
+        each emitted source, every partition is included, even if it's empty.
         """
-        plugin = self.azul_client.repository_plugin(catalog)
-        source = plugin.resolve_source(source)
-        source = plugin.partition_source(source)
-        prefix = source.spec.prefix
-        partition_prefixes = list(prefix.partition_prefixes())
-        self.random.shuffle(partition_prefixes)
-        for partition_prefix in partition_prefixes:
-            fqids = self.azul_client.list_bundles(catalog, source, partition_prefix)
-            # To achieve IT coverage of handling multiple partitions per source,
-            # all sources on personal and sandbox deployments use a partition
-            # prefix of 1. The desired maximum number of subgraphs per source
-            # for these deployments is only 512, so we expect many partitions to
-            # be empty. For full-scale deployments, the desired maximum
-            # partition size is 8192. We don't verify/enforce those limits here
-            # because the prefixes are computed heuristically.
-            if fqids:
-                return source, partition_prefix, fqids
-        assert False, 'All partitions were empty'
-
-    def _list_bundles(self,
-                      catalog: CatalogName,
-                      *,
-                      min_bundles: int,
-                      check_all: bool,
-                      public_1st: bool
-                      ) -> Iterator[tuple[SourceRef, str, list[SourcedBundleFQID]]]:
         total_bundles = 0
         sources = sorted(config.sources(catalog))
         self.random.shuffle(sources)
@@ -335,18 +312,22 @@ def _list_bundles(self,
                 if source not in managed_access_sources
             )
             sources[0], sources[index] = sources[index], sources[0]
+        plugin = self.azul_client.repository_plugin(catalog)
         # This iteration prefers sources occurring first, so we shuffle them
         # above to neutralize the bias.
         for source in sources:
-            source, prefix, new_fqids = self._list_partition_bundles(catalog, source)
-            if total_bundles < min_bundles:
+            source = plugin.resolve_source(source)
+            source = plugin.partition_source(catalog, source)
+            for prefix in source.spec.prefix.partition_prefixes():
+                new_fqids = self.azul_client.list_bundles(catalog, source, prefix)
                 total_bundles += len(new_fqids)
                 yield source, prefix, new_fqids
-            # If `check_all` is True, keep looping to verify the size of a
-            # partition for all sources
-            elif not check_all:
+            # We postpone this check until after we've yielded all partitions in
+            # the current source to ensure test coverage for handling multiple
+            # partitions per source
+            if total_bundles >= min_bundles:
                 break
-        if total_bundles < min_bundles:
+        else:
             log.warning('Checked all sources and found only %d bundles instead of the '
                         'expected minimum %d', total_bundles, min_bundles)

Expand All @@ -359,7 +340,7 @@ def _list_managed_access_bundles(self,
# prefix as possible.
for source in self.managed_access_sources_by_catalog[catalog]:
assert str(source.spec) in sources
source = self.repository_plugin(catalog).partition_source(source)
source = self.repository_plugin(catalog).partition_source(catalog, source)
bundle_fqids = sorted(
bundle_fqid
for bundle_fqid in self.azul_client.list_bundles(catalog, source, prefix='')
@@ -1242,10 +1223,9 @@ def update(source: SourceRef,
         # public and managed access data. `public_1st` ensures that at least
         # one of the sources will be public because sources are indexed starting
         # with the first one yielded by the iteration.
-        list(starmap(update, self._list_bundles(catalog,
-                                                min_bundles=num_bundles,
-                                                check_all=True,
-                                                public_1st=True)))
+        list(starmap(update, self._list_partitions(catalog,
+                                                   min_bundles=num_bundles,
+                                                   public_1st=True)))

         # Index some bundles again to test that we handle duplicate additions.
         # Note: random.choices() may pick the same element multiple times so
Expand Down Expand Up @@ -1954,10 +1934,13 @@ def test_can_bundle_canned_repository(self):
self._test_catalog(mock_catalog)

def bundle_fqid(self, catalog: CatalogName) -> SourcedBundleFQID:
source, prefix, bundle_fqids = next(self._list_bundles(catalog,
min_bundles=1,
check_all=False,
public_1st=False))
# Skip through empty partitions
bundle_fqids = next(itertools.chain.from_iterable((
bundle_fqids
for _, _, bundle_fqids in self._list_partitions(catalog,
min_bundles=1,
public_1st=False)
)))
return self.random.choice(sorted(bundle_fqids))

def _can_bundle(self,
Expand Down
