race test cache of unauthed query & refactor Pause library (#33289)
Test a race condition that's happening with the query cache: unauthenticated ("unauthed") queries don't wait for each other.

To test this race condition, I needed a more powerful PauseController, so that's the majority of this PR.

Motivating example: I want to run two queries in parallel, pause both right before they start executing the function (after they have checked the cache and determined it's a miss), and then run both to completion. To do that, I need two PauseGuards at the same time for the same label.

Issues with the existing PauseController implementation:
1. When a PauseController is created, all labels become breakpoints that the test code will pause on. You can't, for example, enable breakpoints only after database setup. If there's a `.wait` call you don't care about, you must wait for it in parallel or you'll get a deadlock.
2. While you're paused on one breakpoint, you can't set any other breakpoints or pause on the same breakpoint again, because the PauseGuard holds a mutable reference to the PauseController.
3. There's a deadlock if you try the following pattern, which looks reasonable:

```rust
let mut fut = Box::pin(do_something_that_waits(pause_client));
let mut pause_guard = select! {
  pause_guard = pause_controller.wait_for_blocked("label") => pause_guard,
  _ = &mut fut => panic!("fut should wait"),
};
pause_guard.unpause();
fut.await;
```

(If you're interested, you can try to find the deadlock yourself. Hint: in the logs you can see [this log line](https://github.com/get-convex/convex/blob/9dabf6eb63a57ab2fbd9eaa1787aa57931a88902/crates/common/src/pause.rs#L98). The short version: `unpause` blocks on the same rendezvous channel used for the initial pause, but the paused future is only polled by `fut.await`, which doesn't run until `unpause` returns.)

My new version fixes these issues; incidentally, it's also closer to Dropbox's library that serves the same purpose.

1. By default, all breakpoints are inactive. Activate a breakpoint with `pause_controller.hold(label)`; it stays active until `.wait(label)` has been called, after which it becomes inactive until `.hold` is called again. This gives the test framework fine-grained control over which breakpoints are active.
2. While you're paused on one breakpoint, you can set other breakpoints or re-activate the same breakpoint with `.hold`. This was useful for the auth-caching test, where I want to pause two parallel-running queries after they have checked the cache, and only then unpause them to execute the functions.
3. Fixed the deadlock in the pattern above by using the rendezvous only for the initial pause; the unpause goes through a `oneshot` channel, so it doesn't block.
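To make the protocol concrete, here is a minimal self-contained sketch of the hold / wait_for_blocked / unpause design described above. This is not the actual convex implementation or API: it uses std threads and channels instead of async futures, and the types and names beyond `hold`, `wait_for_blocked`, and `unpause` are simplified stand-ins. The key design point survives, though: a rendezvous channel is used only for the initial pause, while unpausing goes through a buffered ("oneshot"-style) channel, so `unpause` never blocks on the paused task.

```rust
use std::{
    collections::HashMap,
    sync::{
        mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
        Arc, Mutex,
    },
    thread,
};

type Label = &'static str;

/// Sent by the paused task; carries the channel used to release it.
struct Blocked {
    unpause_tx: Sender<()>,
}

#[derive(Clone, Default)]
pub struct PauseController {
    // Multiple holds can be active for the same label at once.
    holds: Arc<Mutex<HashMap<Label, Vec<SyncSender<Blocked>>>>>,
}

pub struct HoldGuard {
    blocked_rx: Receiver<Blocked>,
}

pub struct PauseGuard {
    unpause_tx: Sender<()>,
}

impl PauseController {
    /// Activate a breakpoint; it stays active until one `wait` consumes it.
    pub fn hold(&self, label: Label) -> HoldGuard {
        let (tx, rx) = sync_channel(0); // rendezvous for the initial pause
        self.holds.lock().unwrap().entry(label).or_default().push(tx);
        HoldGuard { blocked_rx: rx }
    }

    /// Called from the code under test; a no-op for inactive labels.
    pub fn wait(&self, label: Label) {
        // Take (and deactivate) one hold; the lock is released before blocking.
        let hold = self.holds.lock().unwrap().get_mut(label).and_then(Vec::pop);
        if let Some(hold) = hold {
            let (unpause_tx, unpause_rx) = channel();
            // Rendezvous: blocks until the test observes the pause...
            if hold.send(Blocked { unpause_tx }).is_ok() {
                // ...then block until the test unpauses (or drops the guard).
                let _ = unpause_rx.recv();
            }
        }
    }
}

impl HoldGuard {
    /// Test side: block until some task is paused at this breakpoint.
    pub fn wait_for_blocked(self) -> Option<PauseGuard> {
        let blocked = self.blocked_rx.recv().ok()?;
        Some(PauseGuard { unpause_tx: blocked.unpause_tx })
    }
}

impl PauseGuard {
    /// Non-blocking release: fires the channel; no second rendezvous needed.
    pub fn unpause(self) {
        let _ = self.unpause_tx.send(());
    }
}

/// Two tasks paused on the same label at once, then released one by one.
pub fn demo() -> Vec<i32> {
    let controller = PauseController::default();
    let hold1 = controller.hold("before_work");
    let hold2 = controller.hold("before_work");

    let (c1, c2) = (controller.clone(), controller.clone());
    let t1 = thread::spawn(move || { c1.wait("before_work"); 1 });
    let t2 = thread::spawn(move || { c2.wait("before_work"); 2 });

    // Both tasks are now blocked at the same breakpoint.
    let g1 = hold1.wait_for_blocked().unwrap();
    let g2 = hold2.wait_for_blocked().unwrap();
    g1.unpause();
    g2.unpause();

    let mut results = vec![t1.join().unwrap(), t2.join().unwrap()];
    results.sort();
    results
}

fn main() {
    println!("{:?}", demo());
}
```

Because `unpause` is a plain non-blocking send, the controller never needs the paused task to be concurrently polled (or scheduled) for `unpause` to return, which is exactly why the `select!` pattern above stops deadlocking.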

GitOrigin-RevId: 57c15b2648570929e16e6f82e6644316d2316f42
ldanilek authored and Convex, Inc. committed Jan 17, 2025
1 parent ca32673 commit 9ddd4d7
Showing 19 changed files with 334 additions and 235 deletions.
1 change: 0 additions & 1 deletion crates/application/src/application_function_runner/mod.rs
@@ -949,7 +949,6 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
context.clone(),
);
log_occ_retries(backoff.failures() as usize);
pause_client.close("retry_mutation_loop_start");
return Ok(result);
}
}
9 changes: 6 additions & 3 deletions crates/application/src/snapshot_import/tests.rs
@@ -471,7 +471,7 @@ Interrupting `npx convex import` will not cancel it."#
// Hard to control timing in race test with background job moving state forward.
#[convex_macro::test_runtime]
async fn import_races_with_schema_update(rt: TestRuntime) -> anyhow::Result<()> {
let (mut pause_controller, pause_client) = PauseController::new(vec!["before_finalize_import"]);
let (pause_controller, pause_client) = PauseController::new();
let app = Application::new_for_tests_with_args(
&rt,
ApplicationFixtureArgs {
@@ -497,14 +497,17 @@ a
);

activate_schema(&app, initial_schema).await?;

let hold_guard = pause_controller.hold("before_finalize_import");

let mut import_fut = run_csv_import(&app, table_name, test_csv).boxed();

select! {
r = import_fut.as_mut().fuse() => {
anyhow::bail!("import finished before pausing: {r:?}");
},
pause_guard = pause_controller.wait_for_blocked("before_finalize_import").fuse() => {
let mut pause_guard = pause_guard.unwrap();
pause_guard = hold_guard.wait_for_blocked().fuse() => {
let pause_guard = pause_guard.unwrap();
let mismatch_schema = db_schema!(
table_name => DocumentSchema::Union(
vec![
20 changes: 4 additions & 16 deletions crates/application/src/test_helpers.rs
@@ -113,11 +113,7 @@ use crate::{
StartPushRequest,
},
log_visibility::AllowLogging,
scheduled_jobs::{
ScheduledJobExecutor,
SCHEDULED_JOB_COMMITTING,
SCHEDULED_JOB_EXECUTED,
},
scheduled_jobs::ScheduledJobExecutor,
Application,
};

@@ -129,22 +125,13 @@ pub struct ApplicationFixtureArgs {
pub tp: Option<TestPersistence>,
pub snapshot_import_pause_client: Option<PauseClient>,
pub scheduled_jobs_pause_client: PauseClient,
pub function_runner_pause_client: PauseClient,
pub event_logger: Option<Arc<dyn UsageEventLogger>>,
}

impl ApplicationFixtureArgs {
pub fn with_scheduled_jobs_pause_client() -> (Self, PauseController) {
let (pause_controller, pause_client) = PauseController::new(vec![SCHEDULED_JOB_EXECUTED]);
let args = ApplicationFixtureArgs {
scheduled_jobs_pause_client: pause_client,
..Default::default()
};
(args, pause_controller)
}

pub fn with_scheduled_jobs_fault_client() -> (Self, PauseController) {
let (pause_controller, pause_client) =
PauseController::new(vec![SCHEDULED_JOB_COMMITTING, SCHEDULED_JOB_EXECUTED]);
let (pause_controller, pause_client) = PauseController::new();
let args = ApplicationFixtureArgs {
scheduled_jobs_pause_client: pause_client,
..Default::default()
@@ -263,6 +250,7 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
},
database.clone(),
fetch_client,
args.function_runner_pause_client,
)
.await?,
);
25 changes: 16 additions & 9 deletions crates/application/src/tests/mutation.rs
@@ -107,14 +107,17 @@ async fn test_mutation_occ_fail(rt: TestRuntime) -> anyhow::Result<()> {
.await?;
application.load_udf_tests_modules().await?;

let (mut pause, pause_client) = PauseController::new(["retry_mutation_loop_start"]);
let (pause, pause_client) = PauseController::new();
let hold_guard = pause.hold("retry_mutation_loop_start");
let fut1 = insert_and_count(&application, pause_client);
let fut2 = async {
let mut hold_guard = hold_guard;
for i in 0..*UDF_EXECUTOR_OCC_MAX_RETRIES + 1 {
let mut guard = pause
.wait_for_blocked("retry_mutation_loop_start")
let guard = hold_guard
.wait_for_blocked()
.await
.context("Didn't hit breakpoint?")?;
hold_guard = pause.hold("retry_mutation_loop_start");

// Do an entire mutation while we're paused - to create an OCC conflict on
// the original insertion.
@@ -186,14 +189,17 @@ async fn test_mutation_occ_success(rt: TestRuntime) -> anyhow::Result<()> {
.await?;
application.load_udf_tests_modules().await?;

let (mut pause, pause_client) = PauseController::new(["retry_mutation_loop_start"]);
let (pause, pause_client) = PauseController::new();
let hold_guard = pause.hold("retry_mutation_loop_start");
let fut1 = insert_and_count(&application, pause_client);
let fut2 = async {
let mut hold_guard = hold_guard;
for i in 0..*UDF_EXECUTOR_OCC_MAX_RETRIES + 1 {
let mut guard = pause
.wait_for_blocked("retry_mutation_loop_start")
let guard = hold_guard
.wait_for_blocked()
.await
.context("Didn't hit breakpoint?")?;
hold_guard = pause.hold("retry_mutation_loop_start");

// N-1 retries, Nth one allow it to succeed
if i < *UDF_EXECUTOR_OCC_MAX_RETRIES {
@@ -268,11 +274,12 @@ async fn test_multiple_inserts_dont_occ(rt: TestRuntime) -> anyhow::Result<()> {
// Insert an object to create the table (otherwise it'll OCC on table creation).
insert_object(&application, PauseClient::new()).await?;

let (mut pause, pause_client) = PauseController::new(["retry_mutation_loop_start"]);
let (pause, pause_client) = PauseController::new();
let hold_guard = pause.hold("retry_mutation_loop_start");
let fut1 = insert_object(&application, pause_client);
let fut2 = async {
let mut guard = pause
.wait_for_blocked("retry_mutation_loop_start")
let guard = hold_guard
.wait_for_blocked()
.await
.context("Didn't hit breakpoint?")?;

18 changes: 12 additions & 6 deletions crates/application/src/tests/occ_retries.rs
@@ -67,15 +67,18 @@ async fn test_occ_fails(rt: TestRuntime) -> anyhow::Result<()> {
.await?;
application.commit_test(tx).await?;

let (mut pause, pause_client) = PauseController::new(["retry_tx_loop_start"]);
let (pause, pause_client) = PauseController::new();
let hold_guard = pause.hold("retry_tx_loop_start");
let fut1 = test_replace_with_retries(&application, pause_client, id, "value".try_into()?);

let fut2 = async {
let mut hold_guard = hold_guard;
for _i in 0..MAX_OCC_FAILURES {
let mut guard = pause
.wait_for_blocked("retry_tx_loop_start")
let guard = hold_guard
.wait_for_blocked()
.await
.context("Didn't hit breakpoint?")?;
hold_guard = pause.hold("retry_tx_loop_start");
let mut tx = application.begin(identity.clone()).await?;
test_replace_tx(&mut tx, id, "value2".try_into()?).await?;
application.commit_test(tx).await?;
@@ -101,15 +104,18 @@ async fn test_occ_succeeds(rt: TestRuntime) -> anyhow::Result<()> {
.await?;
application.commit_test(tx).await?;

let (mut pause, pause_client) = PauseController::new(["retry_tx_loop_start"]);
let (pause, pause_client) = PauseController::new();
let hold_guard = pause.hold("retry_tx_loop_start");
let fut1 = test_replace_with_retries(&application, pause_client, id, "value".try_into()?);

let fut2 = async {
let mut hold_guard = hold_guard;
for i in 0..MAX_OCC_FAILURES {
let mut guard = pause
.wait_for_blocked("retry_tx_loop_start")
let guard = hold_guard
.wait_for_blocked()
.await
.context("Didn't hit breakpoint?")?;
hold_guard = pause.hold("retry_tx_loop_start");
if i < MAX_OCC_FAILURES - 1 {
let mut tx = application.begin(identity.clone()).await?;
test_replace_tx(&mut tx, id, "value2".try_into()?).await?;
104 changes: 102 additions & 2 deletions crates/application/src/tests/query_cache.rs
@@ -6,7 +6,10 @@ use common::{
ComponentPath,
PublicFunctionPath,
},
pause::PauseClient,
pause::{
PauseClient,
PauseController,
},
types::FunctionCaller,
RequestId,
};
@@ -24,7 +27,10 @@ use serde_json::{
use value::ConvexValue;

use crate::{
test_helpers::ApplicationTestExt,
test_helpers::{
ApplicationFixtureArgs,
ApplicationTestExt,
},
Application,
};

@@ -316,6 +322,100 @@ async fn test_query_cache_without_checking_auth(rt: TestRuntime) -> anyhow::Resu
Ok(())
}

#[convex_macro::test_runtime]
async fn test_query_cache_unauthed_race(rt: TestRuntime) -> anyhow::Result<()> {
let (pause_controller, pause_client) = PauseController::new();
let application = Application::new_for_tests_with_args(
&rt,
ApplicationFixtureArgs {
function_runner_pause_client: pause_client,
..Default::default()
},
)
.await?;
application.load_udf_tests_modules().await?;

// Run the same query as different users, in parallel.
// In this case we don't know that the query doesn't check auth, so
// neither request waits for the other.
// To make sure they run in parallel, run each query up until they try
// to run a function, which is after they have checked the cache and decided
// that it's a cache miss.
let mut first_query = Box::pin(run_query(
&application,
"basic:listAllObjects",
json!({}),
Identity::user(UserIdentity::test()),
false,
));
let first_hold_guard = pause_controller.hold("run_function");
let first_pause_guard = tokio::select! {
_ = &mut first_query => {
panic!("First query completed before pause");
}
pause_guard = first_hold_guard.wait_for_blocked() => {
pause_guard.unwrap()
}
};
let second_hold_guard = pause_controller.hold("run_function");
let mut second_query = Box::pin(run_query(
&application,
"basic:listAllObjects",
json!({}),
Identity::system(),
false,
));
let second_pause_guard = tokio::select! {
_ = &mut second_query => {
panic!("Second query completed before pause");
}
pause_guard = second_hold_guard.wait_for_blocked() => {
pause_guard.unwrap()
}
};
first_pause_guard.unpause();
first_query.await?;
second_pause_guard.unpause();
second_query.await?;

// Insert an object to invalidate the cache.
// Then run both queries again in parallel.
// In this case we can guess that the query doesn't check auth, so
// the second request should wait for the first.
// But we don't do that yet. TODO(lee): fix this.
insert_object(&application).await?;

// Rerun queries in parallel
let mut first_query = Box::pin(run_query(
&application,
"basic:listAllObjects",
json!({}),
Identity::user(UserIdentity::test()),
false,
));
let first_hold_guard = pause_controller.hold("run_function");
let first_pause_guard = tokio::select! {
_ = &mut first_query => {
panic!("First query completed before pause");
}
pause_guard = first_hold_guard.wait_for_blocked() => {
pause_guard.unwrap()
}
};
run_query(
&application,
"basic:listAllObjects",
json!({}),
Identity::system(),
false,
)
.await?;
first_pause_guard.unpause();
first_query.await?;

Ok(())
}

#[convex_macro::test_runtime]
async fn test_query_cache_with_conditional_auth_check(rt: TestRuntime) -> anyhow::Result<()> {
let application = Application::new_for_tests(&rt).await?;