[WIP] phrases multicore using joblib threading #1433

Closed · wants to merge 5 commits
83 changes: 57 additions & 26 deletions gensim/models/phrases.py
@@ -69,6 +69,10 @@

from gensim import utils, interfaces

+from joblib import Parallel, delayed
+import threading
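+# a single module-level lock serializes all updates to the shared vocab counters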
+lock = threading.Lock()

logger = logging.getLogger(__name__)


@@ -95,6 +99,37 @@ def _is_single(obj):
        # If the first item isn't a string, assume obj is a corpus
        return False, obj_iter

+def count_vocab(self, sentence_no, sentence):
+    self.sentence_no = sentence_no
+
+    if sentence_no % self.progress_per == 0:
+        logger.info("PROGRESS: at sentence #%i, processed %i words and %i word types" %
+                    (sentence_no, self.total_words, len(self.vocab)))
+
+    sentence = [utils.any2utf8(w) for w in sentence]
+
+    for bigram in zip(sentence, sentence[1:]):
+        with lock:
+            self.vocab[bigram[0]] += 1
+            self.vocab[self.delimiter.join(bigram)] += 1
+        self.total_words += 1
+
+    if sentence:  # add last word skipped by previous loop
+        word = sentence[-1]
+        with lock:
+            self.vocab[word] += 1
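+    # note: the pruning step below reads and writes shared state without holding the lock (WIP)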

+    if len(self.vocab) > self.max_vocab_size:
+        utils.prune_vocab(self.vocab, self.min_reduce)
+        self.min_reduce += 1


class Phrases(interfaces.TransformationABC):
"""
Expand All @@ -106,7 +141,7 @@ class Phrases(interfaces.TransformationABC):

"""
    def __init__(self, sentences=None, min_count=5, threshold=10.0,
-                max_vocab_size=40000000, delimiter=b'_', progress_per=10000):
+                max_vocab_size=40000000, delimiter=b'_', progress_per=1000):
        """
        Initialize the model from an iterable of `sentences`. Each sentence must be
        a list of words (unicode strings) that will be used for training.
@@ -157,35 +192,31 @@ def __str__(self):
            self.__class__.__name__, len(self.vocab), self.min_count,
            self.threshold, self.max_vocab_size)

    @staticmethod
-    def learn_vocab(sentences, max_vocab_size, delimiter=b'_', progress_per=10000):
+    def learn_vocab(self, sentences, max_vocab_size, delimiter=b'_', progress_per=10000):
        """Collect unigram/bigram counts from the `sentences` iterable."""
-        sentence_no = -1
-        total_words = 0
+        self.sentence_no = -1
+        self.total_words = 0
        logger.info("collecting all words and their counts")
-        vocab = defaultdict(int)
-        min_reduce = 1
-        for sentence_no, sentence in enumerate(sentences):
-            if sentence_no % progress_per == 0:
-                logger.info("PROGRESS: at sentence #%i, processed %i words and %i word types" %
-                            (sentence_no, total_words, len(vocab)))
-            sentence = [utils.any2utf8(w) for w in sentence]
-            for bigram in zip(sentence, sentence[1:]):
-                vocab[bigram[0]] += 1
-                vocab[delimiter.join(bigram)] += 1
-                total_words += 1
-
-            if sentence:  # add last word skipped by previous loop
-                word = sentence[-1]
-                vocab[word] += 1
-
-            if len(vocab) > max_vocab_size:
-                utils.prune_vocab(vocab, min_reduce)
-                min_reduce += 1
+        self.vocab = defaultdict(int)
+        self.min_reduce = 1
+        self.max_vocab_size = max_vocab_size
+        self.delimiter = delimiter
+        self.progress_per = progress_per
+
+        Parallel(n_jobs=-1, backend="threading")(
+            delayed(count_vocab)(self, sentence_no, sentence)
+            for sentence_no, sentence in enumerate(sentences))
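+        # the threading backend shares `self` across workers, so count_vocab
+        # mutates common state in place instead of returning partial counts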

logger.info("collected %i word types from a corpus of %i words (unigram + bigrams) and %i sentences" %
(len(vocab), total_words, sentence_no + 1))
return min_reduce, vocab
(len(self.vocab), self.total_words, self.sentence_no + 1))

return self.min_reduce, self.vocab



def add_vocab(self, sentences):
"""
Expand All @@ -197,7 +228,7 @@ def add_vocab(self, sentences):
        # directly, but gives the new sentences a fighting chance to collect
        # sufficient counts, before being pruned out by the (large) accumulated
        # counts collected in previous learn_vocab runs.
-        min_reduce, vocab = self.learn_vocab(sentences, self.max_vocab_size, self.delimiter, self.progress_per)
+        min_reduce, vocab = self.learn_vocab(self, sentences, self.max_vocab_size, self.delimiter, self.progress_per)

        if len(self.vocab) > 0:
            logger.info("merging %i counts into %s", len(vocab), self)
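For reference, here is a minimal self-contained sketch of the pattern this patch relies on: joblib's `threading` backend fans work out to threads within one process, and a shared `Lock` serializes updates to common state. All names below are illustrative, not part of gensim.

```python
from collections import defaultdict
import threading

from joblib import Parallel, delayed

counts = defaultdict(int)  # shared mutable state, analogous to self.vocab above
lock = threading.Lock()

def count_tokens(sentence):
    # all threads share one address space, so every write to `counts` takes the lock
    for token in sentence:
        with lock:
            counts[token] += 1

corpus = [["a", "b"], ["b", "c"], ["a", "c"]]
Parallel(n_jobs=-1, backend="threading")(
    delayed(count_tokens)(s) for s in corpus)
print(dict(counts))  # totals: {'a': 2, 'b': 2, 'c': 2}
```

Note that under CPython's GIL, pure-Python counting like this gains little wall-clock time from threads; the threading backend mainly avoids the pickling overhead that a process-based backend would incur on `self`.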
1 change: 1 addition & 0 deletions setup.py
@@ -235,6 +235,7 @@ def finalize_options(self):
    'annoy',
    'tensorflow >= 1.1.0',
    'keras >= 2.0.4',
+    'joblib',
]

setup(
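For a quick end-to-end check, the threaded counting path is exercised through the ordinary `Phrases` API. Below is a minimal smoke test, assuming this branch is installed; the toy corpus and the deliberately low threshold are made up for illustration.

```python
from gensim.models.phrases import Phrases

# toy corpus: each sentence is a list of unicode tokens, repeated so
# the bigram counts clear min_count
sentences = [
    ["new", "york", "city"],
    ["new", "york", "times"],
] * 100

bigram = Phrases(sentences, min_count=5, threshold=0.01)
print(bigram[["new", "york", "city"]])  # e.g. ['new_york', 'city'] once 'new york' scores above threshold
```

Since `Phrases.__init__` calls `add_vocab`, which now routes through the joblib pool, this single call exercises the parallel `learn_vocab` path.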