Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test_runner: improve wait_until #9936

Merged
merged 7 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,7 @@ def wait_until_ready(self):
def storage_controller_ready():
assert self.ready() is True

wait_until(30, 1, storage_controller_ready)
wait_until(storage_controller_ready)
return time.time() - t1

def attach_hook_issue(
Expand Down Expand Up @@ -2574,7 +2574,7 @@ def complete():
log.info(f"any_unstable={any_unstable}")
assert not any_unstable

wait_until(20, 0.5, complete)
wait_until(complete)

def __enter__(self) -> Self:
return self
Expand Down Expand Up @@ -3972,7 +3972,7 @@ def check_migrations_done():
migration_id: int = cur.fetchall()[0][0]
assert migration_id >= num_migrations

wait_until(20, 0.5, check_migrations_done)
wait_until(check_migrations_done)

# Mock the extension part of spec passed from control plane for local testing
# endpooint.rs adds content of this file as a part of the spec.json
Expand Down Expand Up @@ -4484,12 +4484,10 @@ def are_lsns_advanced():
)
assert stat.remote_consistent_lsn >= lsn and stat.backup_lsn >= lsn.segment_lsn()

# xxx: max wait is long because we might be waiting for reconnection from
# pageserver to this safekeeper
wait_until(30, 1, are_lsns_advanced)
wait_until(are_lsns_advanced)
client.checkpoint(tenant_id, timeline_id)
if wait_wal_removal:
wait_until(30, 1, are_segments_removed)
wait_until(are_segments_removed)

def wait_until_paused(self, failpoint: str):
msg = f"at failpoint {failpoint}"
Expand All @@ -4498,7 +4496,7 @@ def paused():
log.info(f"waiting for hitting failpoint {failpoint}")
self.assert_log_contains(msg)

wait_until(20, 0.5, paused)
wait_until(paused)


class NeonBroker(LogUtils):
Expand Down
17 changes: 3 additions & 14 deletions test_runner/fixtures/pageserver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.remote_storage import RemoteStorage, RemoteStorageKind, S3Storage
from fixtures.remote_storage import RemoteStorage, S3Storage
from fixtures.utils import wait_until

if TYPE_CHECKING:
Expand Down Expand Up @@ -269,12 +269,7 @@ def wait_timeline_detail_404(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
iterations: int,
interval: float | None = None,
):
if interval is None:
interval = 0.25

def timeline_is_missing():
data = {}
try:
Expand All @@ -287,19 +282,17 @@ def timeline_is_missing():

raise RuntimeError(f"Timeline exists state {data.get('state')}")

wait_until(iterations, interval, func=timeline_is_missing)
wait_until(timeline_is_missing)


def timeline_delete_wait_completed(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
iterations: int = 20,
interval: float | None = None,
**delete_args,
) -> None:
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations, interval)
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id)


# remote_storage must not be None, but that's easier for callers to make mypy happy
Expand Down Expand Up @@ -453,7 +446,3 @@ def many_small_layers_tenant_config() -> dict[str, Any]:
"checkpoint_distance": 1024**2,
"image_creation_threshold": 100,
}


def poll_for_remote_storage_iterations(remote_storage_kind: RemoteStorageKind) -> int:
return 40 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 15
2 changes: 1 addition & 1 deletion test_runner/fixtures/safekeeper/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def timeline_start_lsn_non_zero() -> Lsn:
assert s > Lsn(0)
return s

return wait_until(30, 1, timeline_start_lsn_non_zero)
return wait_until(timeline_start_lsn_non_zero)

def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
return self.timeline_status(tenant_id, timeline_id).commit_lsn
Expand Down
2 changes: 1 addition & 1 deletion test_runner/fixtures/safekeeper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ def walreceivers_absent():
log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
assert len(status.walreceivers) == 0

wait_until(30, 0.5, walreceivers_absent)
wait_until(walreceivers_absent)
34 changes: 17 additions & 17 deletions test_runner/fixtures/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import threading
import time
from collections.abc import Callable, Iterable
from datetime import datetime, timedelta
from hashlib import sha256
from pathlib import Path
from typing import TYPE_CHECKING, Any, TypeVar
Expand Down Expand Up @@ -380,15 +381,10 @@ def start_in_background(
if return_code is not None:
error = f"expected subprocess to run but it exited with code {return_code}"
else:
attempts = 10
try:
wait_until(
number_of_iterations=attempts,
interval=1,
func=is_started,
)
wait_until(is_started, timeout=10)
except Exception:
error = f"Failed to get correct status from subprocess in {attempts} attempts"
error = "Failed to get correct status from subprocess"
except Exception as e:
error = f"expected subprocess to start but it failed with exception: {e}"

Expand All @@ -402,28 +398,32 @@ def start_in_background(


def wait_until(
number_of_iterations: int,
interval: float,
func: Callable[[], WaitUntilRet],
show_intermediate_error: bool = False,
name: str | None = None,
timeout: float = 20.0, # seconds
interval: float = 0.5, # seconds
status_interval: float = 1.0, # seconds
) -> WaitUntilRet:
"""
Wait until 'func' returns successfully, without exception. Returns the
last return value from the function.
"""
if name is None:
name = getattr(func, "__name__", repr(func))
deadline = datetime.now() + timedelta(seconds=timeout)
next_status = datetime.now()
last_exception = None
for i in range(number_of_iterations):
while datetime.now() <= deadline:
try:
res = func()
return func()
except Exception as e:
log.info("waiting for %s iteration %s failed: %s", func, i + 1, e)
if datetime.now() >= next_status:
log.info("waiting for %s: %s", name, e)
next_status = datetime.now() + timedelta(seconds=status_interval)
last_exception = e
if show_intermediate_error:
log.info(e)
time.sleep(interval)
continue
return res
raise Exception(f"timed out while waiting for {func}") from last_exception
raise Exception(f"timed out while waiting for {name}") from last_exception


def assert_eq(a, b) -> None:
Expand Down
6 changes: 2 additions & 4 deletions test_runner/logical_repl/test_clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,22 @@ def test_clickhouse(remote_pg: RemotePostgres):
"SETTINGS materialized_postgresql_tables_list = 'table1';"
)
wait_until(
120,
0.5,
lambda: query_clickhouse(
client,
"select * from db1_postgres.table1 order by 1",
"ee600d8f7cd05bd0b169fa81f44300a9dd10085a",
),
timeout=60,
)
cur.execute("INSERT INTO table1 (id, column1) VALUES (3, 'ghi'), (4, 'jkl');")
conn.commit()
wait_until(
120,
0.5,
lambda: query_clickhouse(
client,
"select * from db1_postgres.table1 order by 1",
"9eba2daaf7e4d7d27ac849525f68b562ab53947d",
),
timeout=60,
)
log.debug("Sleeping before final checking if Neon is still alive")
time.sleep(3)
Expand Down
12 changes: 3 additions & 9 deletions test_runner/logical_repl/test_debezium.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,12 @@ def test_debezium(debezium):
)
conn.commit()
wait_until(
100,
0.5,
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "John", "last_name": "Dow", "email": "johndow@example.com"},
),
show_intermediate_error=True,
timeout=60,
)
ts_ms = time.time() * 1000
log.info("Insert 2 ts_ms: %s", ts_ms)
Expand All @@ -165,28 +163,24 @@ def test_debezium(debezium):
)
conn.commit()
wait_until(
100,
0.5,
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "Alex", "last_name": "Row", "email": "alexrow@example.com"},
),
show_intermediate_error=True,
timeout=60,
)
ts_ms = time.time() * 1000
log.info("Update ts_ms: %s", ts_ms)
cur.execute("update inventory.customers set first_name = 'Alexander' where id = 2")
conn.commit()
wait_until(
100,
0.5,
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "Alexander"},
),
show_intermediate_error=True,
timeout=60,
)
time.sleep(3)
cur.execute("select 1")
8 changes: 3 additions & 5 deletions test_runner/performance/test_branch_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape:
startup_line = "INFO version: git(-env)?:"

# find the first line of the log file so we can find the next start later
_, first_start = wait_until(5, 1, lambda: env.pageserver.assert_log_contains(startup_line))
_, first_start = wait_until(lambda: env.pageserver.assert_log_contains(startup_line))

# start without gc so we can time compaction with less noise; use shorter
# period for compaction so it starts earlier
Expand All @@ -156,16 +156,14 @@ def patch_default_tenant_config(config):
)

_, second_start = wait_until(
5, 1, lambda: env.pageserver.assert_log_contains(startup_line, first_start)
lambda: env.pageserver.assert_log_contains(startup_line, first_start),
)
env.pageserver.quiesce_tenants()

wait_and_record_startup_metrics(env.pageserver, neon_compare.zenbenchmark, "restart_after")

# wait for compaction to complete, which most likely has already done so multiple times
msg, _ = wait_until(
30,
1,
lambda: env.pageserver.assert_log_contains(
f".*tenant_id={env.initial_tenant}.*: compaction iteration complete.*", second_start
),
Expand Down Expand Up @@ -205,7 +203,7 @@ def metrics_are_filled() -> list[Sample]:
assert len(matching) == len(expected_labels)
return matching

samples = wait_until(10, 1, metrics_are_filled)
samples = wait_until(metrics_are_filled)

for sample in samples:
phase = sample.labels["phase"]
Expand Down
2 changes: 0 additions & 2 deletions test_runner/regress/test_attach_tenant_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N
)

wait_until(
50,
0.1,
lambda: env.pageserver.assert_log_contains(".*Error processing HTTP request: Bad request"),
)

Expand Down
2 changes: 1 addition & 1 deletion test_runner/regress/test_compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def assert_broken():

# Wait for enough failures to break the circuit breaker
# This wait is fairly long because we back off on compaction failures, so 5 retries takes ~30s
wait_until(60, 1, assert_broken)
wait_until(assert_broken, timeout=60)

# Sleep for a while, during which time we expect that compaction will _not_ be retried
time.sleep(10)
Expand Down
10 changes: 5 additions & 5 deletions test_runner/regress/test_disk_usage_eviction.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def statvfs_called():
pageserver.assert_log_contains(".*running mocked statvfs.*")

# we most likely have already completed multiple runs
wait_until(10, 1, statvfs_called)
wait_until(statvfs_called)


def count_layers_per_tenant(
Expand Down Expand Up @@ -772,14 +772,14 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
)

wait_until(
10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved")
lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved")
)

def less_than_max_usage_pct():
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
assert post_eviction_total_size < 0.33 * total_size, "we requested max 33% usage"

wait_until(2, 2, less_than_max_usage_pct)
wait_until(less_than_max_usage_pct, timeout=5)

# Disk usage candidate collection only takes into account active tenants.
# However, the statvfs call takes into account the entire tenants directory,
Expand Down Expand Up @@ -825,7 +825,7 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
)

wait_until(
10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved")
lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved"),
)

def more_than_min_avail_bytes_freed():
Expand All @@ -834,7 +834,7 @@ def more_than_min_avail_bytes_freed():
total_size - post_eviction_total_size >= min_avail_bytes
), f"we requested at least {min_avail_bytes} worth of free space"

wait_until(2, 2, more_than_min_avail_bytes_freed)
wait_until(more_than_min_avail_bytes_freed, timeout=5)


def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
Expand Down
6 changes: 3 additions & 3 deletions test_runner/regress/test_hot_standby.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# Wait until we see that the pgbench_accounts is created + filled on replica *and*
# index is created. Otherwise index creation would conflict with
# read queries and hs feedback won't save us.
wait_until(60, 1.0, partial(pgbench_accounts_initialized, secondary))
wait_until(partial(pgbench_accounts_initialized, secondary), timeout=60)

# Test should fail if hs feedback is disabled anyway, but cross
# check that walproposer sets some xmin.
Expand All @@ -269,7 +269,7 @@ def xmin_is_not_null():
log.info(f"xmin is {slot_xmin}")
assert int(slot_xmin) > 0

wait_until(10, 1.0, xmin_is_not_null)
wait_until(xmin_is_not_null)
for _ in range(1, 5):
# in debug mode takes about 5-7s
balance = secondary.safe_psql_scalar("select sum(abalance) from pgbench_accounts")
Expand All @@ -286,7 +286,7 @@ def xmin_is_null():
log.info(f"xmin is {slot_xmin}")
assert slot_xmin is None

wait_until(10, 1.0, xmin_is_null)
wait_until(xmin_is_null)


# Test race condition between WAL replay and backends performing queries
Expand Down
2 changes: 1 addition & 1 deletion test_runner/regress/test_layers_from_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def future_layer_is_gone_from_index_part():
future_layers = set(get_future_layers())
assert future_layer not in future_layers

wait_until(10, 0.5, future_layer_is_gone_from_index_part)
wait_until(future_layer_is_gone_from_index_part)

# We already make deletion stuck here, but we don't necessarily hit the failpoint
# because deletions are batched.
Expand Down
Loading
Loading