Skip to content

Commit

Permalink
Make return values work
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Nov 28, 2024
1 parent e1d94b8 commit 8f822c1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ obj/
/tests/golangworker/golangworker
/.vs
/.vscode
/.idea
/.idea
/.zed
Temporalio.sln.DotSettings.user
9 changes: 7 additions & 2 deletions src/Temporalio/Bridge/include/temporal-sdk-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,12 @@ typedef struct SlotReserveCtx {
bool is_sticky;
} SlotReserveCtx;

typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx);
typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx, void *sender);

typedef void (*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx);
/**
* Must return pointer to a C# object inheriting from SlotPermit
*/
typedef const void *(*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx);

typedef enum SlotInfo_Tag {
WorkflowSlotInfo,
Expand Down Expand Up @@ -689,6 +692,8 @@ struct WorkerReplayPushResult worker_replay_push(struct Worker *worker,
struct ByteArrayRef workflow_id,
struct ByteArrayRef history);

void complete_async_reserve(void *sender, const void *permit);

#ifdef __cplusplus
} // extern "C"
#endif // __cplusplus
46 changes: 33 additions & 13 deletions src/Temporalio/Bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
use temporal_sdk_core_protos::coresdk::ActivityTaskCompletion;
use temporal_sdk_core_protos::temporal::api::history::v1::History;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::oneshot;
use tokio_stream::wrappers::ReceiverStream;

use std::collections::HashMap;
Expand Down Expand Up @@ -84,8 +85,20 @@ pub struct ResourceBasedSlotSupplier {
tuner_options: ResourceBasedTunerOptions,
}

type CustomReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx);
type CustomTryReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx);
#[repr(C)]
pub struct CustomSlotSupplier<SK> {
inner: CustomSlotSupplierCallbacksImpl,
_pd: std::marker::PhantomData<SK>,
}

unsafe impl<SK> Send for CustomSlotSupplier<SK> {}
unsafe impl<SK> Sync for CustomSlotSupplier<SK> {}

type CustomReserveSlotCallback =
unsafe extern "C" fn(ctx: SlotReserveCtx, sender: *mut libc::c_void);
/// Must return pointer to a C# object inheriting from SlotPermit
type CustomTryReserveSlotCallback =
unsafe extern "C" fn(ctx: SlotReserveCtx) -> *const libc::c_void;
type CustomMarkSlotUsedCallback = unsafe extern "C" fn(ctx: SlotMarkUsedCtx);
type CustomReleaseSlotCallback = unsafe extern "C" fn(ctx: SlotReleaseCtx);

Expand Down Expand Up @@ -113,15 +126,6 @@ impl CustomSlotSupplierCallbacksImpl {
}
}

#[repr(C)]
pub struct CustomSlotSupplier<SK> {
inner: CustomSlotSupplierCallbacksImpl,
_pd: std::marker::PhantomData<SK>,
}

unsafe impl<SK> Send for CustomSlotSupplier<SK> {}
unsafe impl<SK> Sync for CustomSlotSupplier<SK> {}

#[repr(C)]
pub enum SlotKindType {
WorkflowSlotKindType,
Expand Down Expand Up @@ -173,11 +177,13 @@ impl<SK: SlotKind + Send + Sync> temporal_sdk_core_api::worker::SlotSupplier
type SlotKind = SK;

async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit {
let (tx, rx) = oneshot::channel();
let ctx = Self::convert_reserve_ctx(ctx);
let tx = Box::into_raw(Box::new(tx)) as *mut libc::c_void;
unsafe {
((*self.inner.0).reserve)(ctx);
((*self.inner.0).reserve)(ctx, tx);
}
unimplemented!()
rx.await.expect("reserve channel is not closed")
}

fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
Expand Down Expand Up @@ -727,6 +733,20 @@ pub extern "C" fn worker_replay_push(
}
}

#[no_mangle]
pub extern "C" fn complete_async_reserve(sender: *mut libc::c_void, permit: *const libc::c_void) {
if !sender.is_null() && !permit.is_null() {
unsafe {
let sender = Box::from_raw(sender as *mut Sender<SlotSupplierPermit>);
let permit =
SlotSupplierPermit::with_user_data(UserDataHandle(permit as *mut libc::c_void));
let _ = sender.send(permit);
}
} else {
panic!("ReserveSlot sender & permit must not be null!");
}
}

impl TryFrom<&WorkerOptions> for temporal_sdk_core::WorkerConfig {
type Error = anyhow::Error;

Expand Down

0 comments on commit 8f822c1

Please sign in to comment.