From a6390ce5acf1804daef4d55a8710d20a7c9d51db Mon Sep 17 00:00:00 2001
From: Noa Aviel Dove
Date: Fri, 13 Sep 2024 11:33:53 -0700
Subject: [PATCH] fixup! [u r] Eliminate RepositoryPlugin.list_partitions (#6531)

---
 deployments/dev/environment.py |  4 +-
 scripts/post_deploy_tdr.py     |  5 ++-
 src/azul/azulclient.py         |  2 +-
 src/azul/indexer/__init__.py   | 29 ++++++++-----
 src/azul/plugins/__init__.py   | 24 ++++++++---
 test/integration_test.py       | 79 +++++++++++++---------------------
 6 files changed, 73 insertions(+), 70 deletions(-)

diff --git a/deployments/dev/environment.py b/deployments/dev/environment.py
index 980cf1dc5..58b2d77e7 100644
--- a/deployments/dev/environment.py
+++ b/deployments/dev/environment.py
@@ -171,8 +171,8 @@ def mkdict(previous_catalog: dict[str, str],
     ]))
 
 lungmap_sources = mkdict({}, 2, mkdelta([
-    mksrc('bigquery', 'datarepo-dev-5d9526e0', 'lungmap_dev_1bdcecde16be420888f478cd2133d11d__20220401_20220404', 1),
-    mksrc('bigquery', 'datarepo-dev-8de6d66b', 'lungmap_dev_2620497955a349b28d2b53e0bdfcb176__20220404_20220404', 1)
+    mksrc('bigquery', 'datarepo-dev-5d9526e0', 'lungmap_dev_1bdcecde16be420888f478cd2133d11d__20220401_20220404'),
+    mksrc('bigquery', 'datarepo-dev-8de6d66b', 'lungmap_dev_2620497955a349b28d2b53e0bdfcb176__20220404_20220404')
 ]))
 
 lm2_sources = mkdict(lungmap_sources, 5, mkdelta([
diff --git a/scripts/post_deploy_tdr.py b/scripts/post_deploy_tdr.py
index cd2b29160..efc7eca38 100644
--- a/scripts/post_deploy_tdr.py
+++ b/scripts/post_deploy_tdr.py
@@ -58,7 +58,8 @@ def catalogs(self) -> Collection[CatalogName]:
         result = [
             catalog.name
             for catalog in config.catalogs.values()
-            if config.is_tdr_enabled(catalog.name)
+            if (config.is_tdr_enabled(catalog.name)
+                and catalog.name not in config.integration_test_catalogs)
         ]
         assert result, config.catalogs
         return result
@@ -93,7 +94,7 @@ def verify_source(self,
         plugin = self.repository_plugin(catalog)
         ref = plugin.resolve_source(str(source_spec))
         log.info('TDR client is authorized for API access to %s.', source_spec)
-        ref = plugin.partition_source(ref)
+        ref = plugin.partition_source(catalog, ref)
         prefix = ref.spec.prefix
         if config.deployment.is_main:
             require(prefix.common == '', source_spec)
diff --git a/src/azul/azulclient.py b/src/azul/azulclient.py
index efbb1f14c..2c1f7d877 100644
--- a/src/azul/azulclient.py
+++ b/src/azul/azulclient.py
@@ -245,7 +245,7 @@ def remote_reindex(self,
         plugin = self.repository_plugin(catalog)
         for source in sources:
             source = plugin.resolve_source(source)
-            source = plugin.partition_source(source)
+            source = plugin.partition_source(catalog, source)
 
             def message(partition_prefix: str) -> JSON:
                 log.info('Remotely reindexing prefix %r of source %r into catalog %r',
diff --git a/src/azul/indexer/__init__.py b/src/azul/indexer/__init__.py
index 96a402326..90d42b576 100644
--- a/src/azul/indexer/__init__.py
+++ b/src/azul/indexer/__init__.py
@@ -80,7 +80,6 @@ class Prefix:
     of_everything: ClassVar['Prefix']
 
     digits = '0123456789abcdef'
-    max_partition_size = 8192
 
     def __attrs_post_init__(self):
         validate_uuid_prefix(self.common)
@@ -147,6 +146,8 @@ def parse(cls, prefix: str) -> Self:
     @classmethod
     def for_main_source(cls, subgraphs: int) -> Self:
         """
+        A prefix whose partitions should rarely exceed 8192 subgraphs each.
+
         >>> str(Prefix.for_main_source(0))
         Traceback (most recent call last):
         ...
@@ -155,26 +156,28 @@ def for_main_source(cls, subgraphs: int) -> Self:
         >>> str(Prefix.for_main_source(1))
         '/0'
 
-        >>> m = Prefix.max_partition_size
+        >>> n = 8192
         >>> cases = [-1, 0, 1, 2]
-        >>> [str(Prefix.for_main_source(m + i)) for i in cases]
+        >>> [str(Prefix.for_main_source(n + i)) for i in cases]
         ['/0', '/0', '/1', '/1']
 
         >>> # Sources with this many bundles are very rare, so we have a
         >>> # generous margin of error surrounding this cutoff point
-        >>> [str(Prefix.for_main_source(m * 16 + i)) for i in cases]
+        >>> n *= 16  # 131072
+        >>> [str(Prefix.for_main_source(n + i)) for i in cases]
         ['/1', '/1', '/2', '/2']
         """
-        partition = max(0, math.ceil(math.log(subgraphs / cls.max_partition_size, 16)))
+        partition = cls._prefix_length(subgraphs, 8192)
         return cls(common='', partition=partition)
 
     @classmethod
     def for_other_source(cls, subgraphs: int) -> Self:
         """
-        An experimentally derived formula designed to minimize manual adjustment
-        of the computed common prefixes, yielding an average of 24 subgraphs per
-        source. The partition prefix is always 1, even though some partitions
-        may be empty, to ensure test coverage for handling multiple partitions.
+        A prefix that yields an average of approximately 24 subgraphs per
+        source, using an experimentally derived heuristic formula designed to
+        minimize manual adjustment of the computed common prefixes. The
+        partition prefix length is always 1, even though some partitions may be
+        empty, to provide test coverage for handling multiple partitions.
 
         >>> str(Prefix.for_other_source(0))
         Traceback (most recent call last):
@@ -189,15 +192,19 @@ def for_other_source(cls, subgraphs: int) -> Self:
         >>> [str(Prefix.for_other_source(n + i)) for i in cases]
         ['/1', '/1', '0/1', '1/1']
 
-        >>> n = 1024
+        >>> n *= 16  # 1024
         >>> [str(Prefix.for_other_source(n + i)) for i in cases]
         ['e/1', 'f/1', '00/1', '10/1']
         """
         digits = f'{subgraphs - 1:x}'[::-1]
-        length = max(0, math.ceil(math.log(subgraphs, 16) - 1.5))
+        length = cls._prefix_length(subgraphs, 64)
         assert length < len(digits), subgraphs
         return cls(common=digits[:length], partition=1)
 
+    @classmethod
+    def _prefix_length(cls, n: int, m: int) -> int:
+        return max(0, math.ceil(math.log(n / m, len(cls.digits))))
+
     def partition_prefixes(self) -> Iterator[str]:
         """
         >>> list(Prefix.parse('/0').partition_prefixes())
diff --git a/src/azul/plugins/__init__.py b/src/azul/plugins/__init__.py
index a976bd7b0..62485c760 100644
--- a/src/azul/plugins/__init__.py
+++ b/src/azul/plugins/__init__.py
@@ -511,10 +511,14 @@ def sources(self) -> AbstractSet[SOURCE_SPEC]:
         raise NotImplementedError
 
     def _assert_source(self, source: SOURCE_REF):
+        """
+        Assert that the given source is present in the plugin configuration.
+        """
         assert source.spec.prefix is not None, source
         for configured_spec in self.sources:
             if configured_spec == source.spec:
                 break
+            # Most configured sources lack an explicit prefix
             elif configured_spec.eq_ignoring_prefix(source.spec):
                 assert configured_spec.prefix is None, (configured_spec, source)
                 break
@@ -614,17 +618,25 @@ def _count_subgraphs(self, source: SOURCE_SPEC) -> int:
         """
         raise NotImplementedError
 
-    def partition_source(self, source: SOURCE_REF) -> SOURCE_REF:
+    def partition_source(self,
+                         catalog: CatalogName,
+                         source: SOURCE_REF
+                         ) -> SOURCE_REF:
         """
-        If the source already has a prefix, return it. Otherwise, return an
-        updated copy of the source with a heuristically computed prefix that
-        should be appropriate for indexing.
+        If the source already has a prefix, return the source. Otherwise, return
+        an updated copy of the source with a heuristically computed prefix that
+        should be appropriate for indexing in the given catalog.
         """
         if source.spec.prefix is None:
             count = self._count_subgraphs(source.spec)
             is_main = config.deployment.is_main
-            method = Prefix.for_main_source if is_main else Prefix.for_other_source
-            prefix = method(count)
+            is_it = catalog in config.integration_test_catalogs
+            # We use the "other" heuristic during IT to avoid indexing an
+            # excessive number of bundles
+            if is_main and not is_it:
+                prefix = Prefix.for_main_source(count)
+            else:
+                prefix = Prefix.for_other_source(count)
             source = attr.evolve(source, spec=attr.evolve(source.spec, prefix=prefix))
         return source
diff --git a/test/integration_test.py b/test/integration_test.py
index f10ef1c01..35638fc26 100644
--- a/test/integration_test.py
+++ b/test/integration_test.py
@@ -20,6 +20,7 @@
     BytesIO,
     TextIOWrapper,
 )
+import itertools
 from itertools import (
     count,
     starmap,
@@ -286,41 +287,17 @@ def managed_access_sources_by_catalog(self
                 managed_access_sources[catalog].add(ref)
         return managed_access_sources
 
-    def _list_partition_bundles(self,
-                                catalog: CatalogName,
-                                source: str
-                                ) -> tuple[SourceRef, str, list[SourcedBundleFQID]]:
+    def _list_partitions(self,
+                         catalog: CatalogName,
+                         *,
+                         min_bundles: int,
+                         public_1st: bool
+                         ) -> Iterator[tuple[SourceRef, str, list[SourcedBundleFQID]]]:
         """
-        Randomly select a partition of bundles from the specified source and
-        return the FQIDs of the bundles in that partition. The partition is
-        guaranteed to contain at least one bundle.
+        Iterate through the sources in the given catalog and yield partitions of
+        bundle FQIDs until a desired minimum number of bundles is found. For
+        each emitted source, every partition is included, even if it's empty.
         """
-        plugin = self.azul_client.repository_plugin(catalog)
-        source = plugin.resolve_source(source)
-        source = plugin.partition_source(source)
-        prefix = source.spec.prefix
-        partition_prefixes = list(prefix.partition_prefixes())
-        self.random.shuffle(partition_prefixes)
-        for partition_prefix in partition_prefixes:
-            fqids = self.azul_client.list_bundles(catalog, source, partition_prefix)
-            # To achieve IT coverage of handling multiple partitions per source, all
-            # sources on personal and sandbox deployments a use partition prefix of
-            # 1. The desired maximum number of subgraphs per source for these
-            # deployments in only 512, so we expect many partitions to be empty. For
-            # full-scale deployments, the desired maximum partition size is 8192.
-            # We don't verify/enforce those limits here because the prefixes are
-            # computed heuristically.
-            if fqids:
-                return source, partition_prefix, fqids
-        assert False, 'All partitions were empty'
-
-    def _list_bundles(self,
-                      catalog: CatalogName,
-                      *,
-                      min_bundles: int,
-                      check_all: bool,
-                      public_1st: bool
-                      ) -> Iterator[tuple[SourceRef, str, list[SourcedBundleFQID]]]:
         total_bundles = 0
         sources = sorted(config.sources(catalog))
         self.random.shuffle(sources)
@@ -335,18 +312,22 @@ def _list_bundles(self,
                 if source not in managed_access_sources
             )
             sources[0], sources[index] = sources[index], sources[0]
+        plugin = self.azul_client.repository_plugin(catalog)
         # This iteration prefers sources occurring first, so we shuffle them
         # above to neutralize the bias.
         for source in sources:
-            source, prefix, new_fqids = self._list_partition_bundles(catalog, source)
-            if total_bundles < min_bundles:
+            source = plugin.resolve_source(source)
+            source = plugin.partition_source(catalog, source)
+            for prefix in source.spec.prefix.partition_prefixes():
+                new_fqids = self.azul_client.list_bundles(catalog, source, prefix)
                 total_bundles += len(new_fqids)
                 yield source, prefix, new_fqids
-            # If `check_all` is True, keep looping to verify the size of a
-            # partition for all sources
-            elif not check_all:
+            # We postpone this check until after we've yielded all partitions in
+            # the current source to ensure test coverage for handling multiple
+            # partitions per source
+            if total_bundles >= min_bundles:
                 break
-        if total_bundles < min_bundles:
+        else:
             log.warning('Checked all sources and found only %d bundles instead of the '
                         'expected minimum %d', total_bundles, min_bundles)
@@ -359,7 +340,7 @@ def _list_managed_access_bundles(self,
         # prefix as possible.
         for source in self.managed_access_sources_by_catalog[catalog]:
             assert str(source.spec) in sources
-            source = self.repository_plugin(catalog).partition_source(source)
+            source = self.repository_plugin(catalog).partition_source(catalog, source)
             bundle_fqids = sorted(
                 bundle_fqid
                 for bundle_fqid in self.azul_client.list_bundles(catalog, source, prefix='')
@@ -1242,10 +1223,9 @@ def update(source: SourceRef,
         # public and managed access data. `public_1st` ensures that at least
         # one of the sources will be public because sources are indexed starting
         # with the first one yielded by the iteration.
-        list(starmap(update, self._list_bundles(catalog,
-                                                min_bundles=num_bundles,
-                                                check_all=True,
-                                                public_1st=True)))
+        list(starmap(update, self._list_partitions(catalog,
+                                                   min_bundles=num_bundles,
+                                                   public_1st=True)))
 
         # Index some bundles again to test that we handle duplicate additions.
         # Note: random.choices() may pick the same element multiple times so
@@ -1954,10 +1934,13 @@ def test_can_bundle_canned_repository(self):
         self._test_catalog(mock_catalog)
 
     def bundle_fqid(self, catalog: CatalogName) -> SourcedBundleFQID:
-        source, prefix, bundle_fqids = next(self._list_bundles(catalog,
-                                                               min_bundles=1,
-                                                               check_all=False,
-                                                               public_1st=False))
+        # Chain the partitions together; empty partitions contribute no FQIDs
+        bundle_fqids = itertools.chain.from_iterable(
+            bundle_fqids
+            for _, _, bundle_fqids in self._list_partitions(catalog,
+                                                            min_bundles=1,
+                                                            public_1st=False)
+        )
         return self.random.choice(sorted(bundle_fqids))
 
     def _can_bundle(self,
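
Note: the sketch below is not part of the patch. It is a minimal, self-contained
restatement of the partition-prefix heuristic that this patch consolidates into
Prefix._prefix_length, for readers reviewing the doctest changes above. The
standalone function name `prefix_length` and its parameter names are
illustrative only; the expected values follow the doctests in the diff.

    import math

    # Mirrors Prefix.digits; prefixes are hexadecimal
    digits = '0123456789abcdef'

    def prefix_length(subgraphs: int, max_partition_size: int) -> int:
        # Smallest number of prefix digits such that splitting a source of
        # `subgraphs` subgraphs into 16 ** length partitions keeps the
        # average partition size at or below `max_partition_size`.
        # Requires subgraphs > 0, as the doctests for 0 subgraphs show.
        return max(0, math.ceil(math.log(subgraphs / max_partition_size,
                                         len(digits))))

    # Main deployments target partitions of at most 8192 subgraphs ...
    assert prefix_length(8192, 8192) == 0
    assert prefix_length(8193, 8192) == 1
    assert prefix_length(8192 * 16 + 1, 8192) == 2

    # ... while for other sources the same formula, with a bound of 64,
    # yields the length of the *common* prefix; the partition prefix then
    # always adds one more digit
    assert prefix_length(64, 64) == 0
    assert prefix_length(1024, 64) == 1
    assert prefix_length(1025, 64) == 2

Because the formula only adds a digit when the subgraph count exceeds 16 times
the previous threshold, a source must grow well past each cutoff before its
prefix lengthens, which is the generous margin of error the for_main_source
doctest refers to.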