Merge pull request #7 from dvzubarev/multiprocessing
Introduce multiprocessing
IINemo authored Apr 8, 2019
2 parents 1d8b8cd + b5bbd45 commit 3fac6ee
Showing 15 changed files with 203 additions and 109 deletions.
17 changes: 7 additions & 10 deletions docker/base/start.py
@@ -1,16 +1,17 @@
from isanlp.nlp_service_server import NlpServiceServer
import argparse
import importlib

from isanlp.nlp_service_server import NlpServiceServer

parser = argparse.ArgumentParser(description='NLP service.')
parser.add_argument('-p', type = int, default = 3333, help = 'Port to listen.')
parser.add_argument('-t', type = int, default = 1, help = 'Number of workers.')
parser.add_argument('-m', type = str, default = 'isanlp.pipeline_default', help = 'Python module.')
parser.add_argument('-a', type = str, default= 'PIPELINE_DEFAULT', help = 'Python object.')
parser.add_argument('-a', type = str, default= 'create_pipeline', help = 'Python function.')
args = parser.parse_args()

module_name = args.m
object_code = args.a
creator_fn_name = args.a
port = args.p
nthreads = args.t

@@ -20,12 +21,8 @@
# ppls = eval(expr)
# print(ppls)

def expand_ppl(ppl):
res_dict = {k : v[0] for k, v in ppl.get_processors().items()}
res_dict.update({ppl._name : ppl})
return res_dict

ppls = eval("expand_ppl(__import__('importlib').import_module('{}').{})".format(module_name, object_code))
creator_fn = getattr(importlib.import_module(module_name), creator_fn_name)
ppl = creator_fn(delay_init = True)
ppls = {ppl._name : ppl}

NlpServiceServer(ppls = ppls, port = port, max_workers = nthreads).serve()
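Instead of eval-ing a module-level pipeline object, the entrypoint now looks up a factory function by name and asks it for a single, lazily initialized pipeline. Below is a minimal standalone sketch of that lookup, using the argparse defaults shown above; it runs only where the isanlp package from this repository is importable.

import importlib

# Factory lookup as performed by the new start.py
# (module and function names taken from the argparse defaults above).
module_name = 'isanlp.pipeline_default'
creator_fn_name = 'create_pipeline'

creator_fn = getattr(importlib.import_module(module_name), creator_fn_name)
ppl = creator_fn(delay_init=True)   # heavy resources load later, per worker
ppls = {ppl._name: ppl}             # registered under the pipeline's name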

2 changes: 1 addition & 1 deletion docker/main/Dockerfile
@@ -27,4 +27,4 @@ RUN cd /src/ && wget http://maltparser.org/dist/maltparser-1.9.1.tar.gz
RUN cd /src/ && tar -xf maltparser-1.9.1.tar.gz


CMD [ "python", "/start.py", "-m", "isanlp.pipeline_default", "-a", "PIPELINE_DEFAULT" ]
CMD [ "python", "/start.py", "-m", "isanlp.pipeline_default", "-a", "create_pipeline" ]
28 changes: 14 additions & 14 deletions src/isanlp/en/pipeline_default.py
@@ -6,17 +6,17 @@
from .processor_lemmatizer_nltk_en import ProcessorLemmatizerNltkEn


PIPELINE_DEFAULT = PipelineCommon({'tokenizer' : (ProcessorTokenizerNltkEn(),
['text'],
{0 : 'tokens'}),
'sentence_splitter' : (ProcessorSentenceSplitter(),
['tokens'],
{0 : 'sentences'}),
'postagger' : (ProcessorPostaggerNltkEn(),
['tokens', 'sentences'],
{0 : 'postag'}),
'lemmatizer' : (ProcessorLemmatizerNltkEn(),
['tokens', 'sentences', 'postag'],
{0 : 'lemma'})
},
name = 'default')
def create_pipeline(delay_init = False):
return PipelineCommon({'tokenizer' : (ProcessorTokenizerNltkEn(),
['text'],
{0 : 'tokens'}),
'sentence_splitter' : (ProcessorSentenceSplitter(),
['tokens'],
{0 : 'sentences'}),
'postagger' : (ProcessorPostaggerNltkEn(),
['tokens', 'sentences'],
{0 : 'postag'}),
'lemmatizer' : (ProcessorLemmatizerNltkEn(delay_init),
['tokens', 'sentences', 'postag'],
{0 : 'lemma'})},
name = 'default')
16 changes: 11 additions & 5 deletions src/isanlp/en/processor_lemmatizer_nltk_en.py
@@ -24,9 +24,15 @@ class ProcessorLemmatizerNltkEn:
Simple wrapper around NLTK WordNetLemmatizer.
"""

def __init__(self):
self._nltk_lmtzr = WordNetLemmatizer()

def __init__(self, delay_init = False):
self._nltk_lmtzr = None
if not delay_init:
self.init()

def init(self):
if self._nltk_lmtzr is None:
self._nltk_lmtzr = WordNetLemmatizer()

def __call__(self, tokens, sentences, postags):
"""Performs lemmatization of texts.
@@ -38,11 +44,11 @@ def __call__(self, tokens, sentences, postags):
Returns:
List of lists (sentences) of lemmas.
"""


assert self._nltk_lmtzr
result = []
for text_sent, postag_sent in zip(sentences, postags):
result.append([self._nltk_lmtzr.lemmatize(word.text.lower(), get_wordnet_pos(postag))
for (word, postag) in zip(CSentence(tokens, text_sent), postag_sent)])

return result
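The lemmatizer, and the other processors changed below, now follow a delayed-initialization pattern: the constructor may skip building the expensive NLTK resource, and an idempotent init() builds it later, so the heavy object is created inside each worker process rather than pickled into it. A minimal sketch of the pattern follows; LazyProcessor and HeavyModel are hypothetical stand-ins, not isanlp classes.

class HeavyModel:
    """Hypothetical stand-in for an expensive resource (e.g. WordNetLemmatizer)."""
    def process(self, data):
        return data.lower()


class LazyProcessor:
    """Sketch of the delayed-initialization pattern introduced in this PR."""

    def __init__(self, delay_init=False):
        self._model = None
        if not delay_init:
            self.init()

    def init(self):
        # Idempotent: safe to call again inside each worker process.
        if self._model is None:
            self._model = HeavyModel()

    def __call__(self, data):
        assert self._model, 'init() must be called before processing'
        return self._model.process(data)


# Eager use (old behaviour) vs. delayed use (as the service does now).
eager = LazyProcessor()
lazy = LazyProcessor(delay_init=True)
lazy.init()          # e.g. called per worker by _init_process in nlp_service.py
print(lazy('TEXT'))  # -> 'text'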

13 changes: 10 additions & 3 deletions src/isanlp/en/processor_tokenizer_nltk_en.py
@@ -24,9 +24,15 @@ class ProcessorTokenizerNltkEn:
Wrapper around NLTK RegexpTokenizer.
"""

def __init__(self):
self._proc = RegexpTokenizer(_en_regex)

def __init__(self, delay_init = False):
self._proc = None
if not delay_init:
self.init()

def init(self):
if self._proc is None:
self._proc = RegexpTokenizer(_en_regex)

def __call__(self, text):
"""Performs tokenization of text.
@@ -36,5 +42,6 @@ def __call__(self, text):
Returns:
List of Token objects.
"""


return [Token(text[start : end], start, end) for (start, end) in self._proc.span_tokenize(text)]
65 changes: 47 additions & 18 deletions src/isanlp/nlp_service.py
@@ -1,55 +1,84 @@
import multiprocessing

from . import annotation_pb2 as pb
from . import annotation_pb2_grpc
from . import annotation_to_protobuf
from . import annotation_from_protobuf
from .pipeline_common import PipelineCommon

import grpc
import logging


logger = logging.getLogger('isanlp')

def _expand_ppl(ppl):
res_dict = {k : v[0] for k, v in ppl.get_processors().items()}
return res_dict

PPLS = None
def _init_process(ppls):
global PPLS
PPLS = ppls
standalone_procs = {}
for ppl in ppls.values():
if not isinstance(ppl, PipelineCommon):
continue
for proc in ppl.processors_iter():
if hasattr(proc, 'init'):
proc.init()
standalone_procs.update(_expand_ppl(ppl))
PPLS.update(standalone_procs)

def _process_input(ppl_name, input_annotations):
ppl = PPLS[ppl_name]
return ppl(*input_annotations)



class NlpService(annotation_pb2_grpc.NlpServiceServicer):
"""Basic NLP gRPC annotation service.
Args:
ppls: dictionary {<pipeline name> : <pipeline object>}
"""

def __init__(self, ppls):
self._ppls = ppls


def __init__(self, ppls, max_workers = 1):
self._pool = multiprocessing.Pool(processes=max_workers,
initializer = _init_process,
initargs = (ppls,) )



def process(self, request, context):
"""(gRPC method) Processes text document with a specified pipeline.
The request contains the name of a pipeline to invoke and required annotations.
"""

logger.info('Processing incoming request with "{}"...'.format(request.pipeline_name))
ppl = self._ppls[request.pipeline_name]
input_annotations = annotation_from_protobuf.convert_annotation(request.input_annotations)
res = ppl(*input_annotations)
logger.info('Done.')


logger.info('Processing incoming request with "{}"...'.format(request.pipeline_name))
res = self._pool.apply(_process_input, args=(request.pipeline_name, input_annotations))
pb_res = annotation_to_protobuf.convert_annotation(res)
reply = pb.ProcessReply()
reply.output_annotations.Pack(pb_res)

return reply




def get_registered_pipelines(self, request, context):
"""(gRPC method) Outputs pipelines registered in the service."""

repl = RegisteredPipelinesReply()
for e in self._ppl.keys():
repl.add(e)
return repl

def add_to_server(self, server):
"""Is required for adding service to server.
Is invoked by NlpServiceServer.
"""

annotation_pb2_grpc.add_NlpServiceServicer_to_server(self, server)
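The service now delegates each request to a multiprocessing.Pool whose initializer runs once per worker and calls init() on every processor, so heavy models live inside the worker processes instead of being created in, or pickled from, the serving process. A minimal sketch of the initializer-plus-apply pattern is shown below; the dictionary resource and the 'lemma:' prefix are toy stand-ins, while the function names mirror those in this diff.

import multiprocessing

_RESOURCE = None  # one per worker process, like the PPLS global in nlp_service.py


def _init_process(config):
    """Runs once in each worker; builds the expensive per-process resource."""
    global _RESOURCE
    _RESOURCE = {'prefix': config}  # stand-in for calling init() on processors


def _process_input(text):
    """Executed inside a worker; uses the per-process resource."""
    return _RESOURCE['prefix'] + text


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=2,
                                initializer=_init_process,
                                initargs=('lemma:',))
    # apply() blocks until a worker returns, mirroring NlpService.process().
    print(pool.apply(_process_input, args=('dogs',)))  # -> 'lemma:dogs'
    pool.close()
    pool.join()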
15 changes: 7 additions & 8 deletions src/isanlp/nlp_service_server.py
@@ -6,30 +6,29 @@

class NlpServiceServer:
"""Implements gRPC server for NLP annotation service.
Args:
ppl(dict): Dictionary of pipelines that will be registered in the service.
port(int): Serving port.
max_workers(int): workers for gRPC server.
"""

def __init__(self, ppls, port = 3333, max_workers = 1):
self._port = port
self._max_workers = max_workers
self._service = NlpService(ppls)
self._service = NlpService(ppls, max_workers)

def serve(self):
"""Initiates server for listening of incoming connections (blocking)."""

server = grpc.server(futures.ThreadPoolExecutor(max_workers = self._max_workers))

self._service.add_to_server(server)
server.add_insecure_port('[::]:{}'.format(self._port))
server.start()

try:
while True:
time.sleep(60)
except KeyboardInterrupt:
server.stop(0)

8 changes: 8 additions & 0 deletions src/isanlp/pipeline_common.py
@@ -89,3 +89,11 @@ def __call__(self, *input_data):

def get_processors(self):
return self._processors

def processors_iter(self):
for proc_stuff in self._processors.values():
proc = proc_stuff[0]
if hasattr(proc, 'processors_iter'):
yield from proc.processors_iter()
else:
yield proc
8 changes: 7 additions & 1 deletion src/isanlp/pipeline_conditional.py
@@ -1,3 +1,5 @@
import itertools

class PipelineConditional:
def __init__(self, condition, ppl_dict, default_ppl = None):
self._condition = condition
@@ -15,4 +17,8 @@ def __call__(self, *args):
return self._default(*args)

raise RuntimeError('No such option: {}.'.format(cond_res))


def processors_iter(self):
for ppl in itertools.chain(self._ppl_dict.values(), [self._default]):
if ppl is not None:
yield from ppl.processors_iter()
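Together, the two processors_iter() methods give the worker initializer a flat view of every processor inside arbitrarily nested pipelines, so it can call init() on each one. A minimal sketch of how such a recursive iterator flattens a nested structure follows; Leaf and Composite are hypothetical stand-ins for a plain processor and for PipelineCommon/PipelineConditional.

class Leaf:
    """Hypothetical stand-in for a plain processor."""
    def __init__(self, name):
        self.name = name


class Composite:
    """Hypothetical stand-in for a pipeline that contains other pipelines."""
    def __init__(self, children):
        self._children = children

    def processors_iter(self):
        for child in self._children:
            if hasattr(child, 'processors_iter'):
                yield from child.processors_iter()  # recurse into nested pipelines
            else:
                yield child                         # plain processor


nested = Composite([Leaf('tokenizer'),
                    Composite([Leaf('postagger'), Leaf('lemmatizer')])])
print([p.name for p in nested.processors_iter()])
# -> ['tokenizer', 'postagger', 'lemmatizer']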
39 changes: 23 additions & 16 deletions src/isanlp/pipeline_default.py
@@ -5,19 +5,26 @@
from .pipeline_common import PipelineCommon


_ppl_cond = PipelineConditional((lambda _, lang: lang),
{'ru' : dflt_ru.PIPELINE_DEFAULT,
'en' : dflt_en.PIPELINE_DEFAULT},
default_ppl = dflt_ru.PIPELINE_DEFAULT)


PIPELINE_DEFAULT = PipelineCommon([(ProcessorPolyglot().detect_language,
['text'],
{0 : 'lang'}),
(_ppl_cond,
['text', 'lang'],
{'tokens' : 'tokens',
'sentences' : 'sentences',
'postag' : 'postag',
'lemma' : 'lemma'})],
name = 'default')



def create_pipeline(delay_init = False):
ru_ppl = dflt_ru.create_pipeline(delay_init)
_ppl_cond = PipelineConditional((lambda _, lang: lang),
{'ru' : ru_ppl,
'en' : dflt_en.create_pipeline(delay_init)},
default_ppl = ru_ppl)

ppl = PipelineCommon([(ProcessorPolyglot().detect_language,
['text'],
{0 : 'lang'}),
(_ppl_cond,
['text', 'lang'],
{'tokens' : 'tokens',
'sentences' : 'sentences',
'postag' : 'postag',
'lemma' : 'lemma'})],
name = 'default')


return ppl
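Replacing the module-level PIPELINE_DEFAULT constant with a create_pipeline(delay_init) factory means each caller, and each pool initialization in the service, gets a fresh pipeline whose processors can defer loading. A hedged usage sketch, assuming the import path shown in the docker CMD above and an environment where isanlp and its NLTK/polyglot dependencies are installed:

from isanlp.pipeline_default import create_pipeline

# Local, eager use: processors are fully initialized in this process.
ppl = create_pipeline(delay_init=False)

# Service-style use: build cheaply now, initialize inside each worker,
# mirroring what _init_process does in nlp_service.py.
lazy_ppl = create_pipeline(delay_init=True)
for proc in lazy_ppl.processors_iter():
    if hasattr(proc, 'init'):
        proc.init()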
19 changes: 13 additions & 6 deletions src/isanlp/processor_sentence_splitter.py
@@ -12,12 +12,19 @@ class ProcessorSentenceSplitter:
Simple wrapper around NLTK component. Suitable for european languages.
"""

def __init__(self):
punkt_param = PunktParameters()
punkt_param.abbrev_types = self.compile_abbreviations()
self.sent_tokeniser_ = PunktSentenceTokenizer(punkt_param)

def __init__(self, delay_init = False):
self.sent_tokeniser_ = None
if not delay_init:
self.init()

def init(self):
if self.sent_tokeniser_ is None:
punkt_param = PunktParameters()
punkt_param.abbrev_types = self.compile_abbreviations()
self.sent_tokeniser_ = PunktSentenceTokenizer(punkt_param)

def __call__(self, tokens):
assert self.sent_tokeniser_
sents = self.sent_tokeniser_.sentences_from_tokens((e.text for e in tokens))
curr = 0
res_sents = list()
@@ -40,4 +47,4 @@ def clean_regexps(regexps):
en_abbrevs = get_dot_pairs('qwertyuiopasdfghjklzxcvbnm')
en_abbrevs += clean_regexps(_en_abbrevs)

return list(set(ru_abbrevs + en_abbrevs))
return list(set(ru_abbrevs + en_abbrevs))