diff --git a/.gitignore b/.gitignore index 6cf26001..c9c9ddd0 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,6 @@ obj/ /tests/golangworker/golangworker /.vs /.vscode -/.idea \ No newline at end of file +/.idea +/.zed +Temporalio.sln.DotSettings.user diff --git a/src/Temporalio/Bridge/include/temporal-sdk-bridge.h b/src/Temporalio/Bridge/include/temporal-sdk-bridge.h index bc821b99..131ed307 100644 --- a/src/Temporalio/Bridge/include/temporal-sdk-bridge.h +++ b/src/Temporalio/Bridge/include/temporal-sdk-bridge.h @@ -391,9 +391,12 @@ typedef struct SlotReserveCtx { bool is_sticky; } SlotReserveCtx; -typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx); +typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx, void *sender); -typedef void (*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx); +/** + * Must return pointer to a C# object inheriting from SlotPermit + */ +typedef const void *(*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx); typedef enum SlotInfo_Tag { WorkflowSlotInfo, @@ -689,6 +692,8 @@ struct WorkerReplayPushResult worker_replay_push(struct Worker *worker, struct ByteArrayRef workflow_id, struct ByteArrayRef history); +void complete_async_reserve(void *sender, const void *permit); + #ifdef __cplusplus } // extern "C" #endif // __cplusplus diff --git a/src/Temporalio/Bridge/src/worker.rs b/src/Temporalio/Bridge/src/worker.rs index 8be43573..af03fb57 100644 --- a/src/Temporalio/Bridge/src/worker.rs +++ b/src/Temporalio/Bridge/src/worker.rs @@ -24,6 +24,7 @@ use temporal_sdk_core_protos::coresdk::ActivityHeartbeat; use temporal_sdk_core_protos::coresdk::ActivityTaskCompletion; use temporal_sdk_core_protos::temporal::api::history::v1::History; use tokio::sync::mpsc::{channel, Sender}; +use tokio::sync::oneshot; use tokio_stream::wrappers::ReceiverStream; use std::collections::HashMap; @@ -84,8 +85,20 @@ pub struct ResourceBasedSlotSupplier { tuner_options: ResourceBasedTunerOptions, } -type CustomReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx); -type CustomTryReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx); +#[repr(C)] +pub struct CustomSlotSupplier { + inner: CustomSlotSupplierCallbacksImpl, + _pd: std::marker::PhantomData, +} + +unsafe impl Send for CustomSlotSupplier {} +unsafe impl Sync for CustomSlotSupplier {} + +type CustomReserveSlotCallback = + unsafe extern "C" fn(ctx: SlotReserveCtx, sender: *mut libc::c_void); +/// Must return pointer to a C# object inheriting from SlotPermit +type CustomTryReserveSlotCallback = + unsafe extern "C" fn(ctx: SlotReserveCtx) -> *const libc::c_void; type CustomMarkSlotUsedCallback = unsafe extern "C" fn(ctx: SlotMarkUsedCtx); type CustomReleaseSlotCallback = unsafe extern "C" fn(ctx: SlotReleaseCtx); @@ -113,15 +126,6 @@ impl CustomSlotSupplierCallbacksImpl { } } -#[repr(C)] -pub struct CustomSlotSupplier { - inner: CustomSlotSupplierCallbacksImpl, - _pd: std::marker::PhantomData, -} - -unsafe impl Send for CustomSlotSupplier {} -unsafe impl Sync for CustomSlotSupplier {} - #[repr(C)] pub enum SlotKindType { WorkflowSlotKindType, @@ -173,11 +177,13 @@ impl temporal_sdk_core_api::worker::SlotSupplier type SlotKind = SK; async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit { + let (tx, rx) = oneshot::channel(); let ctx = Self::convert_reserve_ctx(ctx); + let tx = Box::into_raw(Box::new(tx)) as *mut libc::c_void; unsafe { - ((*self.inner.0).reserve)(ctx); + ((*self.inner.0).reserve)(ctx, tx); } - unimplemented!() + rx.await.expect("reserve channel is not closed") } fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option { @@ -727,6 +733,20 @@ pub extern "C" fn worker_replay_push( } } +#[no_mangle] +pub extern "C" fn complete_async_reserve(sender: *mut libc::c_void, permit: *const libc::c_void) { + if !sender.is_null() && !permit.is_null() { + unsafe { + let sender = Box::from_raw(sender as *mut Sender); + let permit = + SlotSupplierPermit::with_user_data(UserDataHandle(permit as *mut libc::c_void)); + let _ = sender.send(permit); + } + } else { + panic!("ReserveSlot sender & permit must not be null!"); + } +} + impl TryFrom<&WorkerOptions> for temporal_sdk_core::WorkerConfig { type Error = anyhow::Error;