Skip to content

Commit

Permalink
fix: make the callback timeout configurable with a 5 sec default
Browse files Browse the repository at this point in the history
  • Loading branch information
Ronald Holshausen committed Feb 6, 2021
1 parent b7cff8b commit a0f0876
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions native/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,20 @@ fn get_string_array(cx: &mut FunctionContext, obj: &JsObject, name: &str) -> Res
}
}


fn get_integer_value(cx: &mut FunctionContext, obj: &JsObject, name: &str) -> Option<u64> {
match obj.get(cx, name) {
Ok(val) => match val.downcast::<JsNumber>() {
Ok(val) => Some(val.value() as u64),
Err(_) => None
},
_ => None
}
}

#[derive(Clone)]
struct RequestFilterCallback {
callback_handler: EventHandler
callback_handler: EventHandler,
timeout: u64
}

impl RequestFilterExecutor for RequestFilterCallback {
Expand Down Expand Up @@ -177,13 +186,14 @@ impl RequestFilterExecutor for RequestFilterCallback {
}
});

receiver.recv_timeout(Duration::from_millis(1000)).unwrap_or(request.clone())
receiver.recv_timeout(Duration::from_millis(self.timeout)).unwrap_or(request.clone())
}
}

#[derive(Clone)]
struct ProviderStateCallback<'a> {
callback_handlers: &'a HashMap<String, EventHandler>
callback_handlers: &'a HashMap<String, EventHandler>,
timeout: u64
}

#[async_trait]
Expand Down Expand Up @@ -228,12 +238,12 @@ impl ProviderStateExecutor for ProviderStateCallback<'_> {
}
};
});
match receiver.recv_timeout(Duration::from_millis(1000)) {
match receiver.recv_timeout(Duration::from_millis(self.timeout)) {
Ok(result) => {
debug!("Received {:?} from callback", result);
result
},
Err(_) => Err(ProviderStateError { description: format!("Provider state callback for '{}' timed out after 1000 ms", provider_state.name), interaction_id })
Err(_) => Err(ProviderStateError { description: format!("Provider state callback for '{}' timed out after {} ms", provider_state.name, self.timeout), interaction_id })
}
},
None => {
Expand Down Expand Up @@ -263,7 +273,10 @@ impl Task for BackgroundTask {
panic::catch_unwind(|| {
match tokio::runtime::Builder::new_current_thread().enable_all().build() {
Ok(runtime) => runtime.block_on(async {
let provider_state_executor = ProviderStateCallback { callback_handlers: &self.state_handlers };
let provider_state_executor = ProviderStateCallback {
callback_handlers: &self.state_handlers,
timeout: self.options.callback_timeout
};
pact_verifier::verify_provider_async(self.provider_info.clone(), self.pacts.clone(), self.filter_info.clone(), self.consumers_filter.clone(), self.options.clone(), &provider_state_executor).await
}),
Err(err) => {
Expand Down Expand Up @@ -336,7 +349,6 @@ pub fn verify_provider(mut cx: FunctionContext) -> JsResult<JsUndefined> {
Err(e) => return cx.throw_error(e)
};


match config.get(&mut cx, "pactBrokerUrl") {
Ok(url) => match url.downcast::<JsString>() {
Ok(url) => {
Expand Down Expand Up @@ -396,11 +408,16 @@ pub fn verify_provider(mut cx: FunctionContext) -> JsResult<JsUndefined> {

debug!("provider_info = {:?}", provider_info);

let callback_timeout = get_integer_value(&mut cx, &config, "callbackTimeoutMs").unwrap_or(5000);

let request_filter = match config.get(&mut cx, "requestFilter") {
Ok(request_filter) => match request_filter.downcast::<JsFunction>() {
Ok(val) => {
let this = cx.this();
Some(Box::new(RequestFilterCallback { callback_handler: EventHandler::new(&cx, this, val) }))
Some(Box::new(RequestFilterCallback {
callback_handler: EventHandler::new(&cx, this, val),
timeout: callback_timeout
}))
},
Err(_) => None
},
Expand Down Expand Up @@ -478,7 +495,8 @@ pub fn verify_provider(mut cx: FunctionContext) -> JsResult<JsUndefined> {
build_url: None,
request_filter,
provider_tags,
disable_ssl_verification
disable_ssl_verification,
callback_timeout
};

debug!("Starting background task");
Expand Down

0 comments on commit a0f0876

Please sign in to comment.