Skip to content

Commit

Permalink
Updated Activity/Workflow Function bounds to allow a more generic use…
Browse files Browse the repository at this point in the history
…r function
  • Loading branch information
h7kanna committed May 11, 2023
1 parent c398004 commit 3a395fa
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 174 deletions.
82 changes: 38 additions & 44 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use std::{
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{
ActContext, ActivityCancelledError, LocalActivityOptions, WfContext, WorkflowFunction,
WorkflowResult,
ActContext, ActExitValue, ActivityCancelledError, ActivityFunction, LocalActivityOptions,
WfContext, WorkflowFunction, WorkflowResult,
};
use temporal_sdk_core_api::{
errors::{PollActivityError, PollWfError},
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
Ok(().into())
},
);
worker.register_activity(DEFAULT_ACTIVITY_TYPE, echo);
worker.register_activity(DEFAULT_ACTIVITY_TYPE, ActivityFunction::from(echo));
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -151,7 +151,7 @@ async fn local_act_many_concurrent() {
let mut worker = mock_sdk(mh);

worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf);
worker.register_activity("echo", echo);
worker.register_activity("echo", ActivityFunction::from(echo));
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -207,14 +207,17 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
Ok(().into())
},
);
worker.register_activity("echo", move |_ctx: ActContext, str: String| async move {
if shutdown_middle {
shutdown_barr.wait().await;
}
// Take slightly more than two workflow tasks
tokio::time::sleep(wft_timeout.mul_f32(2.2)).await;
Ok(str)
});
worker.register_activity(
"echo",
ActivityFunction::from(move |_ctx: ActContext, str: String| async move {
if shutdown_middle {
shutdown_barr.wait().await;
}
// Take slightly more than two workflow tasks
tokio::time::sleep(wft_timeout.mul_f32(2.2)).await;
Ok(str)
}),
);
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -276,10 +279,10 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
},
);
let attempts: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
worker.register_activity("echo", move |_ctx: ActContext, _: String| async move {
worker.register_activity("echo", move |_ctx: ActContext| async move {
// Succeed on 3rd attempt (which is ==2 since fetch_add returns prev val)
if 2 == attempts.fetch_add(1, Ordering::Relaxed) && eventually_pass {
Ok(())
Ok(().into())
} else {
Err(anyhow!("Oh no I failed!"))
}
Expand Down Expand Up @@ -355,12 +358,9 @@ async fn local_act_retry_long_backoff_uses_timer() {
Ok(().into())
},
);
worker.register_activity(
DEFAULT_ACTIVITY_TYPE,
move |_ctx: ActContext, _: String| async move {
Result::<(), _>::Err(anyhow!("Oh no I failed!"))
},
);
worker.register_activity(DEFAULT_ACTIVITY_TYPE, move |_ctx: ActContext| async move {
Result::<ActExitValue<()>, _>::Err(anyhow!("Oh no I failed!"))
});
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -398,7 +398,7 @@ async fn local_act_null_result() {
Ok(().into())
},
);
worker.register_activity("nullres", |_ctx: ActContext, _: String| async { Ok(()) });
worker.register_activity("nullres", |_ctx: ActContext| async { Ok(().into()) });
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -442,7 +442,7 @@ async fn local_act_command_immediately_follows_la_marker() {
Ok(().into())
},
);
worker.register_activity("nullres", |_ctx: ActContext, _: String| async { Ok(()) });
worker.register_activity("nullres", |_ctx: ActContext| async { Ok(().into()) });
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -778,10 +778,7 @@ async fn test_schedule_to_start_timeout() {
Ok(().into())
},
);
worker.register_activity(
"echo",
move |_ctx: ActContext, _: String| async move { Ok(()) },
);
worker.register_activity("echo", move |_ctx: ActContext| async move { Ok(().into()) });
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -868,10 +865,7 @@ async fn test_schedule_to_start_timeout_not_based_on_original_time(
Ok(().into())
},
);
worker.register_activity(
"echo",
move |_ctx: ActContext, _: String| async move { Ok(()) },
);
worker.register_activity("echo", move |_ctx: ActContext| async move { Ok(().into()) });
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -917,16 +911,13 @@ async fn wft_failure_cancels_running_las() {
Ok(().into())
},
);
worker.register_activity(
DEFAULT_ACTIVITY_TYPE,
move |ctx: ActContext, _: String| async move {
let res = tokio::time::timeout(Duration::from_millis(500), ctx.cancelled()).await;
if res.is_err() {
panic!("Activity must be cancelled!!!!");
}
Result::<(), _>::Err(ActivityCancelledError::default().into())
},
);
worker.register_activity(DEFAULT_ACTIVITY_TYPE, move |ctx: ActContext| async move {
let res = tokio::time::timeout(Duration::from_millis(500), ctx.cancelled()).await;
if res.is_err() {
panic!("Activity must be cancelled!!!!");
}
Result::<ActExitValue<()>, _>::Err(ActivityCancelledError::default().into())
});
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -980,7 +971,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() {
);
worker.register_activity(
"echo",
move |_: ActContext, _: String| async move { Ok(()) },
ActivityFunction::from(move |_: ActContext, _: String| async move { Ok(()) }),
);
worker
.submit_wf(
Expand Down Expand Up @@ -1039,9 +1030,12 @@ async fn local_act_records_nonfirst_attempts_ok() {
Ok(().into())
},
);
worker.register_activity("echo", move |_ctx: ActContext, _: String| async move {
Result::<(), _>::Err(anyhow!("I fail"))
});
worker.register_activity(
"echo",
ActivityFunction::from(move |_ctx: ActContext, _: String| async move {
Result::<(), _>::Err(anyhow!("I fail"))
}),
);
worker
.submit_wf(
wf_id.to_owned(),
Expand Down
61 changes: 28 additions & 33 deletions sdk/src/activity_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ impl ActContext {
task_queue: String,
task_token: Vec<u8>,
task: activity_task::Start,
) -> (Self, Payload) {
) -> Self {
let activity_task::Start {
workflow_namespace,
workflow_type,
workflow_execution,
activity_id,
activity_type,
header_fields,
mut input,
input,
heartbeat_details,
scheduled_time,
current_attempt_scheduled_time,
Expand All @@ -86,37 +86,32 @@ impl ActContext {
start_to_close_timeout.as_ref(),
schedule_to_close_timeout.as_ref(),
);
let first_arg = input.pop().unwrap_or_default();

(
ActContext {
worker,
app_data,
cancellation_token,
input,
heartbeat_details,
header_fields,
info: ActivityInfo {
task_token,
task_queue,
workflow_type,
workflow_namespace,
workflow_execution,
activity_id,
activity_type,
heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
scheduled_time: scheduled_time.try_into_or_none(),
started_time: started_time.try_into_or_none(),
deadline,
attempt,
current_attempt_scheduled_time: current_attempt_scheduled_time
.try_into_or_none(),
retry_policy,
is_local,
},
ActContext {
worker,
app_data,
cancellation_token,
input,
heartbeat_details,
header_fields,
info: ActivityInfo {
task_token,
task_queue,
workflow_type,
workflow_namespace,
workflow_execution,
activity_id,
activity_type,
heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
scheduled_time: scheduled_time.try_into_or_none(),
started_time: started_time.try_into_or_none(),
deadline,
attempt,
current_attempt_scheduled_time: current_attempt_scheduled_time.try_into_or_none(),
retry_policy,
is_local,
},
first_arg,
)
}
}

/// Returns a future the completes if and when the activity this was called inside has been
Expand All @@ -133,8 +128,8 @@ impl ActContext {
/// Retrieve extra parameters to the Activity. The first input is always popped and passed to
/// the Activity function for the currently executing activity. However, if more parameters are
/// passed, perhaps from another language's SDK, explicit access is available from extra_inputs
pub fn extra_inputs(&mut self) -> &mut [Payload] {
&mut self.input
pub fn get_args(&self) -> &[Payload] {
&self.input
}

/// Extract heartbeat details from last failed attempt. This is used in combination with retry policy.
Expand Down
Loading

0 comments on commit 3a395fa

Please sign in to comment.