Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] threading for early return on long entity detect #10

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions ffi/ffi-macros/src/builtin_entity_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use snips_nlu_ontology_ffi_macros::{CBuiltinEntity, CBuiltinEntityArray};
use snips_nlu_parsers::{BuiltinEntityParser, BuiltinEntityParserLoader, EntityValue};
use std::ffi::CStr;
use std::slice;
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

#[repr(C)]
pub struct CBuiltinEntityParser(*const libc::c_void);
Expand Down Expand Up @@ -157,13 +160,35 @@ pub fn extract_builtin_entity(
} else {
None
};
// NOTE(review): this outer `opt_filters` shadow appears to be the
// pre-change line retained by the diff view; the spawned closure below
// re-derives the slice from the moved `opt_filters` itself.
let opt_filters = opt_filters.as_ref().map(|vec| vec.as_slice());
// Channel used to hand the extraction result back from the worker thread.
let (sender, receiver) = mpsc::channel();

// Run the (potentially slow) entity extraction on a separate thread so the
// caller can return after a fixed timeout instead of blocking indefinitely.
// NOTE(review): on timeout the worker thread is neither joined nor
// cancelled — it keeps running to completion in the background, and the
// join handle bound below is never used. Confirm this leak is acceptable.
let thread_join_handle = thread::spawn(move || {
let opt_filters = opt_filters.as_ref().map(|vec| vec.as_slice());
match sender.send(parser.extract_entities(
sentence,
opt_filters,
max_alternative_resolved_values as usize,
)){
Ok(()) => {}, // everything good
Err(_) => {}, // we have been released, don't panic
}
});

// return receiver.recv_timeout(Duration::from_millis(1000)).unwrap();

// NOTE(review): the 1000 ms budget is hard-coded — consider making it a
// parameter of the FFI entry point. TODO confirm the intended timeout.
let res = receiver.recv_timeout(Duration::from_millis(1000));
// thread_join_handle.join().unwrap()
// res.unwrap()
// NOTE(review): `res.map_err(Into::into)` would replace this
// is_ok()/unwrap() pair; the commented-out experiments above should be
// removed before merging.
if res.is_ok() {
return res.unwrap()
}
else {

// Err(*Box::from(res.err().unwrap()))
Err(res.err().unwrap().into())

}

// NOTE(review): the call below looks like the pre-change synchronous path
// retained by the diff view; as written, after an exhaustive if/else that
// yields the function value, it would be unreachable dead code.
parser.extract_entities(
sentence,
opt_filters,
max_alternative_resolved_values as usize,
)
}

pub fn destroy_builtin_entity_parser(ptr: *mut CBuiltinEntityParser) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion python/ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ crate-type = ["cdylib"]
failure = "0.1"
libc = "0.2"
ffi-utils = { git = "https://github.com/snipsco/snips-utils-rs", rev = "291ce1d" }
snips-nlu-parsers-ffi-macros = { git = "https://github.com/purecloudlabs/snips-nlu-parsers-genesys.git", tag = "genesys-0.1.8" }
snips-nlu-parsers-ffi-macros = { git = "https://github.com/purecloudlabs/snips-nlu-parsers-genesys.git", tag = "genesys-0.1.9a2" }
snips-nlu-ontology = { git = "https://github.com/snipsco/snips-nlu-ontology", tag = "0.67.1" }
snips-nlu-ontology-ffi-macros = { git = "https://github.com/snipsco/snips-nlu-ontology", tag = "0.67.1" }
2 changes: 1 addition & 1 deletion python/snips_nlu_parsers/__version__
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.8
0.1.9a2
89 changes: 80 additions & 9 deletions python/snips_nlu_parsers/builtin_entity_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,64 @@
from builtins import bytes, str
from ctypes import c_char_p, c_int, c_void_p, string_at
from pathlib import Path
from future.utils import text_type

from snips_nlu_parsers.utils import (
CStringArray, check_ffi_error, lib, string_pointer)
CStringArray, check_ffi_error, lib, string_pointer, temp_dir)

import multiprocessing as mp


def unicode_string(string):
    """Coerce ``string`` to a unicode string.

    ``str`` instances are returned unchanged; ``bytes`` are decoded as
    UTF-8.

    Args:
        string: a ``str`` or UTF-8 encoded ``bytes`` value.

    Returns:
        str: the unicode representation of ``string``.

    Raises:
        TypeError: if ``string`` is neither ``str`` nor ``bytes``.
    """
    # On Python 3, ``future.utils.text_type`` is exactly the builtin ``str``;
    # checking against the builtin directly drops the future.utils dependency
    # from this helper while preserving behavior.
    if isinstance(string, str):
        return string
    if isinstance(string, bytes):
        return string.decode("utf8")
    raise TypeError("Cannot convert %s into unicode string" % type(string))


def json_string(json_object, indent=2, sort_keys=True):
    """Serialize ``json_object`` to a unicode JSON string.

    Args:
        json_object: any JSON-serializable object.
        indent: indentation width forwarded to ``json.dumps``.
        sort_keys: whether dictionary keys are emitted in sorted order.

    Returns:
        str: the JSON document as a unicode string.
    """
    dumped = json.dumps(
        json_object,
        indent=indent,
        sort_keys=sort_keys,
        separators=(',', ': '),
    )
    return unicode_string(dumped)


# NOTE(review): duplicate import — ``Path`` is already imported at the top
# of this module; keep one of the two.
from pathlib import Path
from tempfile import mkdtemp

# Module-level side effect: a fresh temp directory is created at import
# time and is never cleaned up. Consider TemporaryDirectory or an atexit
# hook — TODO confirm intended lifetime.
tmp_dir = mkdtemp()
serialization_dir = Path(tmp_dir)

# Minimal parser metadata: no gazetteer parser is serialized alongside it.
gazetteer_entity_parser = None
metadata = {
# NOTE(review): snips language codes are usually lowercase ("en") —
# verify that uppercase "EN" is accepted by the loader.
"language": "EN",
"gazetteer_parser": gazetteer_entity_parser
}
metadata_path = serialization_dir / "metadata.json"
with metadata_path.open("w", encoding="utf-8") as f:
f.write(json_string(metadata))

# Load a module-global builtin entity parser through the FFI at import
# time; ``mp_builtin`` below reads this global from forked children.
parser = c_void_p()
# raise Exception(type(serialization_dir))
parser_path = bytes(str(serialization_dir), encoding="utf8")
# NOTE(review): ``byref`` is not in the ctypes import list visible above
# (only c_char_p, c_int, c_void_p, string_at) — confirm it is imported
# elsewhere in this module.
exit_code = lib.snips_nlu_parsers_load_builtin_entity_parser(
byref(parser), parser_path)
check_ffi_error(exit_code, "Something went wrong when loading the "
"builtin entity parser")


def mp_builtin(q, text, scope=None, max_alternative_resolved_values=5):
    """Worker run in a child process: extract builtin entities from ``text``
    through the FFI parser and hand the result back on queue ``q``.

    On success a ``bytes`` JSON payload is enqueued; on failure the error
    message is enqueued as ``str`` — the parent distinguishes the two by
    type.

    Args:
        q: multiprocessing queue used to return the result to the parent.
        text: unicode sentence to parse.
        scope: optional entity-scope argument forwarded to the FFI call
            (``None`` means all builtin entities). New parameter with a
            default preserving the previous hard-coded behavior.
        max_alternative_resolved_values: maximum number of alternative
            resolved values requested (previously hard-coded to 5).
    """
    with string_pointer(c_char_p()) as ptr:
        exit_code = lib.snips_nlu_parsers_extract_builtin_entities_json(
            parser, text.encode("utf8"), scope,
            max_alternative_resolved_values, byref(ptr))
        try:
            # Fixed copy-paste error: the original message said "loading the
            # builtin entity parser" although this call extracts entities.
            check_ffi_error(exit_code, "Something went wrong when extracting "
                            "builtin entities")
        except Exception as ex:
            # Errors travel as str so the parent's type check can spot them.
            q.put(str(ex))
        else:
            q.put(string_at(ptr))


class BuiltinEntityParser(object):
Expand Down Expand Up @@ -67,14 +122,30 @@ def parse(self, text, scope=None, max_alternative_resolved_values=5):
arr.data = (c_char_p * len(scope))(*scope)
scope = byref(arr)

# NOTE(review): this direct synchronous FFI call block appears to be the
# pre-change implementation retained by the diff view (replaced by the
# multiprocessing path below).
with string_pointer(c_char_p()) as ptr:
exit_code = lib.snips_nlu_parsers_extract_builtin_entities_json(
self._parser, text.encode("utf8"), scope,
max_alternative_resolved_values, byref(ptr))
check_ffi_error(exit_code, "Something went wrong when extracting "
"builtin entities")
result = string_at(ptr)
return json.loads(result.decode("utf8"))
# New path: run the extraction in a forked child so a hung parse cannot
# block the caller's process.
# NOTE(review): the 'fork' start method is POSIX-only — this will fail on
# Windows (and is discouraged on macOS). Confirm the supported platforms.
ctx = mp.get_context('fork')
q = ctx.Queue()
# NOTE(review): the ``scope`` and ``max_alternative_resolved_values``
# built above are NOT forwarded to the child — mp_builtin runs with its
# own hard-coded scope/limit. Looks like a bug; verify intended behavior.
p = ctx.Process(target=mp_builtin, args=(q, text))
p.start()
# NOTE(review): ``q.get()`` has no timeout — if the child dies before
# putting anything on the queue, this blocks forever. The timeout lives
# on the Rust side only. Consider ``q.get(timeout=...)``.
result = q.get()
# Protocol: errors arrive as str, success payloads as bytes.
# NOTE(review): prefer ``isinstance(result, str)``; also ``p.kill()``
# requires Python >= 3.7.
if type(result) == str:
p.kill()
return json.loads("[]")
p.join()
return json.loads(result.decode("utf8"))
# with string_pointer(c_char_p()) as ptr:
#     exit_code = lib.snips_nlu_parsers_extract_builtin_entities_json(
#         self._parser, text.encode("utf8"), scope,
#         max_alternative_resolved_values, byref(ptr))
#     try:
#         check_ffi_error(exit_code, "Something went wrong when extracting "
#         "builtin entities")
#     except ValueError as ex:
#         if "timed out waiting on channel" in str(ex):
#             return []
#         else:
#             raise ex
#     result = string_at(ptr)
#     return json.loads(result.decode("utf8"))

def extend_gazetteer_entity(self, entity_name, entity_values):
"""Extends a builtin gazetteer entity with custom values
Expand Down