diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 9bcfffeb9cf5..5709a3b82b96 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1736,7 +1736,7 @@ def wait_until_ready(self): def storage_controller_ready(): assert self.ready() is True - wait_until(30, 1, storage_controller_ready) + wait_until(storage_controller_ready) return time.time() - t1 def attach_hook_issue( @@ -2574,7 +2574,7 @@ def complete(): log.info(f"any_unstable={any_unstable}") assert not any_unstable - wait_until(20, 0.5, complete) + wait_until(complete) def __enter__(self) -> Self: return self @@ -3973,7 +3973,7 @@ def check_migrations_done(): migration_id: int = cur.fetchall()[0][0] assert migration_id >= num_migrations - wait_until(20, 0.5, check_migrations_done) + wait_until(check_migrations_done) # Mock the extension part of spec passed from control plane for local testing # endpooint.rs adds content of this file as a part of the spec.json @@ -4489,12 +4489,10 @@ def are_lsns_advanced(): ) assert stat.remote_consistent_lsn >= lsn and stat.backup_lsn >= lsn.segment_lsn() - # xxx: max wait is long because we might be waiting for reconnection from - # pageserver to this safekeeper - wait_until(30, 1, are_lsns_advanced) + wait_until(are_lsns_advanced) client.checkpoint(tenant_id, timeline_id) if wait_wal_removal: - wait_until(30, 1, are_segments_removed) + wait_until(are_segments_removed) def wait_until_paused(self, failpoint: str): msg = f"at failpoint {failpoint}" @@ -4503,7 +4501,7 @@ def paused(): log.info(f"waiting for hitting failpoint {failpoint}") self.assert_log_contains(msg) - wait_until(20, 0.5, paused) + wait_until(paused) class NeonBroker(LogUtils): diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 46700e3fe377..7c10edc5fc33 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -13,7 +13,7 @@ from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId from fixtures.log_helper import log from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient -from fixtures.remote_storage import RemoteStorage, RemoteStorageKind, S3Storage +from fixtures.remote_storage import RemoteStorage, S3Storage from fixtures.utils import wait_until if TYPE_CHECKING: @@ -269,12 +269,7 @@ def wait_timeline_detail_404( pageserver_http: PageserverHttpClient, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, - iterations: int, - interval: float | None = None, ): - if interval is None: - interval = 0.25 - def timeline_is_missing(): data = {} try: @@ -287,19 +282,17 @@ def timeline_is_missing(): raise RuntimeError(f"Timeline exists state {data.get('state')}") - wait_until(iterations, interval, func=timeline_is_missing) + wait_until(timeline_is_missing) def timeline_delete_wait_completed( pageserver_http: PageserverHttpClient, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, - iterations: int = 20, - interval: float | None = None, **delete_args, ) -> None: pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args) - wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations, interval) + wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id) # remote_storage must not be None, but that's easier for callers to make mypy happy @@ -453,7 +446,3 @@ def many_small_layers_tenant_config() -> dict[str, Any]: "checkpoint_distance": 
1024**2, "image_creation_threshold": 100, } - - -def poll_for_remote_storage_iterations(remote_storage_kind: RemoteStorageKind) -> int: - return 40 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 15 diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index 094188c0b5f5..286f80ba69f1 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -175,7 +175,7 @@ def timeline_start_lsn_non_zero() -> Lsn: assert s > Lsn(0) return s - return wait_until(30, 1, timeline_start_lsn_non_zero) + return wait_until(timeline_start_lsn_non_zero) def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn: return self.timeline_status(tenant_id, timeline_id).commit_lsn diff --git a/test_runner/fixtures/safekeeper/utils.py b/test_runner/fixtures/safekeeper/utils.py index 024691647033..922cdedccc21 100644 --- a/test_runner/fixtures/safekeeper/utils.py +++ b/test_runner/fixtures/safekeeper/utils.py @@ -19,4 +19,4 @@ def walreceivers_absent(): log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}") assert len(status.walreceivers) == 0 - wait_until(30, 0.5, walreceivers_absent) + wait_until(walreceivers_absent) diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 04e98fe494db..c34ac298d1cc 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -9,6 +9,7 @@ import threading import time from collections.abc import Callable, Iterable +from datetime import datetime, timedelta from hashlib import sha256 from pathlib import Path from typing import TYPE_CHECKING, Any, TypeVar @@ -380,15 +381,10 @@ def start_in_background( if return_code is not None: error = f"expected subprocess to run but it exited with code {return_code}" else: - attempts = 10 try: - wait_until( - number_of_iterations=attempts, - interval=1, - func=is_started, - ) + wait_until(is_started, timeout=10) except Exception: - error = f"Failed to get correct status from subprocess in {attempts} attempts" + error = "Failed to get correct status from subprocess" except Exception as e: error = f"expected subprocess to start but it failed with exception: {e}" @@ -402,28 +398,31 @@ def start_in_background( def wait_until( - number_of_iterations: int, - interval: float, func: Callable[[], WaitUntilRet], - show_intermediate_error: bool = False, + name: str | None = None, + timeout: float = 20.0, # seconds + interval: float = 0.5, # seconds + status_interval: float = 1.0, # seconds ) -> WaitUntilRet: """ Wait until 'func' returns successfully, without exception. Returns the last return value from the function. 
""" + if name is None: + name = getattr(func, "__name__", repr(func)) + deadline = datetime.now() + timedelta(seconds=timeout) + next_status = datetime.now() last_exception = None - for i in range(number_of_iterations): + while datetime.now() <= deadline: try: - res = func() + return func() except Exception as e: - log.info("waiting for %s iteration %s failed: %s", func, i + 1, e) + if datetime.now() >= next_status: + log.info("waiting for %s: %s", name, e) + next_status = datetime.now() + timedelta(seconds=status_interval) last_exception = e - if show_intermediate_error: - log.info(e) time.sleep(interval) - continue - return res - raise Exception(f"timed out while waiting for {func}") from last_exception + raise Exception(f"timed out while waiting for {name}") from last_exception def assert_eq(a, b) -> None: diff --git a/test_runner/logical_repl/test_clickhouse.py b/test_runner/logical_repl/test_clickhouse.py index 8e03bbe5d4f1..6b522fa46d22 100644 --- a/test_runner/logical_repl/test_clickhouse.py +++ b/test_runner/logical_repl/test_clickhouse.py @@ -60,24 +60,22 @@ def test_clickhouse(remote_pg: RemotePostgres): "SETTINGS materialized_postgresql_tables_list = 'table1';" ) wait_until( - 120, - 0.5, lambda: query_clickhouse( client, "select * from db1_postgres.table1 order by 1", "ee600d8f7cd05bd0b169fa81f44300a9dd10085a", ), + timeout=60, ) cur.execute("INSERT INTO table1 (id, column1) VALUES (3, 'ghi'), (4, 'jkl');") conn.commit() wait_until( - 120, - 0.5, lambda: query_clickhouse( client, "select * from db1_postgres.table1 order by 1", "9eba2daaf7e4d7d27ac849525f68b562ab53947d", ), + timeout=60, ) log.debug("Sleeping before final checking if Neon is still alive") time.sleep(3) diff --git a/test_runner/logical_repl/test_debezium.py b/test_runner/logical_repl/test_debezium.py index d2cb087c92f5..8023d64d3d73 100644 --- a/test_runner/logical_repl/test_debezium.py +++ b/test_runner/logical_repl/test_debezium.py @@ -148,14 +148,12 @@ def test_debezium(debezium): ) conn.commit() wait_until( - 100, - 0.5, lambda: get_kafka_msg( consumer, ts_ms, after={"first_name": "John", "last_name": "Dow", "email": "johndow@example.com"}, ), - show_intermediate_error=True, + timeout=60, ) ts_ms = time.time() * 1000 log.info("Insert 2 ts_ms: %s", ts_ms) @@ -165,28 +163,24 @@ def test_debezium(debezium): ) conn.commit() wait_until( - 100, - 0.5, lambda: get_kafka_msg( consumer, ts_ms, after={"first_name": "Alex", "last_name": "Row", "email": "alexrow@example.com"}, ), - show_intermediate_error=True, + timeout=60, ) ts_ms = time.time() * 1000 log.info("Update ts_ms: %s", ts_ms) cur.execute("update inventory.customers set first_name = 'Alexander' where id = 2") conn.commit() wait_until( - 100, - 0.5, lambda: get_kafka_msg( consumer, ts_ms, after={"first_name": "Alexander"}, ), - show_intermediate_error=True, + timeout=60, ) time.sleep(3) cur.execute("select 1") diff --git a/test_runner/performance/test_branch_creation.py b/test_runner/performance/test_branch_creation.py index c50c4ad4324e..3ce27d6cd3eb 100644 --- a/test_runner/performance/test_branch_creation.py +++ b/test_runner/performance/test_branch_creation.py @@ -137,7 +137,7 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape: startup_line = "INFO version: git(-env)?:" # find the first line of the log file so we can find the next start later - _, first_start = wait_until(5, 1, lambda: env.pageserver.assert_log_contains(startup_line)) + _, first_start = wait_until(lambda: env.pageserver.assert_log_contains(startup_line)) # 
start without gc so we can time compaction with less noise; use shorter # period for compaction so it starts earlier @@ -156,7 +156,7 @@ def patch_default_tenant_config(config): ) _, second_start = wait_until( - 5, 1, lambda: env.pageserver.assert_log_contains(startup_line, first_start) + lambda: env.pageserver.assert_log_contains(startup_line, first_start), ) env.pageserver.quiesce_tenants() @@ -164,8 +164,6 @@ def patch_default_tenant_config(config): # wait for compaction to complete, which most likely has already done so multiple times msg, _ = wait_until( - 30, - 1, lambda: env.pageserver.assert_log_contains( f".*tenant_id={env.initial_tenant}.*: compaction iteration complete.*", second_start ), @@ -205,7 +203,7 @@ def metrics_are_filled() -> list[Sample]: assert len(matching) == len(expected_labels) return matching - samples = wait_until(10, 1, metrics_are_filled) + samples = wait_until(metrics_are_filled) for sample in samples: phase = sample.labels["phase"] diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 670c2698f5aa..45112fd67e2a 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -64,8 +64,6 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N ) wait_until( - 50, - 0.1, lambda: env.pageserver.assert_log_contains(".*Error processing HTTP request: Bad request"), ) diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 302a8fd0d1b4..b6741aed68cb 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -385,7 +385,7 @@ def assert_broken(): # Wait for enough failures to break the circuit breaker # This wait is fairly long because we back off on compaction failures, so 5 retries takes ~30s - wait_until(60, 1, assert_broken) + wait_until(assert_broken, timeout=60) # Sleep for a while, during which time we expect that compaction will _not_ be retried time.sleep(10) diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index 18075110082b..05956b5b9378 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -211,7 +211,7 @@ def statvfs_called(): pageserver.assert_log_contains(".*running mocked statvfs.*") # we most likely have already completed multiple runs - wait_until(10, 1, statvfs_called) + wait_until(statvfs_called) def count_layers_per_tenant( @@ -772,14 +772,14 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv): ) wait_until( - 10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved") + lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved") ) def less_than_max_usage_pct(): post_eviction_total_size, _, _ = env.timelines_du(env.pageserver) assert post_eviction_total_size < 0.33 * total_size, "we requested max 33% usage" - wait_until(2, 2, less_than_max_usage_pct) + wait_until(less_than_max_usage_pct, timeout=5) # Disk usage candidate collection only takes into account active tenants. 
# However, the statvfs call takes into account the entire tenants directory, @@ -825,7 +825,7 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv): ) wait_until( - 10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved") + lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved"), ) def more_than_min_avail_bytes_freed(): @@ -834,7 +834,7 @@ def more_than_min_avail_bytes_freed(): total_size - post_eviction_total_size >= min_avail_bytes ), f"we requested at least {min_avail_bytes} worth of free space" - wait_until(2, 2, more_than_min_avail_bytes_freed) + wait_until(more_than_min_avail_bytes_freed, timeout=5) def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv): diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 0b1ac11c1653..4044f25b37b8 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -257,7 +257,7 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): # Wait until we see that the pgbench_accounts is created + filled on replica *and* # index is created. Otherwise index creation would conflict with # read queries and hs feedback won't save us. - wait_until(60, 1.0, partial(pgbench_accounts_initialized, secondary)) + wait_until(partial(pgbench_accounts_initialized, secondary), timeout=60) # Test should fail if hs feedback is disabled anyway, but cross # check that walproposer sets some xmin. @@ -269,7 +269,7 @@ def xmin_is_not_null(): log.info(f"xmin is {slot_xmin}") assert int(slot_xmin) > 0 - wait_until(10, 1.0, xmin_is_not_null) + wait_until(xmin_is_not_null) for _ in range(1, 5): # in debug mode takes about 5-7s balance = secondary.safe_psql_scalar("select sum(abalance) from pgbench_accounts") @@ -286,7 +286,7 @@ def xmin_is_null(): log.info(f"xmin is {slot_xmin}") assert slot_xmin is None - wait_until(10, 1.0, xmin_is_null) + wait_until(xmin_is_null) # Test race condition between WAL replay and backends performing queries diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index 761ec7568f4b..8818b407122b 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -206,7 +206,7 @@ def future_layer_is_gone_from_index_part(): future_layers = set(get_future_layers()) assert future_layer not in future_layers - wait_until(10, 0.5, future_layer_is_gone_from_index_part) + wait_until(future_layer_is_gone_from_index_part) # We already make deletion stuck here, but we don't necessarily hit the failpoint # because deletions are batched. 
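Not part of the patch: a minimal, self-contained sketch of how call sites use the reworked wait_until helper from test_runner/fixtures/utils.py above. The signature and defaults (timeout=20.0 s, interval=0.5 s) come from the diff; the probe function below is a made-up placeholder.

    import time

    from fixtures.utils import wait_until

    start = time.monotonic()

    def two_seconds_elapsed() -> float:
        # assert-style probe: keeps raising until the condition holds;
        # its return value is passed back through wait_until
        elapsed = time.monotonic() - start
        assert elapsed >= 2.0
        return elapsed

    elapsed = wait_until(two_seconds_elapsed)              # defaults: ~20 s budget, polled every 0.5 s
    elapsed = wait_until(two_seconds_elapsed, timeout=60)  # longer waits pass timeout= instead of the old (iterations, interval) pair
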
diff --git a/test_runner/regress/test_logging.py b/test_runner/regress/test_logging.py index f6fbdcabfd9d..d94c786f4983 100644 --- a/test_runner/regress/test_logging.py +++ b/test_runner/regress/test_logging.py @@ -37,7 +37,7 @@ def assert_logged(): return env.pageserver.assert_log_contains(f".*{msg_id}.*") - wait_until(10, 0.5, assert_logged) + wait_until(assert_logged) # make sure it's counted def assert_metric_value(): @@ -49,4 +49,4 @@ def assert_metric_value(): log.info("libmetrics_tracing_event_count: %s", val) assert val > (before or 0.0) - wait_until(10, 1, assert_metric_value) + wait_until(assert_metric_value) diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py index ba471b7147d7..db18e1758c12 100644 --- a/test_runner/regress/test_logical_replication.py +++ b/test_runner/regress/test_logical_replication.py @@ -207,7 +207,7 @@ def slot_removed(ep: Endpoint): log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}") vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") - wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint)) + wait_until(partial(slot_removed, endpoint)) def test_ondemand_wal_download_in_replication_slot_funcs(neon_env_builder: NeonEnvBuilder): @@ -519,7 +519,7 @@ def check_that_changes_propagated(): assert len(res) == 4 assert [r[0] for r in res] == [10, 20, 30, 40] - wait_until(10, 0.5, check_that_changes_propagated) + wait_until(check_that_changes_propagated) def logical_replication_wait_flush_lsn_sync(publisher: PgProtocol) -> Lsn: @@ -549,7 +549,7 @@ def check_caughtup(): ) assert flush_lsn >= publisher_flush_lsn - wait_until(30, 0.5, check_caughtup) + wait_until(check_caughtup) return publisher_flush_lsn diff --git a/test_runner/regress/test_lsn_mapping.py b/test_runner/regress/test_lsn_mapping.py index 7f0b54112820..e42e71646d97 100644 --- a/test_runner/regress/test_lsn_mapping.py +++ b/test_runner/regress/test_lsn_mapping.py @@ -169,7 +169,7 @@ def test_get_lsn_by_timestamp_cancelled(neon_env_builder: NeonEnvBuilder): ) _, offset = wait_until( - 20, 0.5, lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}") + lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}") ) with pytest.raises(ReadTimeout): @@ -178,8 +178,6 @@ def test_get_lsn_by_timestamp_cancelled(neon_env_builder: NeonEnvBuilder): client.configure_failpoints((failpoint, "off")) _, offset = wait_until( - 20, - 0.5, lambda: env.pageserver.assert_log_contains( "Cancelled request finished with an error: Cancelled$", offset ), diff --git a/test_runner/regress/test_neon_superuser.py b/test_runner/regress/test_neon_superuser.py index 7118127a1ffe..49cd91906f8f 100644 --- a/test_runner/regress/test_neon_superuser.py +++ b/test_runner/regress/test_neon_superuser.py @@ -77,7 +77,7 @@ def check_that_changes_propagated(): assert len(res) == 4 assert [r[0] for r in res] == [10, 20, 30, 40] - wait_until(10, 0.5, check_that_changes_propagated) + wait_until(check_that_changes_propagated) # Test that pg_monitor is working for neon_superuser role cur.execute("SELECT query from pg_stat_activity LIMIT 1") diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index e1caaeb6c1f6..028d1c2e49b8 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -256,7 +256,7 @@ def get_resident_physical_size(): ##### Second start, 
restore the data and ensure it's the same env.pageserver.start() - wait_until(10, 0.2, lambda: assert_tenant_state(client, tenant_id, "Active")) + wait_until(lambda: assert_tenant_state(client, tenant_id, "Active")) # The current_physical_size reports the sum of layers loaded in the layer # map, regardless of where the layer files are located. So even though we @@ -413,7 +413,7 @@ def get_resident_physical_size(): ] ) - wait_until(10, 0.2, lambda: assert_tenant_state(client, tenant_id, "Active")) + wait_until(lambda: assert_tenant_state(client, tenant_id, "Active")) ###### Phase 1: exercise download error code path @@ -705,7 +705,7 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu ) _, offset = wait_until( - 20, 0.5, lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}") + lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}") ) location_conf = {"mode": "Detached", "tenant_conf": {}} @@ -713,8 +713,6 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu detach = exec.submit(client.tenant_location_conf, env.initial_tenant, location_conf) _, offset = wait_until( - 20, - 0.5, lambda: env.pageserver.assert_log_contains( "closing is taking longer than expected", offset ), @@ -734,8 +732,6 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu client.configure_failpoints((failpoint, "pause")) _, offset = wait_until( - 20, - 0.5, lambda: env.pageserver.assert_log_contains(f"cfg failpoint: {failpoint} pause", offset), ) @@ -750,8 +746,6 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu warmup = exec.submit(client.tenant_secondary_download, env.initial_tenant, wait_ms=30000) _, offset = wait_until( - 20, - 0.5, lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}", offset), ) @@ -805,7 +799,7 @@ def test_layer_download_timeouted(neon_env_builder: NeonEnvBuilder): ) _, offset = wait_until( - 20, 0.5, lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}") + lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}") ) # ensure enough time while paused to trip the timeout time.sleep(2) @@ -824,8 +818,6 @@ def test_layer_download_timeouted(neon_env_builder: NeonEnvBuilder): # capture the next offset for a new synchronization with the failpoint _, offset = wait_until( - 20, - 0.5, lambda: env.pageserver.assert_log_contains(f"cfg failpoint: {failpoint} pause", offset), ) diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index 05e81b82e07a..55fd7a8608b4 100644 --- a/test_runner/regress/test_pageserver_api.py +++ b/test_runner/regress/test_pageserver_api.py @@ -117,19 +117,11 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv): # We need to wait here because it's possible that we don't have access to # the latest WAL yet, when the `timeline_detail` API is first called. # See: https://github.com/neondatabase/neon/issues/1768. - lsn = wait_until( - number_of_iterations=5, - interval=1, - func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, None), - ) + lsn = wait_until(lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, None)) # Make a DB modification then expect getting a new WAL receiver's data. 
endpoint.safe_psql("INSERT INTO t VALUES (1, 'hey')") - wait_until( - number_of_iterations=5, - interval=1, - func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, lsn), - ) + wait_until(lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, lsn)) def test_pageserver_http_api_client(neon_simple_env: NeonEnv): diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 6ba5753420c7..7e5bb45242ff 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -352,7 +352,7 @@ def test_deletion_queue_recovery( def assert_some_validations(): assert get_deletion_queue_validated(ps_http) > 0 - wait_until(20, 1, assert_some_validations) + wait_until(assert_some_validations) # The validatated keys statistic advances before the header is written, so we # also wait to see the header hit the disk: this seems paranoid but the race @@ -360,7 +360,7 @@ def assert_some_validations(): def assert_header_written(): assert (main_pageserver.workdir / "deletion" / "header-01").exists() - wait_until(20, 1, assert_header_written) + wait_until(assert_header_written) # If we will lose attachment, then our expectation on restart is that only the ones # we already validated will execute. Act like only those were present in the queue. @@ -382,11 +382,11 @@ def assert_deletions_submitted(n: int) -> None: # After restart, issue a flush to kick the deletion frontend to do recovery. # It should recover all the operations we submitted before the restart. ps_http.deletion_queue_flush(execute=False) - wait_until(20, 0.25, lambda: assert_deletions_submitted(before_restart_depth)) + wait_until(lambda: assert_deletions_submitted(before_restart_depth)) # The queue should drain through completely if we flush it ps_http.deletion_queue_flush(execute=True) - wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0)) + wait_until(lambda: assert_deletion_queue(ps_http, lambda n: n == 0)) if keep_attachment == KeepAttachment.KEEP: # - If we kept the attachment, then our pre-restart deletions should execute @@ -564,7 +564,7 @@ def test_multi_attach( ) # Initially, the tenant will be attached to the first pageserver (first is default in our test harness) - wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[0], tenant_id, "Active")) + wait_until(lambda: assert_tenant_state(http_clients[0], tenant_id, "Active")) _detail = http_clients[0].timeline_detail(tenant_id, timeline_id) with pytest.raises(PageserverApiException): http_clients[1].timeline_detail(tenant_id, timeline_id) @@ -579,8 +579,8 @@ def test_multi_attach( pageservers[1].tenant_attach(env.initial_tenant) pageservers[2].tenant_attach(env.initial_tenant) - wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[1], tenant_id, "Active")) - wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[2], tenant_id, "Active")) + wait_until(lambda: assert_tenant_state(http_clients[1], tenant_id, "Active")) + wait_until(lambda: assert_tenant_state(http_clients[2], tenant_id, "Active")) # Now they all have it attached _details = list([c.timeline_detail(tenant_id, timeline_id) for c in http_clients]) diff --git a/test_runner/regress/test_pageserver_getpage_throttle.py b/test_runner/regress/test_pageserver_getpage_throttle.py index f1aad85fe98a..ba6a1d90451e 100644 --- a/test_runner/regress/test_pageserver_getpage_throttle.py +++ b/test_runner/regress/test_pageserver_getpage_throttle.py @@ -81,9 +81,7 @@ def 
run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs: i marker = uuid.uuid4().hex ps_http.post_tracing_event("info", marker) - _, marker_offset = wait_until( - 10, 0.5, lambda: env.pageserver.assert_log_contains(marker, offset=None) - ) + _, marker_offset = wait_until(lambda: env.pageserver.assert_log_contains(marker, offset=None)) log.info("run pagebench") duration_secs = 10 @@ -103,12 +101,11 @@ def run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs: i log.info("validate that we logged the throttling") wait_until( - 10, - compaction_period / 10, lambda: env.pageserver.assert_log_contains( f".*{tenant_id}.*shard was throttled in the last n_seconds.*", offset=marker_offset, ), + timeout=compaction_period, ) log.info("validate that the metric doesn't include throttle wait time") diff --git a/test_runner/regress/test_pageserver_layer_rolling.py b/test_runner/regress/test_pageserver_layer_rolling.py index f6a7bfa1ade5..706da1e35e00 100644 --- a/test_runner/regress/test_pageserver_layer_rolling.py +++ b/test_runner/regress/test_pageserver_layer_rolling.py @@ -84,7 +84,7 @@ def query(): # The metric gets initialised on the first update. # Retry a few times, but return 0 if it's stable. try: - return float(wait_until(3, 0.5, query)) + return float(wait_until(query, timeout=2, interval=0.5)) except Exception: return 0 @@ -131,7 +131,7 @@ def test_pageserver_small_inmemory_layers( wait_until_pageserver_is_caught_up(env, last_flush_lsns) # We didn't write enough data to trigger a size-based checkpoint: we should see dirty data. - wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) + wait_until(lambda: assert_dirty_bytes_nonzero(env)) ps_http_client = env.pageserver.http_client() total_wal_ingested_before_restart = wait_for_wal_ingest_metric(ps_http_client) @@ -139,7 +139,7 @@ def test_pageserver_small_inmemory_layers( # Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed, # such that there are zero bytes of ephemeral layer left on the pageserver log.info("Waiting for background checkpoints...") - wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) + wait_until(lambda: assert_dirty_bytes(env, 0), timeout=2 * CHECKPOINT_TIMEOUT_SECONDS) # Zero ephemeral layer bytes does not imply that all the frozen layers were uploaded: they # must be uploaded to remain visible to the pageserver after restart. @@ -180,7 +180,7 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder): wait_until_pageserver_is_caught_up(env, last_flush_lsns) # We didn't write enough data to trigger a size-based checkpoint: we should see dirty data. - wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) + wait_until(lambda: assert_dirty_bytes_nonzero(env)) # Stop the safekeepers, so that we cannot have any more WAL receiver connections for sk in env.safekeepers: @@ -193,7 +193,7 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder): # Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed, # such that there are zero bytes of ephemeral layer left on the pageserver log.info("Waiting for background checkpoints...") - wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) + wait_until(lambda: assert_dirty_bytes(env, 0), timeout=2 * CHECKPOINT_TIMEOUT_SECONDS) # The code below verifies that we do not flush on the first write # after an idle period longer than the checkpoint timeout. 
@@ -210,7 +210,7 @@ def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder): run_worker_for_tenant(env, 5, tenant_with_extra_writes, offset=ENTRIES_PER_TIMELINE) ) - dirty_after_write = wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) + dirty_after_write = wait_until(lambda: assert_dirty_bytes_nonzero(env)) # We shouldn't flush since we've just opened a new layer waited_for = 0 @@ -305,11 +305,11 @@ def assert_bytes_rolled(): # Wait until enough layers have rolled that the amount of dirty data is under the threshold. # We do this indirectly via layer maps, rather than the dirty bytes metric, to avoid false-passing # if that metric isn't updated quickly enough to reflect the dirty bytes exceeding the limit. - wait_until(compaction_period_s * 2, 1, assert_bytes_rolled) + wait_until(assert_bytes_rolled, timeout=2 * compaction_period_s) # The end state should also have the reported metric under the limit def assert_dirty_data_limited(): dirty_bytes = get_dirty_bytes(env) assert dirty_bytes < max_dirty_data - wait_until(compaction_period_s * 2, 1, lambda: assert_dirty_data_limited()) + wait_until(lambda: assert_dirty_data_limited(), timeout=2 * compaction_period_s) diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 4bf570551731..835ccbd5d430 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -103,7 +103,7 @@ def assert_complete(): raise AssertionError("No 'complete' metric yet") - wait_until(30, 1.0, assert_complete) + wait_until(assert_complete) # Expectation callbacks: arg t is sample value, arg p is the previous phase's sample value expectations = [ diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index a264f4d3c9c2..1292682f9e3d 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -356,7 +356,7 @@ def caught_up(): ) assert destination_lsn >= origin_lsn - wait_until(100, 0.1, caught_up) + wait_until(caught_up) # The destination should accept writes workload.churn_rows(64, pageserver_b.id) @@ -411,7 +411,7 @@ def blocked_deletions_drained(): assert submitted is not None assert submitted > 0 - wait_until(10, 0.1, blocked_deletions_drained) + wait_until(blocked_deletions_drained) workload.churn_rows(64, pageserver_b.id) workload.validate(pageserver_b.id) @@ -702,7 +702,7 @@ def await_log(pageserver, deadline, expression): else: timeout = int(deadline - now) + 1 try: - wait_until(timeout, 1, lambda: pageserver.assert_log_contains(expression)) + wait_until(lambda: pageserver.assert_log_contains(expression), timeout=timeout) except: log.error(f"Timed out waiting for '{expression}'") raise diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py index 70d558ac5af0..c13bea7ee178 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -215,8 +215,6 @@ def trigger_gc_and_select( # wait for lease renewal before running query. 
_, offset = wait_until( - 20, - 0.5, lambda: ep_static.assert_log_contains( "lsn_lease_bg_task.*Request succeeded", offset=offset ), diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 137e75f78446..76a42ef4a2a2 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -300,9 +300,9 @@ def get_queued_count(file_kind, op_kind): print_gc_result(gc_result) assert gc_result["layers_removed"] > 0 - wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0)) - wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0)) - wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) + wait_until(lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0)) + wait_until(lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0)) + wait_until(lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # let all future operations queue up configure_storage_sync_failpoints("return") @@ -333,16 +333,28 @@ def churn_while_failpoints_active(result): # wait for churn thread's data to get stuck in the upload queue # Exponential back-off in upload queue, so, gracious timeouts. - wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0)) - wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 1)) - wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) + wait_until( + lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0), timeout=30 + ) + wait_until( + lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 1), timeout=30 + ) + wait_until( + lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0), timeout=30 + ) # unblock churn operations configure_storage_sync_failpoints("off") - wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0)) - wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0)) - wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) + wait_until( + lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0), timeout=30 + ) + wait_until( + lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0), timeout=30 + ) + wait_until( + lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0), timeout=30 + ) # The churn thread doesn't make progress once it blocks on the first wait_completion() call, # so, give it some time to wrap up. @@ -580,7 +592,7 @@ def assert_compacted_and_uploads_queued(): > 0 ) - wait_until(200, 0.1, assert_compacted_and_uploads_queued) + wait_until(assert_compacted_and_uploads_queued) # Regardless, give checkpoint some time to block for good. # Not strictly necessary, but might help uncover failure modes in the future. @@ -598,9 +610,7 @@ def assert_compacted_and_uploads_queued(): ] ) - # Generous timeout, because currently deletions can get blocked waiting for compaction - # This can be reduced when https://github.com/neondatabase/neon/issues/4998 is fixed. 
- timeline_delete_wait_completed(client, tenant_id, timeline_id, iterations=30, interval=1) + timeline_delete_wait_completed(client, tenant_id, timeline_id) assert not timeline_path.exists() @@ -826,22 +836,16 @@ def wait_upload_queue_empty( client: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId ): wait_until( - 2, - 1, lambda: assert_eq( get_queued_count(client, tenant_id, timeline_id, file_kind="layer", op_kind="upload"), 0 ), ) wait_until( - 2, - 1, lambda: assert_eq( get_queued_count(client, tenant_id, timeline_id, file_kind="index", op_kind="upload"), 0 ), ) wait_until( - 2, - 1, lambda: assert_eq( get_queued_count(client, tenant_id, timeline_id, file_kind="layer", op_kind="delete"), 0 ), diff --git a/test_runner/regress/test_replica_start.py b/test_runner/regress/test_replica_start.py index 8e7c01f95029..e2a22cc769f7 100644 --- a/test_runner/regress/test_replica_start.py +++ b/test_runner/regress/test_replica_start.py @@ -378,7 +378,7 @@ def check_replica_crashed(): return None raise RuntimeError("connection succeeded") - wait_until(20, 0.5, check_replica_crashed) + wait_until(check_replica_crashed) assert secondary.log_contains("too many KnownAssignedXids") # Replica is crashed, so ignore stop result diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 411574bd8621..c86ba0d4ea65 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -836,7 +836,7 @@ def assert_restart_notification(): assert len(notifications) == 3 assert notifications[2] == expect_after - wait_until(10, 1, assert_restart_notification) + wait_until(assert_restart_notification) # The quantity of data isn't huge, but debug can be _very_ slow, and the things we're @@ -1025,7 +1025,7 @@ def assert_all_disk_consistent(): assert Lsn(timeline_detail["disk_consistent_lsn"]) >= expect_lsn # We set a short checkpoint timeout: expect things to get frozen+flushed within that - wait_until(checkpoint_interval_secs * 3, 1, assert_all_disk_consistent) + wait_until(assert_all_disk_consistent, timeout=3 * checkpoint_interval_secs) def assert_all_remote_consistent(): """ @@ -1037,7 +1037,7 @@ def assert_all_remote_consistent(): assert Lsn(timeline_detail["remote_consistent_lsn"]) >= expect_lsn # We set a short checkpoint timeout: expect things to get frozen+flushed within that - wait_until(checkpoint_interval_secs * 3, 1, assert_all_remote_consistent) + wait_until(assert_all_remote_consistent, timeout=3 * checkpoint_interval_secs) workload.validate() @@ -1405,14 +1405,14 @@ def finish_split(): # e.g. 
while waiting for a storage controller to re-attach a parent shard if we failed # inside the pageserver and the storage controller responds by detaching children and attaching # parents concurrently (https://github.com/neondatabase/neon/issues/7148) - wait_until(10, 1, lambda: workload.churn_rows(10, upload=False, ingest=False)) + wait_until(lambda: workload.churn_rows(10, upload=False, ingest=False)) workload.validate() if failure.fails_forward(env): log.info("Fail-forward failure, checking split eventually completes...") # A failure type which results in eventual completion of the split - wait_until(30, 1, assert_split_done) + wait_until(assert_split_done) elif failure.can_mitigate(): log.info("Mitigating failure...") # Mitigation phase: we expect to be able to proceed with a successful shard split @@ -1420,21 +1420,21 @@ def finish_split(): # The split should appear to be rolled back from the point of view of all pageservers # apart from the one that is offline - wait_until(30, 1, lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id)) + wait_until(lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id)) finish_split() - wait_until(30, 1, lambda: assert_split_done(exclude_ps_id=failure.pageserver_id)) + wait_until(lambda: assert_split_done(exclude_ps_id=failure.pageserver_id)) # Having cleared the failure, everything should converge to a pristine state failure.clear(env) - wait_until(30, 1, assert_split_done) + wait_until(assert_split_done) else: # Once we restore the faulty pageserver's API to good health, rollback should # eventually complete. log.info("Clearing failure...") failure.clear(env) - wait_until(30, 1, assert_rolled_back) + wait_until(assert_rolled_back) # Having rolled back, the tenant should be working workload.churn_rows(10) diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 13bc54a1146d..e93e251b4fa7 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -154,7 +154,7 @@ def node_evacuated(node_id: int) -> None: counts = get_node_shard_counts(env, tenant_ids) assert counts[node_id] == 0 - wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id)) + wait_until(lambda: node_evacuated(env.pageservers[0].id)) # Let all the reconciliations after marking the node offline complete env.storage_controller.reconcile_until_idle() @@ -222,7 +222,7 @@ def test_node_status_after_restart( def is_ready(): assert env.storage_controller.ready() is True - wait_until(30, 1, is_ready) + wait_until(is_ready) # We loaded nodes from database on restart nodes = env.storage_controller.node_list() @@ -606,7 +606,7 @@ def node_evacuated(node_id: int) -> None: counts = get_node_shard_counts(env, [env.initial_tenant]) assert counts[node_id] == 0 - wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id)) + wait_until(lambda: node_evacuated(env.pageservers[0].id)) # Additional notification from migration log.info(f"notifications: {notifications}") @@ -620,7 +620,7 @@ def received_migration_notification(): assert len(notifications) == 2 assert notifications[1] == expect - wait_until(20, 0.25, received_migration_notification) + wait_until(received_migration_notification) # When we restart, we should re-emit notifications for all tenants env.storage_controller.stop() @@ -630,7 +630,7 @@ def received_restart_notification(): assert len(notifications) == 3 assert notifications[2] == expect - wait_until(10, 1, received_restart_notification) + 
wait_until(received_restart_notification) # Splitting a tenant should cause its stripe size to become visible in the compute notification env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=2) @@ -647,7 +647,7 @@ def received_split_notification(): assert len(notifications) == 4 assert notifications[3] == expect - wait_until(10, 1, received_split_notification) + wait_until(received_split_notification) # If the compute hook is unavailable, that should not block creating a tenant and # creating a timeline. This simulates a control plane refusing to accept notifications @@ -736,7 +736,7 @@ def handler(request: Request): def logged_stuck(): env.storage_controller.assert_log_contains(NOTIFY_BLOCKED_LOG) - wait_until(10, 0.25, logged_stuck) + wait_until(logged_stuck) contains_r = env.storage_controller.log_contains(NOTIFY_BLOCKED_LOG) assert contains_r is not None # Appease mypy (_, log_cursor) = contains_r @@ -764,7 +764,7 @@ def logged_stuck(): def logged_stuck_again(): env.storage_controller.assert_log_contains(NOTIFY_BLOCKED_LOG, offset=log_cursor) - wait_until(10, 0.25, logged_stuck_again) + wait_until(logged_stuck_again) assert migrate_fut.running() # This time, the compute hook remains stuck, but we mark the origin node offline: this should @@ -865,7 +865,7 @@ def notified_ps(ps_id: int) -> None: assert latest["shards"] is not None assert latest["shards"][0]["node_id"] == ps_id - wait_until(30, 1, lambda: notified_ps(pageserver_a.id)) + wait_until(lambda: notified_ps(pageserver_a.id)) env.storage_controller.allowed_errors.append(NOTIFY_BLOCKED_LOG) env.storage_controller.allowed_errors.extend(NOTIFY_FAILURE_LOGS) @@ -880,7 +880,7 @@ def notified_ps(ps_id: int) -> None: # Although the migration API failed, the hook should still see pageserver B (it remembers what # was posted even when returning an error code) - wait_until(30, 1, lambda: notified_ps(pageserver_b.id)) + wait_until(lambda: notified_ps(pageserver_b.id)) # Although the migration API failed, the tenant should still have moved to the right pageserver assert len(pageserver_b.http_client().tenant_list()) == 1 @@ -898,7 +898,7 @@ def notified_ps(ps_id: int) -> None: def logged_giving_up(): env.storage_controller.assert_log_contains(".*Giving up on compute notification.*") - wait_until(30, 1, logged_giving_up) + wait_until(logged_giving_up) pageserver_a.start() @@ -919,7 +919,7 @@ def logged_giving_up(): handle_params["status"] = 200 env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_a.id) - wait_until(30, 1, lambda: notified_ps(pageserver_a.id)) + wait_until(lambda: notified_ps(pageserver_a.id)) def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder): @@ -1453,7 +1453,7 @@ def tenants_placed(): # Check that each node got one tenant assert all(len(ts) == 1 for ts in node_to_tenants.values()) - wait_until(10, 1, tenants_placed) + wait_until(tenants_placed) # ... then we apply the failure offline_node_ids = set(failure.nodes()) @@ -1476,7 +1476,7 @@ def nodes_offline(): assert node["availability"] == "Offline" start = time.time() - wait_until(failure.offline_timeout, 1, nodes_offline) + wait_until(nodes_offline, timeout=failure.offline_timeout) detected_after = time.time() - start log.info(f"Detected node failures after {detected_after}s") @@ -1497,7 +1497,7 @@ def tenant_migrated(): assert observed_tenants == set(tenant_ids) - wait_until(10, 1, tenant_migrated) + wait_until(tenant_migrated) # ... 
then we clear the failure failure.clear(env) @@ -1509,7 +1509,7 @@ def nodes_online(): if node["id"] in online_node_ids: assert node["availability"] == "Active" - wait_until(10, 1, nodes_online) + wait_until(nodes_online) time.sleep(5) @@ -1562,7 +1562,7 @@ def failed_over(): # We could pre-empty this by configuring the node to Offline, but it's preferable to test # the realistic path we would take when a node restarts uncleanly. # The delay here will be ~NEON_LOCAL_MAX_UNAVAILABLE_INTERVAL in neon_local - wait_until(30, 1, failed_over) + wait_until(failed_over) reconciles_before_restart = env.storage_controller.get_metric_value( "storage_controller_reconcile_complete_total", filter={"status": "ok"} @@ -1640,12 +1640,12 @@ def assert_errors_gt(n) -> int: assert e > n return e - errs = wait_until(10, 1, lambda: assert_errors_gt(0)) + errs = wait_until(lambda: assert_errors_gt(0)) # Try reconciling again, it should fail again with pytest.raises(StorageControllerApiException): env.storage_controller.reconcile_all() - errs = wait_until(10, 1, lambda: assert_errors_gt(errs)) + errs = wait_until(lambda: assert_errors_gt(errs)) # Configure the tenant to disable reconciles env.storage_controller.tenant_policy_update( @@ -1674,7 +1674,7 @@ def assert_ok_gt(n) -> int: return o # We should see a successful reconciliation - wait_until(10, 1, lambda: assert_ok_gt(0)) + wait_until(lambda: assert_ok_gt(0)) # And indeed the tenant should be attached assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1 @@ -2073,7 +2073,7 @@ def secondary_is_lagging(): raise Exception(f"Secondary lag not big enough: {lag}") log.info(f"Looking for lag to develop on the secondary {secondary}") - wait_until(10, 1, secondary_is_lagging) + wait_until(secondary_is_lagging) log.info(f"Starting drain of primary {primary} with laggy secondary {secondary}") env.storage_controller.retryable_node_operation( @@ -2107,7 +2107,7 @@ def lag_is_acceptable(): if lag > 1 * 1024 * 1024: raise Exception(f"Secondary lag not big enough: {lag}") - wait_until(10, 1, lag_is_acceptable) + wait_until(lag_is_acceptable) env.storage_controller.node_configure(primary, {"scheduling": "Active"}) @@ -2227,7 +2227,7 @@ def assert_shards_migrated(): log.info(f"Shards on nodes other than on victim: {elsewhere}") assert elsewhere == tenant_count * shard_count_per_tenant - wait_until(30, 1, assert_shards_migrated) + wait_until(assert_shards_migrated) log.info(f"Deleting pageserver {victim.id}") env.storage_controller.node_delete(victim.id) @@ -2240,7 +2240,7 @@ def assert_victim_evacuated(): log.info(f"Shards on node {victim.id}: {count}") assert count == 0 - wait_until(30, 1, assert_victim_evacuated) + wait_until(assert_victim_evacuated) # The node should be gone from the list API assert victim.id not in [n["id"] for n in env.storage_controller.node_list()] @@ -2569,7 +2569,7 @@ def previous_stepped_down(): == StorageControllerLeadershipStatus.STEPPED_DOWN ) - wait_until(5, 1, previous_stepped_down) + wait_until(previous_stepped_down) storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}") @@ -2579,7 +2579,7 @@ def new_becomes_leader(): == StorageControllerLeadershipStatus.LEADER ) - wait_until(15, 1, new_becomes_leader) + wait_until(new_becomes_leader) leader = env.storage_controller.get_leader() assert leader["address"] == f"http://127.0.0.1:{storage_controller_2_port}/" @@ -2624,7 +2624,7 @@ def attached_is_draining(): env.storage_controller.configure_failpoints(("sleepy-drain-loop", 
"return(10000)")) env.storage_controller.node_drain(attached.id) - wait_until(10, 0.5, attached_is_draining) + wait_until(attached_is_draining) attached.restart() @@ -2646,7 +2646,7 @@ def reconfigure_node_again(): env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"}) # allow for small delay between actually having cancelled and being able reconfigure again - wait_until(4, 0.5, reconfigure_node_again) + wait_until(reconfigure_node_again) def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder): @@ -2691,7 +2691,7 @@ def has_hit_failpoint(): ps.log_contains(f"at failpoint {failpoint}") is not None for ps in env.pageservers ) - wait_until(10, 1, has_hit_failpoint) + wait_until(has_hit_failpoint) # Migrate the tenant while the timeline creation is in progress: this migration will complete once it # can detach from the old pageserver, which will happen once the failpoint completes. @@ -2775,7 +2775,7 @@ def test_storage_controller_validate_during_migration(neon_env_builder: NeonEnvB def has_hit_compaction_failpoint(): assert origin_pageserver.log_contains(f"at failpoint {compaction_failpoint}") - wait_until(10, 1, has_hit_compaction_failpoint) + wait_until(has_hit_compaction_failpoint) # While the compaction is running, start a live migration which will pause long enough for the compaction to sleep, # after incrementing generation and attaching the new location @@ -2794,7 +2794,7 @@ def has_hit_migration_failpoint(): # before it reaches this point. The timeout is because the AttachedStale transition includes # a flush of remote storage, and if the compaction already enqueued an index upload this cannot # make progress. - wait_until(60, 1, has_hit_migration_failpoint) + wait_until(has_hit_migration_failpoint, timeout=60) # Origin pageserver has succeeded with compaction before the migration completed. 
It has done all the writes it wanted to do in its own (stale) generation origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "off")) @@ -2917,7 +2917,7 @@ def has_hit_migration_failpoint(): log.info(expr) assert env.storage_controller.log_contains(expr) - wait_until(10, 1, has_hit_migration_failpoint) + wait_until(has_hit_migration_failpoint) # This request should be routed to whichever pageserver holds the highest generation tenant_info = env.storage_controller.pageserver_api().tenant_status( @@ -2934,7 +2934,7 @@ def has_hit_migration_failpoint(): # We expect request to land on the origin assert tenant_info["generation"] == 1 - wait_until(10, 1, long_migration_metric_published) + wait_until(long_migration_metric_published) # Eventually migration completes env.storage_controller.configure_failpoints((migration_failpoint.value, "off")) @@ -3113,7 +3113,7 @@ def has_hit_migration_failpoint(): log.info(expr) assert env.storage_controller.log_contains(expr) - wait_until(10, 1, has_hit_migration_failpoint) + wait_until(has_hit_migration_failpoint) env.storage_controller.pageserver_api().timeline_delete( tenant_id=tenant_id, timeline_id=timeline_id @@ -3182,7 +3182,7 @@ def has_hit_migration_failpoint(): log.info(expr) assert env.storage_controller.log_contains(expr) - wait_until(10, 1, has_hit_migration_failpoint) + wait_until(has_hit_migration_failpoint) timeline_id = TimelineId.generate() env.storage_controller.pageserver_api().timeline_create( diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py index 3991bd7061b9..b16dc54c248b 100644 --- a/test_runner/regress/test_storage_scrubber.py +++ b/test_runner/regress/test_storage_scrubber.py @@ -431,8 +431,6 @@ def stuck_split(): # Let the controller reach the failpoint wait_until( - 10, - 1, lambda: env.storage_controller.assert_log_contains( 'failpoint "shard-split-post-remote-sleep": sleeping' ), diff --git a/test_runner/regress/test_subscriber_restart.py b/test_runner/regress/test_subscriber_restart.py index d37eeb1e6ebb..7d4f66d04448 100644 --- a/test_runner/regress/test_subscriber_restart.py +++ b/test_runner/regress/test_subscriber_restart.py @@ -56,4 +56,4 @@ def insert_data(pub): pcur.execute(f"INSERT into t values ({n_records}, 0)") n_records += 1 with sub.cursor() as scur: - wait_until(60, 0.5, check_that_changes_propagated) + wait_until(check_that_changes_propagated) diff --git a/test_runner/regress/test_tenant_conf.py b/test_runner/regress/test_tenant_conf.py index 1dd46ec3d111..f8f240cfdcc4 100644 --- a/test_runner/regress/test_tenant_conf.py +++ b/test_runner/regress/test_tenant_conf.py @@ -234,11 +234,7 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder): assert not config_path.exists(), "detach did not remove config file" env.pageserver.tenant_attach(tenant_id) - wait_until( - number_of_iterations=5, - interval=1, - func=lambda: assert_tenant_state(http_client, tenant_id, "Active"), - ) + wait_until(lambda: assert_tenant_state(http_client, tenant_id, "Active")) env.config_tenant(tenant_id, {"gc_horizon": "1000000"}) contents_first = config_path.read_text() diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index 47df3ead7020..48e55c1ab15b 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -185,21 +185,21 @@ def tenant_is_deleted(): deletion = None try: - wait_until(10, 1, has_hit_failpoint) + wait_until(has_hit_failpoint) # it 
should start ok, sync up with the stuck creation, then hang waiting for the timeline # to shut down. deletion = Thread(target=start_deletion) deletion.start() - wait_until(10, 1, deletion_has_started_waiting_for_timelines) + wait_until(deletion_has_started_waiting_for_timelines) pageserver_http.configure_failpoints((failpoint, "off")) creation.join() deletion.join() - wait_until(10, 1, tenant_is_deleted) + wait_until(tenant_is_deleted) finally: creation.join() if deletion is not None: @@ -264,7 +264,7 @@ def timeline_create(): def hit_initdb_upload_failpoint(): env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}") - wait_until(100, 0.1, hit_initdb_upload_failpoint) + wait_until(hit_initdb_upload_failpoint) def creation_connection_timed_out(): env.pageserver.assert_log_contains( @@ -273,7 +273,7 @@ def creation_connection_timed_out(): # Wait so that we hit the timeout and the connection is dropped # (But timeline creation still continues) - wait_until(100, 0.1, creation_connection_timed_out) + wait_until(creation_connection_timed_out) ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "pause")) @@ -281,7 +281,7 @@ def tenant_delete(): def tenant_delete_inner(): ps_http.tenant_delete(tenant_id) - wait_until(100, 0.5, tenant_delete_inner) + wait_until(tenant_delete_inner) Thread(target=tenant_delete).start() @@ -290,7 +290,7 @@ def deletion_arrived(): f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause" ) - wait_until(100, 0.1, deletion_arrived) + wait_until(deletion_arrived) ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "off")) diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 8d7ca7bc4e14..3f21dc895a3b 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -212,7 +212,7 @@ async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: T nonlocal updates_started, updates_finished, updates_to_perform # Wait until we have performed some updates - wait_until(20, 0.5, lambda: updates_finished > 500) + wait_until(lambda: updates_finished > 500) log.info("Detaching tenant") pageserver_http.tenant_detach(tenant_id) @@ -512,7 +512,7 @@ def found_broken(): ) assert only_int(active) == 0 and only_int(broken) == 1 and only_int(broken_set) == 1 - wait_until(10, 0.5, found_broken) + wait_until(found_broken) client.tenant_detach(env.initial_tenant) @@ -524,7 +524,7 @@ def found_cleaned_up(): ) assert only_int(broken) == 0 and len(broken_set) == 0 - wait_until(10, 0.5, found_cleaned_up) + wait_until(found_cleaned_up) env.pageserver.tenant_attach(env.initial_tenant) @@ -536,4 +536,4 @@ def found_active(): ) assert only_int(active) == 1 and len(broken_set) == 0 - wait_until(10, 0.5, found_active) + wait_until(found_active) diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index bf6120aa0aab..df53a98e926c 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -298,11 +298,7 @@ def test_tenant_relocation( destination_ps.tenant_attach(tenant_id) # wait for tenant to finish attaching - wait_until( - number_of_iterations=10, - interval=1, - func=lambda: assert_tenant_state(destination_http, tenant_id, "Active"), - ) + wait_until(lambda: assert_tenant_state(destination_http, tenant_id, "Active")) check_timeline_attached( destination_http, diff --git a/test_runner/regress/test_tenant_size.py 
b/test_runner/regress/test_tenant_size.py index 8b733da0c67f..713f89c60f6a 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -638,7 +638,7 @@ def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder): with ThreadPoolExecutor(max_workers=1) as exec: completion = exec.submit(client.tenant_size, env.initial_tenant) _, last_offset = wait_until( - 10, 1.0, lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}") + lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}") ) timeline_delete_wait_completed(client, env.initial_tenant, branch_id) @@ -656,8 +656,6 @@ def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder): with ThreadPoolExecutor(max_workers=1) as exec: completion = exec.submit(client.tenant_size, env.initial_tenant) wait_until( - 10, - 1.0, lambda: env.pageserver.assert_log_contains( f"at failpoint {failpoint}", offset=last_offset ), diff --git a/test_runner/regress/test_tenant_tasks.py b/test_runner/regress/test_tenant_tasks.py index 72183f5778b9..4c26b64d22b9 100644 --- a/test_runner/regress/test_tenant_tasks.py +++ b/test_runner/regress/test_tenant_tasks.py @@ -77,4 +77,4 @@ def assert_tasks_finish(): assert tasks_started == tasks_ended assert tasks_panicked is None or int(tasks_panicked) == 0 - wait_until(10, 0.2, assert_tasks_finish) + wait_until(assert_tasks_finish) diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 158c3fddb0b3..d31901b384e5 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -330,7 +330,7 @@ def not_attaching(): assert len(tenants) == 1 assert all(t["state"]["slug"] != "Attaching" for t in tenants) - wait_until(10, 0.2, not_attaching) + wait_until(not_attaching) tenants = client.tenant_list() diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py index 8d3ddf7e54a5..6b27c41d1c3b 100644 --- a/test_runner/regress/test_tenants_with_remote_storage.py +++ b/test_runner/regress/test_tenants_with_remote_storage.py @@ -178,11 +178,7 @@ def test_tenants_attached_after_download(neon_env_builder: NeonEnvBuilder): env.pageserver.start() client = env.pageserver.http_client() - wait_until( - number_of_iterations=5, - interval=1, - func=lambda: assert_tenant_state(client, tenant_id, "Active"), - ) + wait_until(lambda: assert_tenant_state(client, tenant_id, "Active")) restored_timelines = client.timeline_list(tenant_id) assert ( @@ -257,11 +253,7 @@ def test_tenant_redownloads_truncated_file_on_startup( env.pageserver.start() client = env.pageserver.http_client() - wait_until( - number_of_iterations=5, - interval=1, - func=lambda: assert_tenant_state(client, tenant_id, "Active"), - ) + wait_until(lambda: assert_tenant_state(client, tenant_id, "Active")) restored_timelines = client.timeline_list(tenant_id) assert ( diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index bc2e048f6942..5a1e493bbec7 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -227,8 +227,8 @@ def leaf_offloaded(): ps_http.timeline_offload(tenant_id=tenant_id, timeline_id=leaf_timeline_id) assert timeline_offloaded_logged(leaf_timeline_id) - wait_until(30, 1, leaf_offloaded) - wait_until(30, 1, parent_offloaded) + wait_until(leaf_offloaded) + wait_until(parent_offloaded) # Offloaded child timelines should still prevent deletion 
with pytest.raises( @@ -331,7 +331,7 @@ def child_offloaded(): ps_http.timeline_offload(tenant_id=tenant_id, timeline_id=child_timeline_id) assert timeline_offloaded_api(child_timeline_id) - wait_until(30, 1, child_offloaded) + wait_until(child_offloaded) assert timeline_offloaded_api(child_timeline_id) assert not timeline_offloaded_api(root_timeline_id) diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 155709e1066d..fbece6836729 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -21,7 +21,6 @@ assert_prefix_empty, assert_prefix_not_empty, many_small_layers_tenant_config, - poll_for_remote_storage_iterations, timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, @@ -94,12 +93,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): assert timeline_path.exists() # retry deletes when compaction or gc is running in pageserver - # TODO: review whether this wait_until is actually necessary, we do an await() internally - wait_until( - number_of_iterations=3, - interval=0.2, - func=lambda: timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id), - ) + timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id) assert not timeline_path.exists() @@ -111,13 +105,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) assert exc.value.status_code == 404 - wait_until( - number_of_iterations=3, - interval=0.2, - func=lambda: timeline_delete_wait_completed( - ps_http, env.initial_tenant, parent_timeline_id - ), - ) + timeline_delete_wait_completed(ps_http, env.initial_tenant, parent_timeline_id) # Check that we didn't pick up the timeline again after restart. # See https://github.com/neondatabase/neon/issues/3560 @@ -226,8 +214,6 @@ def test_delete_timeline_exercise_crash_safety_failpoints( ps_http.configure_failpoints((failpoint, "return")) - iterations = poll_for_remote_storage_iterations(remote_storage_kind) - # These failpoints are earlier than background task is spawned. # so they result in api request failure. if failpoint in ( @@ -244,7 +230,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints( tenant_id=env.initial_tenant, timeline_id=timeline_id, expected_state="Broken", - iterations=iterations, + iterations=40, ) reason = timeline_info["state"]["Broken"]["reason"] @@ -257,25 +243,21 @@ def test_delete_timeline_exercise_crash_safety_failpoints( env.pageserver.stop() env.pageserver.start() - wait_until_tenant_active(ps_http, env.initial_tenant, iterations=iterations) + wait_until_tenant_active(ps_http, env.initial_tenant) if failpoint == "timeline-delete-before-index-deleted-at": # We crashed before persisting this to remote storage, need to retry delete request timeline_delete_wait_completed(ps_http, env.initial_tenant, timeline_id) else: # Pageserver should've resumed deletion after restart. 
- wait_timeline_detail_404( - ps_http, env.initial_tenant, timeline_id, iterations=iterations - ) + wait_timeline_detail_404(ps_http, env.initial_tenant, timeline_id) elif check is Check.RETRY_WITHOUT_RESTART: # this should succeed # this also checks that delete can be retried even when timeline is in Broken state ps_http.configure_failpoints((failpoint, "off")) - timeline_delete_wait_completed( - ps_http, env.initial_tenant, timeline_id, iterations=iterations - ) + timeline_delete_wait_completed(ps_http, env.initial_tenant, timeline_id) # Check remote is empty if remote_storage_kind is RemoteStorageKind.MOCK_S3: @@ -378,7 +360,7 @@ def test_timeline_resurrection_on_attach( env.pageserver.tenant_attach(tenant_id=tenant_id) - wait_until_tenant_active(ps_http, tenant_id=tenant_id, iterations=10, period=0.5) + wait_until_tenant_active(ps_http, tenant_id=tenant_id) timelines = ps_http.timeline_list(tenant_id=tenant_id) assert {TimelineId(tl["timeline_id"]) for tl in timelines} == { @@ -439,7 +421,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild # Wait for tenant to finish loading. wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1) - wait_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id, iterations=4) + wait_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id) assert ( not leaf_timeline_path.exists() @@ -481,11 +463,10 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) # for some reason the check above doesnt immediately take effect for the below. - # Assume it is mock server incosistency and check twice. + # Assume it is mock server incosistency and check a few times. wait_until( - 2, - 0.5, lambda: assert_prefix_empty(neon_env_builder.pageserver_remote_storage), + timeout=2, ) # We deleted our only tenant, and the scrubber fails if it detects nothing @@ -544,7 +525,7 @@ def first_call_hit_failpoint(): f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" ) - wait_until(50, 0.1, first_call_hit_failpoint) + wait_until(first_call_hit_failpoint, interval=0.1, status_interval=1.0) # make the second call and assert behavior log.info("second call start") @@ -613,7 +594,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): def hit_failpoint(): env.pageserver.assert_log_contains(at_failpoint_log_message) - wait_until(50, 0.1, hit_failpoint) + wait_until(hit_failpoint, interval=0.1) # we log this error if a client hangs up # might as well use it as another indicator that the test works @@ -623,7 +604,7 @@ def hit_failpoint(): def got_hangup_log_message(): env.pageserver.assert_log_contains(hangup_log_message) - wait_until(50, 0.1, got_hangup_log_message) + wait_until(got_hangup_log_message, interval=0.1) # check that the timeline is still present ps_http.timeline_detail(env.initial_tenant, child_timeline_id) @@ -635,10 +616,10 @@ def first_request_finished(): message = f".*DELETE.*{child_timeline_id}.*Cancelled request finished" env.pageserver.assert_log_contains(message) - wait_until(50, 0.1, first_request_finished) + wait_until(first_request_finished, interval=0.1) # check that the timeline is gone - wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=10) + wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id) def test_timeline_delete_works_for_remote_smoke( @@ -707,7 +688,7 @@ def test_timeline_delete_works_for_remote_smoke( # for some reason the check above doesnt immediately take 
effect for the below. # Assume it is mock server inconsistency and check twice. - wait_until(2, 0.5, lambda: assert_prefix_empty(neon_env_builder.pageserver_remote_storage)) + wait_until(lambda: assert_prefix_empty(neon_env_builder.pageserver_remote_storage)) # We deleted our only tenant, and the scrubber fails if it detects nothing neon_env_builder.disable_scrub_on_exit() @@ -753,15 +734,13 @@ def test_delete_orphaned_objects( env.pageserver.allowed_errors.append(f".*failpoint: {failpoint}") - iterations = poll_for_remote_storage_iterations(remote_storage_kind) - ps_http.timeline_delete(env.initial_tenant, timeline_id) timeline_info = wait_until_timeline_state( pageserver_http=ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id, expected_state="Broken", - iterations=iterations, + iterations=40, ) reason = timeline_info["state"]["Broken"]["reason"] @@ -827,8 +806,6 @@ def test_timeline_delete_resumed_on_attach( ) ) - iterations = poll_for_remote_storage_iterations(remote_storage_kind) - ps_http.timeline_delete(tenant_id, timeline_id) timeline_info = wait_until_timeline_state( @@ -836,7 +813,7 @@ def test_timeline_delete_resumed_on_attach( tenant_id=env.initial_tenant, timeline_id=timeline_id, expected_state="Broken", - iterations=iterations, + iterations=40, ) reason = timeline_info["state"]["Broken"]["reason"] @@ -871,7 +848,7 @@ def test_timeline_delete_resumed_on_attach( env.pageserver.tenant_attach(tenant_id=tenant_id) # delete should be resumed - wait_timeline_detail_404(ps_http, env.initial_tenant, timeline_id, iterations=iterations) + wait_timeline_detail_404(ps_http, env.initial_tenant, timeline_id) tenant_path = env.pageserver.timeline_dir(tenant_id, timeline_id) assert not tenant_path.exists() diff --git a/test_runner/regress/test_timeline_detach_ancestor.py b/test_runner/regress/test_timeline_detach_ancestor.py index 9c7e851ba87b..2c3ee38baef1 100644 --- a/test_runner/regress/test_timeline_detach_ancestor.py +++ b/test_runner/regress/test_timeline_detach_ancestor.py @@ -203,7 +203,7 @@ def test_ancestor_detach_branched_from( ) client.timeline_delete(env.initial_tenant, env.initial_timeline) - wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0) + wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline) # because we do the fullbackup from ancestor at the branch_lsn, the zenith.signal is always different # as there is always "PREV_LSN: invalid" for "before" @@ -336,10 +336,10 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder): # delete the timelines to confirm detach actually worked client.timeline_delete(env.initial_tenant, after) - wait_timeline_detail_404(client, env.initial_tenant, after, 10, 1.0) + wait_timeline_detail_404(client, env.initial_tenant, after) client.timeline_delete(env.initial_tenant, env.initial_timeline) - wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0) + wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline) def test_detached_receives_flushes_while_being_detached(neon_env_builder: NeonEnvBuilder): @@ -973,17 +973,17 @@ def is_deleted(): with ThreadPoolExecutor(max_workers=2) as pool: try: fut = pool.submit(detach_ancestor) - offset = wait_until(10, 1.0, at_failpoint) + offset = wait_until(at_failpoint) delete = pool.submit(start_delete) - offset = wait_until(10, 1.0, lambda: at_waiting_on_gate_close(offset)) + offset = wait_until(lambda: at_waiting_on_gate_close(offset)) victim_http.configure_failpoints((pausepoint, 
"off")) delete.result() - assert wait_until(10, 1.0, is_deleted), f"unimplemented mode {mode}" + assert wait_until(is_deleted), f"unimplemented mode {mode}" # TODO: match the error with pytest.raises(PageserverApiException) as exc: @@ -1115,11 +1115,11 @@ def first_branch_gone(): with ThreadPoolExecutor(max_workers=1) as pool: try: fut = pool.submit(detach_timeline) - wait_until(10, 1.0, paused_at_failpoint) + wait_until(paused_at_failpoint) # let stuck complete stuck_http.configure_failpoints((pausepoint, "off")) - wait_until(10, 1.0, first_completed) + wait_until(first_completed) if mode == "delete_reparentable_timeline": assert first_branch is not None @@ -1127,7 +1127,7 @@ def first_branch_gone(): env.initial_tenant, first_branch ) victim_http.configure_failpoints((pausepoint, "off")) - wait_until(10, 1.0, first_branch_gone) + wait_until(first_branch_gone) elif mode == "create_reparentable_timeline": first_branch = create_reparentable_timeline() victim_http.configure_failpoints((pausepoint, "off")) @@ -1271,11 +1271,11 @@ def first_completed(): with ThreadPoolExecutor(max_workers=1) as pool: try: fut = pool.submit(detach_timeline) - wait_until(10, 1.0, paused_at_failpoint) + wait_until(paused_at_failpoint) # let stuck complete stuck_http.configure_failpoints((pausepoint, "off")) - wait_until(10, 1.0, first_completed) + wait_until(first_completed) victim_http.configure_failpoints((pausepoint, "off")) @@ -1456,7 +1456,7 @@ def try_detach(): # other tests take the "detach? reparent complete", but this only hits # "complete". http.timeline_delete(env.initial_tenant, env.initial_timeline) - wait_timeline_detail_404(http, env.initial_tenant, env.initial_timeline, 20) + wait_timeline_detail_404(http, env.initial_tenant, env.initial_timeline) http.configure_failpoints(("timeline-detach-ancestor::complete_before_uploading", "off")) @@ -1518,7 +1518,7 @@ def delete_detached(): with ThreadPoolExecutor(max_workers=1) as pool: detach = pool.submit(detach_and_get_stuck) - offset = wait_until(10, 1.0, request_processing_noted_in_log) + offset = wait_until(request_processing_noted_in_log) # make this named fn tor more clear failure test output logging def pausepoint_hit_with_gc_paused() -> LogCursor: @@ -1529,11 +1529,11 @@ def pausepoint_hit_with_gc_paused() -> LogCursor: ) return at - offset = wait_until(10, 1.0, pausepoint_hit_with_gc_paused) + offset = wait_until(pausepoint_hit_with_gc_paused) delete_detached() - wait_timeline_detail_404(http, env.initial_tenant, detached, 10, 1.0) + wait_timeline_detail_404(http, env.initial_tenant, detached) http.configure_failpoints((failpoint, "off")) diff --git a/test_runner/regress/test_timeline_gc_blocking.py b/test_runner/regress/test_timeline_gc_blocking.py index 5a5ca3290a07..7605e1f758b9 100644 --- a/test_runner/regress/test_timeline_gc_blocking.py +++ b/test_runner/regress/test_timeline_gc_blocking.py @@ -61,7 +61,7 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool # deletion unblocks gc http.timeline_delete(env.initial_tenant, foo_branch) - wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0) + wait_timeline_detail_404(http, env.initial_tenant, foo_branch) wait_for_another_gc_round() pss.assert_log_contains(gc_active_line) diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 4528bc618044..95bf9106cd94 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -396,11 +396,7 @@ def 
test_timeline_physical_size_init(neon_env_builder: NeonEnvBuilder): # Wait for the tenant to be loaded client = env.pageserver.http_client() - wait_until( - number_of_iterations=5, - interval=1, - func=lambda: assert_tenant_state(client, env.initial_tenant, "Active"), - ) + wait_until(lambda: assert_tenant_state(client, env.initial_tenant, "Active")) assert_physical_size_invariants( get_physical_size_values(env, env.initial_tenant, new_timeline_id), @@ -433,7 +429,7 @@ def check(): get_physical_size_values(env, env.initial_tenant, new_timeline_id), ) - wait_until(10, 1, check) + wait_until(check) def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder): @@ -721,7 +717,7 @@ def wait_for_tenant_startup_completions(client: PageserverHttpClient, count: int def condition(): assert client.get_metric_value("pageserver_tenant_startup_complete_total") == count - wait_until(5, 1.0, condition) + wait_until(condition) def test_ondemand_activation(neon_env_builder: NeonEnvBuilder): @@ -768,7 +764,7 @@ def at_least_one_active(): assert "Active" in set(get_tenant_states().values()) # One tenant should activate, then get stuck in their logical size calculation - wait_until(10, 1, at_least_one_active) + wait_until(at_least_one_active) # Wait some walltime to gain confidence that other tenants really are stuck and not proceeding to activate time.sleep(5) @@ -836,13 +832,13 @@ def at_least_one_active(): def all_active(): assert all(s == "Active" for s in get_tenant_states().values()) - wait_until(10, 1, all_active) + wait_until(all_active) # Final control check: restarting with no failpoints at all results in all tenants coming active # without being prompted by client I/O env.pageserver.stop() env.pageserver.start() - wait_until(10, 1, all_active) + wait_until(all_active) assert ( pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants @@ -856,7 +852,7 @@ def all_active(): extra_env_vars={"FAILPOINTS": "timeline-calculate-logical-size-pause=pause"} ) - wait_until(10, 1, at_least_one_active) + wait_until(at_least_one_active) detach_tenant_id = list( [(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"] @@ -881,7 +877,7 @@ def all_active(): # Check that all the stuck tenants proceed to active (apart from the one that deletes, and the one # we detached) - wait_until(10, 1, all_active) + wait_until(all_active) assert len(get_tenant_states()) == n_tenants - 2 @@ -908,7 +904,7 @@ def delete_tenant(): try: # Deletion will get to the point in shutdown where it's waiting for timeline shutdown, then # hang because of our failpoint blocking activation. 
- wait_until(10, 1, shutting_down) + wait_until(shutting_down) finally: log.info("Clearing failpoint") pageserver_http.configure_failpoints(("timeline-calculate-logical-size-pause", "off")) @@ -1030,13 +1026,13 @@ def one_is_active(): log.info(f"{states}") assert len(states["Active"]) == 1 - wait_until(10, 1, one_is_active) + wait_until(one_is_active) def other_is_attaching(): states = get_tenant_states() assert len(states["Attaching"]) == 1 - wait_until(10, 1, other_is_attaching) + wait_until(other_is_attaching) def eager_tenant_is_active(): resp = client.tenant_status(eager_tenant) @@ -1053,7 +1049,7 @@ def eager_tenant_is_active(): }, lazy=False, ) - wait_until(10, 1, eager_tenant_is_active) + wait_until(eager_tenant_is_active) other_is_attaching() @@ -1096,7 +1092,7 @@ def initial_tenant_is_active(): resp = client.tenant_status(env.initial_tenant) assert resp["state"]["slug"] == "Active" - wait_until(10, 1, initial_tenant_is_active) + wait_until(initial_tenant_is_active) # even though the initial tenant is now active, because it was startup time # attach, it will consume the only permit because logical size calculation @@ -1119,7 +1115,7 @@ def lazy_tenant_is_attaching(): assert resp["state"]["slug"] == "Attaching" # paused logical size calculation of env.initial_tenant is keeping it attaching - wait_until(10, 1, lazy_tenant_is_attaching) + wait_until(lazy_tenant_is_attaching) for _ in range(5): lazy_tenant_is_attaching() @@ -1132,10 +1128,10 @@ def lazy_tenant_is_active(): if activation_method == "endpoint": with env.endpoints.create_start("main", tenant_id=lazy_tenant): # starting up the endpoint should make it jump the queue - wait_until(10, 1, lazy_tenant_is_active) + wait_until(lazy_tenant_is_active) elif activation_method == "branch": env.create_timeline("second_branch", lazy_tenant) - wait_until(10, 1, lazy_tenant_is_active) + wait_until(lazy_tenant_is_active) elif activation_method == "delete": delete_lazy_activating(lazy_tenant, env.pageserver, expect_attaching=True) else: diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 8fa33b81a9dc..23d4f23cdb84 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -2136,7 +2136,7 @@ def evicted_on_source(): # Check that on source no segment files are present assert src_sk.list_segments(tenant_id, timeline_id) == [] - wait_until(60, 1, evicted_on_source) + wait_until(evicted_on_source, timeout=60) # Invoke pull_timeline: source should serve snapshot request without promoting anything to local disk, # destination should import the control file only & go into evicted mode immediately @@ -2155,7 +2155,7 @@ def evicted_on_destination(): # This should be fast, it is a wait_until because eviction state is updated # in the background wrt pull_timeline. 
- wait_until(10, 0.1, evicted_on_destination) + wait_until(evicted_on_destination, timeout=1.0, interval=0.1) # Delete the timeline on the source, to prove that deletion works on an # evicted timeline _and_ that the final compute test is really not using @@ -2178,7 +2178,7 @@ def unevicted_on_dest(): n_evicted = dst_sk.http_client().get_metric_value("safekeeper_evicted_timelines") assert n_evicted == 0 - wait_until(10, 1, unevicted_on_dest) + wait_until(unevicted_on_dest, interval=0.1, timeout=1.0) # In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries @@ -2606,10 +2606,10 @@ def all_evicted(): assert n_evicted # make mypy happy assert int(n_evicted) == n_timelines - wait_until(60, 0.5, all_evicted) + wait_until(all_evicted, timeout=30) # restart should preserve the metric value sk.stop().start() - wait_until(60, 0.5, all_evicted) + wait_until(all_evicted) # and endpoint start should reduce is endpoints[0].start() @@ -2618,7 +2618,7 @@ def one_unevicted(): assert n_evicted # make mypy happy assert int(n_evicted) < n_timelines - wait_until(60, 0.5, one_unevicted) + wait_until(one_unevicted) # Test resetting uploaded partial segment state. @@ -2666,7 +2666,7 @@ def evicted(): if isinstance(eviction_state, str) and eviction_state == "Present": raise Exception("eviction didn't happen yet") - wait_until(30, 1, evicted) + wait_until(evicted) # it must have uploaded something uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id) log.info(f"uploaded segments before reset: {uploaded_segs}") @@ -2763,7 +2763,7 @@ def source_partial_segment_uploaded(): raise Exception("Partial segment not uploaded yet") - source_partial_segment = wait_until(15, 1, source_partial_segment_uploaded) + source_partial_segment = wait_until(source_partial_segment_uploaded) log.info( f"Uploaded segments before pull are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" ) @@ -2787,7 +2787,7 @@ def evicted(): if evictions is None or evictions == 0: raise Exception("Eviction did not happen on source safekeeper yet") - wait_until(30, 1, evicted) + wait_until(evicted) endpoint.start(safekeepers=[2, 3]) @@ -2804,7 +2804,7 @@ def new_partial_segment_uploaded(): ) endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'") - wait_until(15, 1, new_partial_segment_uploaded) + wait_until(new_partial_segment_uploaded) log.info( f"Uploaded segments after post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" @@ -2833,4 +2833,4 @@ def unevicted(): if unevictions is None or unevictions == 0: raise Exception("Uneviction did not happen on source safekeeper yet") - wait_until(10, 1, unevicted) + wait_until(unevicted) diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index 294f86ffa720..d22a900c5923 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -97,7 +97,7 @@ def all_sks_in_wareceiver_state(): str(safekeeper.id) in exception_string ), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after WAL wait timeout" - wait_until(60, 0.5, all_sks_in_wareceiver_state) + wait_until(all_sks_in_wareceiver_state, timeout=30) stopped_safekeeper = env.safekeepers[-1] stopped_safekeeper_id = stopped_safekeeper.id @@ -124,7 +124,7 @@ def all_but_stopped_sks_in_wareceiver_state(): str(safekeeper.id) in exception_string ), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout" - wait_until(60, 0.5, 
all_but_stopped_sks_in_wareceiver_state) + wait_until(all_but_stopped_sks_in_wareceiver_state, timeout=30) def insert_test_elements(env: NeonEnv, tenant_id: TenantId, start: int, count: int):
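All of the hunks above apply the same mechanical migration: the old positional form wait_until(iterations, interval, func) becomes a call that takes only the predicate, with keyword overrides (timeout, interval, status_interval) kept only where a test needs a non-default polling budget, for example wait_until(evicted_on_source, timeout=60). A minimal sketch of the migrated call pattern follows, assuming a hypothetical Event-based predicate; the names done and work_is_done are illustrative and not part of this patch.

from threading import Event

from fixtures.utils import wait_until

# Hypothetical stand-in for whatever condition a test polls for; in a real
# test this would be satisfied by a pageserver or safekeeper reaching some state.
done = Event()

def work_is_done():
    # wait_until keeps retrying the predicate until it stops raising;
    # a failed assert simply means "not yet".
    assert done.is_set()

# Plain call: rely on the fixture's default timeout and poll interval.
wait_until(work_is_done)

# Keyword overrides, as in the safekeeper eviction hunks above.
wait_until(work_is_done, timeout=60, interval=1)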