diff --git a/build.sbt b/build.sbt
index 6328cf2370fad5..440db621525083 100644
--- a/build.sbt
+++ b/build.sbt
@@ -82,7 +82,9 @@ lazy val testDependencies = Seq(
 
 lazy val utilDependencies = Seq(
   "com.typesafe" % "config" % "1.3.0",
-  "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
+  "org.rocksdb" % "rocksdbjni" % "5.8.0",
+  "org.slf4j" % "slf4j-api" % "1.7.25",
+  "org.apache.commons" % "commons-compress" % "1.15"
 )
 
 lazy val root = (project in file("."))
diff --git a/python/sparknlp/annotator.py b/python/sparknlp/annotator.py
index ff2f90ac19360e..f51fd75b6fea1e 100755
--- a/python/sparknlp/annotator.py
+++ b/python/sparknlp/annotator.py
@@ -48,6 +48,26 @@ def setOutputCol(self, value):
         return self._set(outputCol=value)
 
 
+class AnnotatorWithEmbeddings(Params):
+    sourceEmbeddingsPath = Param(Params._dummy(),
+                                 "sourceEmbeddingsPath",
+                                 "Word embeddings file",
+                                 typeConverter=TypeConverters.toString)
+    embeddingsFormat = Param(Params._dummy(),
+                             "embeddingsFormat",
+                             "Word vectors file format",
+                             typeConverter=TypeConverters.toInt)
+    embeddingsNDims = Param(Params._dummy(),
+                            "embeddingsNDims",
+                            "Number of dimensions for word vectors",
+                            typeConverter=TypeConverters.toInt)
+
+    def setEmbeddingsSource(self, path, nDims, format):
+        self._set(sourceEmbeddingsPath=path)
+        self._set(embeddingsFormat=format)
+        return self._set(embeddingsNDims=nDims)
+
+
 class AnnotatorTransformer(JavaModel, JavaMLReadable, JavaMLWritable, AnnotatorProperties):
 
     column_type = "array<struct<annotatorType:string,begin:int,end:int,metadata:map<string,string>>>"
@@ -478,7 +498,7 @@ class NorvigSweetingModel(JavaModel, JavaMLWritable, JavaMLReadable, AnnotatorPr
 
 
 
-class NerCrfApproach(JavaEstimator, JavaMLWritable, JavaMLReadable, AnnotatorProperties):
+class NerCrfApproach(JavaEstimator, JavaMLWritable, JavaMLReadable, AnnotatorProperties, AnnotatorWithEmbeddings):
     labelColumn = Param(Params._dummy(),
                         "labelColumn",
                         "Column with label per each token",
diff --git a/src/main/scala/com/johnsnowlabs/ml/crf/DatasetEncoder.scala b/src/main/scala/com/johnsnowlabs/ml/crf/DatasetEncoder.scala
index bee0a1240ade8a..3b3b367abfb9d1 100644
--- a/src/main/scala/com/johnsnowlabs/ml/crf/DatasetEncoder.scala
+++ b/src/main/scala/com/johnsnowlabs/ml/crf/DatasetEncoder.scala
@@ -128,4 +128,4 @@ class DatasetEncoder(val startLabel: String = "@#Start") {
       result
     }
   }
-}
+}
\ No newline at end of file
diff --git a/src/main/scala/com/johnsnowlabs/ml/crf/DatasetReader.scala b/src/main/scala/com/johnsnowlabs/ml/crf/DatasetReader.scala
index ddfcf1e9963f59..09e3c5a4fefab4 100644
--- a/src/main/scala/com/johnsnowlabs/ml/crf/DatasetReader.scala
+++ b/src/main/scala/com/johnsnowlabs/ml/crf/DatasetReader.scala
@@ -132,5 +132,3 @@ object DatasetReader {
     }
   }
 }
-
-
diff --git a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala
index c1df9007d55434..0d83d6f6370da4 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala
@@ -2,7 +2,7 @@ package com.johnsnowlabs.nlp
 
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.types.{ArrayType, MetadataBuilder, StructField, StructType}
 import org.apache.spark.ml.util.DefaultParamsWritable
 
@@ -24,8 +24,15 @@ abstract class AnnotatorApproach[M <: Model[M]]
 
   def train(dataset: Dataset[_]): M
 
+  def beforeTraining(spark: SparkSession): Unit = {}
+
+  def onTrained(model: M, spark: SparkSession): Unit = {}
+
   override final def fit(dataset: Dataset[_]): M = {
-    copyValues(train(dataset).setParent(this))
+    beforeTraining(dataset.sparkSession)
+    val model = copyValues(train(dataset).setParent(this))
+    onTrained(model, dataset.sparkSession)
+    model
   }
 
   override final def copy(extra: ParamMap): Estimator[M] = defaultCopy(extra)
@@ -50,5 +57,4 @@ abstract class AnnotatorApproach[M <: Model[M]]
     StructField(getOutputCol, ArrayType(Annotation.dataType), nullable = false, metadataBuilder.build)
     StructType(outputFields)
   }
-
 }
diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/common/WordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/common/WordEmbeddings.scala
deleted file mode 100644
index 52379b9ec7f50b..00000000000000
--- a/src/main/scala/com/johnsnowlabs/nlp/annotators/common/WordEmbeddings.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-package com.johnsnowlabs.nlp.annotators.common
-
-import java.io.{Closeable, File}
-import java.nio.ByteBuffer
-
-import org.fusesource.leveldbjni.JniDBFactory.{bytes, factory}
-import org.iq80.leveldb.Options
-
-import scala.io.Source
-
-
-object WordEmbeddingsIndexer {
-
-  private[common] def toBytes(embeddings: Array[Float]): Array[Byte] = {
-    val buffer = ByteBuffer.allocate(embeddings.length * 4)
-    for (value <- embeddings) {
-      buffer.putFloat(value)
-    }
-    buffer.array()
-  }
-
-  private[common] def fromBytes(source: Array[Byte]): Array[Float] = {
-    val wrapper = ByteBuffer.wrap(source)
-    val result = Array.fill[Float](source.length / 4)(0f)
-
-    for (i <- 0 until result.length) {
-      result(i) = wrapper.getFloat(i * 4)
-    }
-    result
-  }
-
-  def indexGloveToLevelDb(source: String, dbFile: String): Unit = {
-    val options = new Options()
-    options.createIfMissing(true)
-    val db = factory.open(new File(dbFile), options)
-    var batch = db.createWriteBatch
-    try {
-      var batchSize = 0
-      for (line <- Source.fromFile(source).getLines()) {
-        val items = line.split(" ")
-        val word = items(0)
-        val embeddings = items.drop(1).map(i => i.toFloat)
-        batch.put(bytes(word), toBytes(embeddings))
-
-        batchSize += 1
-        if (batchSize % 1000 == 0) {
-          db.write(batch)
-          batch.close()
-          batch = db.createWriteBatch()
-          batchSize == 0
-        }
-      }
-
-      db.write(batch)
-      batch.close()
-    } finally {
-      db.close()
-    }
-  }
-}
-
-case class WordEmbeddings(levelDbFile: String,
-                          nDims: Int,
-                          cacheSizeMB: Int = 100,
-                          lruCacheSize: Int = 100000) extends Closeable{
-  val options = new Options()
-  options.cacheSize(cacheSizeMB * 1048576) // 100 Mb
-  options.createIfMissing(true)
-  val db = factory.open(new File(levelDbFile), options)
-  val zeroArray = Array.fill[Float](nDims)(0f)
-
-  val lru = new LruMap[String, Array[Float]](lruCacheSize)
-
-  private def getEmbeddingsFromDb(word: String): Array[Float] = {
-    val result = db.get(bytes(word.toLowerCase.trim))
-    if (result == null)
-      zeroArray
-    else
-      WordEmbeddingsIndexer.fromBytes(result)
-  }
-
-  def getEmbeddings(word: String): Array[Float] = {
-    lru.getOrElseUpdate(word, getEmbeddingsFromDb(word))
-  }
-
-  override def close(): Unit = {
-    db.close()
-  }
-}
diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/DictionaryFeatures.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/DictionaryFeatures.scala
index 80def1e790c93b..b9dcffdf71fe78 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/DictionaryFeatures.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/DictionaryFeatures.scala
@@ -2,8 +2,6 @@ package com.johnsnowlabs.nlp.annotators.ner.crf
 
 import com.johnsnowlabs.nlp.util.io.ResourceHelper
 
-import scala.io.Source
-
 case class DictionaryFeatures(dict: Map[String, String]) {
 
   def get(tokens: Seq[String]): Seq[String] = {
diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/FeatureGenerator.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/FeatureGenerator.scala
index 6b285f7fc50040..98793ad3d16786 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/FeatureGenerator.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/FeatureGenerator.scala
@@ -1,7 +1,8 @@
 package com.johnsnowlabs.nlp.annotators.ner.crf
 
 import com.johnsnowlabs.ml.crf._
-import com.johnsnowlabs.nlp.annotators.common.{TaggedSentence, WordEmbeddings}
+import com.johnsnowlabs.nlp.annotators.common.TaggedSentence
+import com.johnsnowlabs.nlp.embeddings.WordEmbeddings
 
 import scala.collection.mutable
 
diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala
index a5879a2e162bbe..8a86b9a6e99f38 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala
@@ -9,6 +9,7 @@ import com.johnsnowlabs.nlp.annotators.common.NerTagged
 import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach
 import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel
 import com.johnsnowlabs.nlp.datasets.CoNLL
+import com.johnsnowlabs.nlp.embeddings.AnnotatorWithWordEmbeddings
 import org.apache.spark.ml.Pipeline
 import org.apache.spark.ml.param.{DoubleParam, IntParam, Param, StringArrayParam}
 import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable}
@@ -17,7 +18,9 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 
 /*
   Algorithm for training Named Entity Recognition Model.
  */
-class NerCrfApproach(override val uid: String) extends AnnotatorApproach[NerCrfModel]{
+class NerCrfApproach(override val uid: String)
+  extends AnnotatorWithWordEmbeddings[NerCrfApproach, NerCrfModel] {
+
   def this() = this(Identifiable.randomUID("NER"))
 
   override val description = "CRF based Named Entity Recognition Tagger"
@@ -116,7 +119,8 @@ class NerCrfApproach(override val uid: String) extends AnnotatorApproach[NerCrfM
 
     val dictPaths = get(dicts).getOrElse(Array.empty[String])
     val dictFeatures = DictionaryFeatures.read(dictPaths.toSeq)
-    val crfDataset = FeatureGenerator(dictFeatures).generateDataset(trainDataset)
+    val crfDataset = FeatureGenerator(dictFeatures, embeddings)
+      .generateDataset(trainDataset)
 
     val params = CrfParams(
       minEpochs = getOrDefault(minEpochs),
diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala
index e8462ebdfe3f2c..8a9c179326f385 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala
@@ -4,6 +4,7 @@ import com.johnsnowlabs.ml.crf.{LinearChainCrfModel, SerializedLinearChainCrfMod
 import com.johnsnowlabs.nlp.AnnotatorType._
 import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence}
 import com.johnsnowlabs.nlp.annotators.common.Annotated.{NerTaggedSentence, PosTaggedSentence}
+import com.johnsnowlabs.nlp.embeddings.ModelWithWordEmbeddings
 import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.ml.param.StringArrayParam
@@ -14,8 +15,7 @@ import org.apache.spark.sql.{Encoders, Row}
 /*
   Named Entity Recognition model
  */
-class NerCrfModel(override val uid: String)
-  extends AnnotatorModel[NerCrfModel] {
+class NerCrfModel(override val uid: String) extends ModelWithWordEmbeddings[NerCrfModel] {
 
   def this() = this(Identifiable.randomUID("NER"))
 
@@ -36,7 +36,7 @@ class NerCrfModel(override val uid: String)
   def setEntities(toExtract: Array[String]): NerCrfModel = set(entities, toExtract)
 
   /**
-    Predicts Named Entities in input sentences
+  Predicts Named Entities in input sentences
     * @param sentences POS tagged sentences.
    * @return sentences with recognized Named Entities
    */
@@ -45,8 +45,9 @@ class NerCrfModel(override val uid: String)
 
     val crf = model.get
 
+    val fg = FeatureGenerator(dictionaryFeatures, embeddings)
     sentences.map{sentence =>
-      val instance = FeatureGenerator(dictionaryFeatures).generate(sentence, crf.metadata)
+      val instance = fg.generate(sentence, crf.metadata)
       val labelIds = crf.predict(instance)
       val words = sentence.indexedTaggedWords
         .zip(labelIds.labels)
@@ -116,6 +117,9 @@ object NerCrfModel extends DefaultParamsReadable[NerCrfModel] {
       instance
         .setModel(crfModel.deserialize)
         .setDictionaryFeatures(dictFeatures)
+
+      instance.deserializeEmbeddings(path, sparkSession.sparkContext)
+
       instance
     }
   }
@@ -136,7 +140,8 @@ object NerCrfModel extends DefaultParamsReadable[NerCrfModel] {
       val dictPath = new Path(path, "dict").toString
       val dictLines = model.dictionaryFeatures.dict.toSeq.map(p => p._1 + ":" + p._2)
       Seq(dictLines).toDS.write.mode("overwrite").parquet(dictPath)
+
+      model.serializeEmbeddings(path, sparkSession.sparkContext)
     }
   }
 }
-
diff --git a/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala
index 91a29ce602f7fa..73c561b03c49bf 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala
@@ -74,7 +74,7 @@ case class CoNLL(targetColumn: Int = 3, annotatorType: String) {
 
   def pack(sentences: Seq[TaggedSentence]): Seq[Annotation] = {
     if (annotatorType == AnnotatorType.NAMED_ENTITY)
-      NerTagged.pack(sentences)
+      NerTagged.pack(sentences)
     else
       PosTagged.pack(sentences)
   }
@@ -99,4 +99,4 @@ case class CoNLL(targetColumn: Int = 3, annotatorType: String) {
     val seq = readLines(lines).map(p => (p._1, pack(p._2)))
     seq.toDF(textColumn, labelColumn)
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala
new file mode 100644
index 00000000000000..dd3b7797ae8288
--- /dev/null
+++ b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala
@@ -0,0 +1,75 @@
+package com.johnsnowlabs.nlp.datasets
+
+import java.io.File
+
+import com.johnsnowlabs.ml.crf.{CrfDataset, DatasetMetadata, InstanceLabels, TextSentenceLabels}
+import com.johnsnowlabs.nlp.AnnotatorType
+import com.johnsnowlabs.nlp.annotators.common.TaggedSentence
+import com.johnsnowlabs.nlp.annotators.ner.crf.{DictionaryFeatures, FeatureGenerator}
+import com.johnsnowlabs.nlp.embeddings.{WordEmbeddings, WordEmbeddingsFormat, WordEmbeddingsIndexer}
+
+/**
+  * Helper class to work with the CoNLL 2003 dataset for the NER task.
+  * Made for easy use from Java.
+  */
+class CoNLL2003NerReader(wordEmbeddingsFile: String,
+                         wordEmbeddingsNDims: Int,
+                         embeddingsFormat: WordEmbeddingsFormat.Format,
+                         dictionaryFile: String) {
+
+  private val nerReader = CoNLL(3, AnnotatorType.NAMED_ENTITY)
+  private val posReader = CoNLL(1, AnnotatorType.POS)
+
+  private var wordEmbeddings: Option[WordEmbeddings] = None
+
+  if (wordEmbeddingsFile != null) {
+    require(new File(wordEmbeddingsFile).exists())
+
+    var fileDb = wordEmbeddingsFile + ".db"
+
+    if (!new File(fileDb).exists()) {
+      embeddingsFormat match {
+        case WordEmbeddingsFormat.Text =>
+          WordEmbeddingsIndexer.indexText(wordEmbeddingsFile, fileDb)
+        case WordEmbeddingsFormat.Binary =>
+          WordEmbeddingsIndexer.indexBinary(wordEmbeddingsFile, fileDb)
+        case WordEmbeddingsFormat.SparkNlp =>
+          fileDb = wordEmbeddingsFile
+      }
+
+    }
+
+    if (new File(fileDb).exists()) {
+      wordEmbeddings = Some(WordEmbeddings(fileDb, wordEmbeddingsNDims))
+    }
+  }
+
+  private val dicts = if (dictionaryFile == null) Seq.empty[String] else Seq(dictionaryFile)
+
+  private val fg = FeatureGenerator(
+    DictionaryFeatures.read(dicts),
+    wordEmbeddings
+  )
+
+  private def readDataset(file: String): Seq[(TextSentenceLabels, TaggedSentence)] = {
+    val labels = nerReader.readDocs(file).flatMap(_._2)
+      .map(sentence => TextSentenceLabels(sentence.tags))
+
+    val posTaggedSentences = posReader.readDocs(file).flatMap(_._2)
+    labels.zip(posTaggedSentences)
+  }
+
+  def readNerDataset(file: String, metadata: Option[DatasetMetadata] = None): CrfDataset = {
+    val lines = readDataset(file)
+    if (metadata.isEmpty)
+      fg.generateDataset(lines)
+    else {
+      val labeledInstances = lines.map { line =>
+        val instance = fg.generate(line._2, metadata.get)
+        val labels = InstanceLabels(line._1.labels.map(l => metadata.get.label2Id.getOrElse(l, -1)))
+        (labels, instance)
+      }
+      CrfDataset(labeledInstances, metadata.get)
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala
new file mode 100644
index 00000000000000..6b69f24aa62ea3
--- /dev/null
+++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala
@@ -0,0 +1,114 @@
+package com.johnsnowlabs.nlp.embeddings
+
+import java.io.File
+import java.nio.file.Files
+import java.util.UUID
+
+import com.johnsnowlabs.nlp.AnnotatorApproach
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.SparkContext
+import org.apache.spark.ml.param.{IntParam, Param}
+import org.apache.spark.sql.SparkSession
+
+
+/**
+  * Base class for annotators that use Word Embeddings.
+  * This implementation is based on RocksDB, so it has compact RAM usage.
+  *
+  * 1. The user configures Word Embeddings with the 'setEmbeddingsSource' method.
+  * 2. During training, Word Embeddings are indexed as a RocksDB index file.
+  * 3. Then this index file is spread across the cluster.
+  * 4. Every 'ModelWithWordEmbeddings' model uses the local RocksDB index as its Word Embeddings lookup.
+  */
+abstract class AnnotatorWithWordEmbeddings[A <: AnnotatorWithWordEmbeddings[A, M], M <: ModelWithWordEmbeddings[M]]
+  extends AnnotatorApproach[M] with AutoCloseable {
+
+  val sourceEmbeddingsPath = new Param[String](this, "sourceEmbeddingsPath", "Word embeddings file")
+  val embeddingsFormat = new IntParam(this, "embeddingsFormat", "Word vectors file format")
+  val embeddingsNDims = new IntParam(this, "embeddingsNDims", "Number of dimensions for word vectors")
+
+
+  def setEmbeddingsSource(path: String, nDims: Int, format: WordEmbeddingsFormat.Format): A = {
+    set(this.sourceEmbeddingsPath, path)
+    set(this.embeddingsFormat, format.id)
+    set(this.embeddingsNDims, nDims).asInstanceOf[A]
+  }
+
+  override def beforeTraining(spark: SparkSession): Unit = {
+    if (isDefined(sourceEmbeddingsPath)) {
+      // 1. Create tmp file for index
+      localPath = Some(WordEmbeddingsClusterHelper.createLocalPath())
+      // 2. Index Word Embeddings
+      indexEmbeddings(localPath.get, spark.sparkContext)
+      // 3. Copy WordEmbeddings to cluster
+      WordEmbeddingsClusterHelper.copyIndexToCluster(localPath.get, spark.sparkContext)
+      // 4. Create Embeddings for use during training
+      embeddings = Some(WordEmbeddings(localPath.get, $(embeddingsNDims)))
+    }
+  }
+
+  override def onTrained(model: M, spark: SparkSession): Unit = {
+    if (isDefined(sourceEmbeddingsPath)) {
+      val fileName = WordEmbeddingsClusterHelper.getClusterFileName(localPath.get).toString
+
+      model.setDims($(embeddingsNDims))
+      model.setIndexPath(fileName)
+    }
+  }
+
+  var embeddings: Option[WordEmbeddings] = None
+  private var localPath: Option[String] = None
+
+  private def indexEmbeddings(localFile: String, spark: SparkContext): Unit = {
+    val formatId = $(embeddingsFormat)
+
+    val fs = FileSystem.get(spark.hadoopConfiguration)
+
+    if (formatId == WordEmbeddingsFormat.Text.id) {
+      val tmpFile = Files.createTempFile("embeddings", ".bin").toAbsolutePath.toString()
+      fs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(tmpFile))
+      WordEmbeddingsIndexer.indexText(tmpFile, localFile)
+    } else if (formatId == WordEmbeddingsFormat.Binary.id) {
+      val tmpFile = Files.createTempFile("embeddings", ".bin").toAbsolutePath.toString()
+      fs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(tmpFile))
+      WordEmbeddingsIndexer.indexBinary(tmpFile, localFile)
+    }
+    else if (formatId == WordEmbeddingsFormat.SparkNlp.id) {
+      val hdfs = FileSystem.get(spark.hadoopConfiguration)
+      hdfs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(localFile))
+    }
+  }
+
+  override def close(): Unit = {
+    if (embeddings.nonEmpty)
+      embeddings.get.close()
+  }
+}
+
+object WordEmbeddingsClusterHelper {
+
+  def createLocalPath(): String = {
+    Files.createTempDirectory(UUID.randomUUID().toString.takeRight(12) + "_idx")
+      .toAbsolutePath.toString
+  }
+
+  def getClusterFileName(localFile: String): Path = {
+    val name = new File(localFile).getName
+    Path.mergePaths(new Path("/embeddings"), new Path(name))
+  }
+
+  def copyIndexToCluster(localFolder: String, spark: SparkContext): String = {
+    val fs = FileSystem.get(spark.hadoopConfiguration)
+
+    val src = new Path(localFolder)
+    val dst = Path.mergePaths(fs.getHomeDirectory, getClusterFileName(localFolder))
+
+    fs.copyFromLocalFile(false, true, src, dst)
+    fs.deleteOnExit(dst)
+
+    spark.addFile(dst.toString, true)
+
+    dst.toString
+  }
+}
+
diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala
new file mode 100644
index 00000000000000..8fe438d8a5d93f
--- /dev/null
+++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala
@@ -0,0 +1,84 @@
+package com.johnsnowlabs.nlp.embeddings
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.johnsnowlabs.nlp.AnnotatorModel
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.ivy.util.FileUtil
+import org.apache.spark.{SparkContext, SparkFiles}
+import org.apache.spark.ml.param.{IntParam, Param}
+
+
+/**
+  * Base class for models that use Word Embeddings.
+  * This implementation is based on RocksDB, so it has compact RAM usage.
+  *
+  * The corresponding Approach has to extend AnnotatorWithWordEmbeddings.
+  */
+abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]]
+  extends AnnotatorModel[M] with AutoCloseable {
+
+  val nDims = new IntParam(this, "nDims", "Number of embedding dimensions")
+  val indexPath = new Param[String](this, "indexPath", "File that stores Index")
+
+  def setDims(nDims: Int) = set(this.nDims, nDims).asInstanceOf[M]
+  def setIndexPath(path: String) = set(this.indexPath, path).asInstanceOf[M]
+
+  lazy val embeddings: Option[WordEmbeddings] = get(indexPath).map { path =>
+    // Have to copy the file because RocksDB modifies it and Spark raises an Exception
+    val src = SparkFiles.get(path)
+    val workPath = src + "_work"
+    if (!new File(workPath).exists())
+      FileUtil.deepCopy(new File(src), new File(workPath), null, false)
+
+    WordEmbeddings(workPath, $(nDims))
+  }
+
+  override def close(): Unit = {
+    if (embeddings.nonEmpty)
+      embeddings.get.close()
+  }
+
+  def moveFolderFiles(folderSrc: String, folderDst: String): Unit = {
+    for (file <- new File(folderSrc).list()) {
+      Files.move(Paths.get(folderSrc, file), Paths.get(folderDst, file))
+    }
+
+    Files.delete(Paths.get(folderSrc))
+  }
+
+  def deserializeEmbeddings(path: String, spark: SparkContext): Unit = {
+    val fs = FileSystem.get(spark.hadoopConfiguration)
+
+    val src = getEmbeddingsSerializedPath(path)
+
+    // 1. Copy to local file
+    val localPath = WordEmbeddingsClusterHelper.createLocalPath
+    if (fs.exists(src)) {
+      fs.copyToLocalFile(src, new Path(localPath))
+
+      // 2. Move files from localPath/embeddings to localPath
+      moveFolderFiles(localPath + "/embeddings", localPath)
+
+      // 3. Copy local file to cluster
+      WordEmbeddingsClusterHelper.copyIndexToCluster(localPath, spark)
+
+      // 4. Set correct path
+      val fileName = WordEmbeddingsClusterHelper.getClusterFileName(localPath).toString
+      setIndexPath(fileName)
+    }
+  }
+
+  def serializeEmbeddings(path: String, spark: SparkContext): Unit = {
+    if (isDefined(indexPath)) {
+      val index = new Path(SparkFiles.get($(indexPath)))
+      val fs = FileSystem.get(spark.hadoopConfiguration)
+
+      val dst = getEmbeddingsSerializedPath(path)
+      fs.copyFromLocalFile(false, true, index, dst)
+    }
+  }
+
+  def getEmbeddingsSerializedPath(path: String) = Path.mergePaths(new Path(path), new Path("/embeddings"))
+}
diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/RocksDbIndexer.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/RocksDbIndexer.scala
new file mode 100644
index 00000000000000..3f3a41c2fb78a4
--- /dev/null
+++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/RocksDbIndexer.scala
@@ -0,0 +1,42 @@
+package com.johnsnowlabs.nlp.embeddings
+
+import java.io.Closeable
+import org.rocksdb.{Options, RocksDB, WriteBatch, WriteOptions}
+
+
+private[embeddings] case class RocksDbIndexer(dbFile: String, autoFlushAfter: Option[Integer] = None) extends Closeable {
+  val options = new Options()
+  options.setCreateIfMissing(true)
+  options.setWriteBufferSize(20 * 1 << 20)
+
+  RocksDB.loadLibrary()
+  val writeOptions = new WriteOptions()
+
+  val db = RocksDB.open(options, dbFile)
+  var batch = new WriteBatch()
+  var batchSize = 0
+
+  def flush() = {
+    db.write(writeOptions, batch)
+    batch.close()
+    batch = new WriteBatch()
+    batchSize = 0
+  }
+
+  def add(word: String, vector: Array[Float]) = {
+    batch.put(word.getBytes, WordEmbeddingsIndexer.toBytes(vector))
+    batchSize += 1
+
+    if (autoFlushAfter.isDefined) {
+      if (batchSize >= autoFlushAfter.get)
+        flush()
+    }
+  }
+
+  override def close(): Unit = {
+    if (batchSize > 0)
+      flush()
+
+    db.close()
+  }
+}
diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala
new file mode 100644
index 00000000000000..4a869b3c659ddc
--- /dev/null
+++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala
@@ -0,0 +1,37 @@
+package com.johnsnowlabs.nlp.embeddings
+
+import java.io._
+import com.johnsnowlabs.nlp.util.LruMap
+import org.rocksdb._
+
+
+case class WordEmbeddings(dbFile: String,
+                          nDims: Int,
+                          cacheSizeMB: Int = 100,
+                          lruCacheSize: Int = 100000) extends Closeable {
+  val options = new Options()
+  options.setRowCache(new LRUCache(cacheSizeMB * 1 << 20))
+  RocksDB.loadLibrary()
+
+  val db = RocksDB.openReadOnly(options, dbFile)
+
+  val zeroArray = Array.fill[Float](nDims)(0f)
+
+  val lru = new LruMap[String, Array[Float]](lruCacheSize)
+
+  private def getEmbeddingsFromDb(word: String): Array[Float] = {
+    val result = db.get(word.toLowerCase.trim.getBytes())
+    if (result == null)
+      zeroArray
+    else
+      WordEmbeddingsIndexer.fromBytes(result)
+  }
+
+  def getEmbeddings(word: String): Array[Float] = {
+    lru.getOrElseUpdate(word, getEmbeddingsFromDb(word))
+  }
+
+  override def close(): Unit = {
+    db.close()
+  }
+}
diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala
new file mode 100644
index 00000000000000..abb2994ec391c8
--- /dev/null
+++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala
@@ -0,0 +1,9 @@
+package com.johnsnowlabs.nlp.embeddings
+
+object WordEmbeddingsFormat extends Enumeration {
+  type Format = Value
+
+  val SparkNlp = Value(1)
+  val Text = Value(2)
+  val Binary = Value(3)
+}
diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsIndexer.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsIndexer.scala
new file mode 100644
index 00000000000000..a01580f0c69e45
--- /dev/null
+++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsIndexer.scala
@@ -0,0 +1,153 @@
+package com.johnsnowlabs.nlp.embeddings
+
+import java.io._
+import java.nio.ByteBuffer
+import org.slf4j.LoggerFactory
+import scala.io.Source
+
+
+object WordEmbeddingsIndexer {
+
+  private[embeddings] def toBytes(embeddings: Array[Float]): Array[Byte] = {
+    val buffer = ByteBuffer.allocate(embeddings.length * 4)
+    for (value <- embeddings) {
+      buffer.putFloat(value)
+    }
+    buffer.array()
+  }
+
+  private[embeddings] def fromBytes(source: Array[Byte]): Array[Float] = {
+    val wrapper = ByteBuffer.wrap(source)
+    val result = Array.fill[Float](source.length / 4)(0f)
+
+    for (i <- 0 until result.length) {
+      result(i) = wrapper.getFloat(i * 4)
+    }
+    result
+  }
+
+  /**
+   * Indexes word embeddings given as lines of text (one word followed by its space-separated vector per line)
+   */
+  def indexText(source: Iterator[String], dbFile: String): Unit = {
+    TextIndexer.index(source, dbFile)
+  }
+
+  /**
+   * Indexes word embeddings from a text file (one word followed by its space-separated vector per line)
+   */
+  def indexText(source: String, dbFile: String): Unit = {
+    TextIndexer.index(source, dbFile)
+  }
+
+
+  def indexBinary(source: DataInputStream, dbFile: String): Unit = {
+    BinaryIndexer.index(source, dbFile)
+  }
+
+  /**
+   * Indexes a binary-formatted embeddings file
+   */
+  def indexBinary(source: String, dbFile: String): Unit = {
+    BinaryIndexer.index(source, dbFile)
+  }
+}
+
+
+
+
+private[embeddings] object TextIndexer {
+
+  def index(source: Iterator[String], dbFile: String): Unit = {
+    val indexer = RocksDbIndexer(dbFile, Some(1000))
+
+    try {
+      for (line <- source) {
+        val items = line.split(" ")
+        val word = items(0)
+        val embeddings = items.drop(1).map(i => i.toFloat)
+        indexer.add(word, embeddings)
+      }
+    } finally {
+      indexer.close()
+    }
+  }
+
+  def index(source: String, dbFile: String): Unit = {
+    val lines = Source.fromFile(source).getLines()
+    index(lines, dbFile)
+  }
+}
+
+
+private[embeddings] object BinaryIndexer {
+
+  private val logger = LoggerFactory.getLogger("WordEmbeddings")
+
+  def index(source: DataInputStream, dbFile: String): Unit = {
+    val indexer = RocksDbIndexer(dbFile, Some(1000))
+
+    try {
+      // File Header
+      val numWords = Integer.parseInt(readString(source))
+      val vecSize = Integer.parseInt(readString(source))
+
+      // File Body
+      for (i <- 0 until numWords) {
+        val word = readString(source)
+
+        // Unit Vector
+        val vector = readFloatVector(source, vecSize)
+        indexer.add(word, vector)
+      }
+
+      logger.info(s"Loaded $numWords words, vector size $vecSize")
+    } finally {
+      indexer.close()
+    }
+  }
+
+  def index(source: String, dbFile: String): Unit = {
+
+    val ds = new DataInputStream(new BufferedInputStream(new FileInputStream(source), 1 << 15))
+
+    try {
+      index(ds, dbFile)
+    } finally {
+      ds.close()
+    }
+  }
+
+  /**
+   * Read a string from the binary model (System default should be UTF-8):
+   */
+  private def readString(ds: DataInputStream): String = {
+    val byteBuffer = new ByteArrayOutputStream()
+
+    var isEnd = false
+    while (!isEnd) {
+      val byteValue = ds.readByte()
+      if ((byteValue != 32) && (byteValue != 10)) {
+        byteBuffer.write(byteValue)
+      } else if (byteBuffer.size() > 0) {
+        isEnd = true
+      }
+    }
+
+    val word = byteBuffer.toString()
+    byteBuffer.close()
+    word
+  }
+
+  /**
+   * Read a Vector - Array of Floats from the binary model:
+   */
+  private def readFloatVector(ds: DataInputStream, vectorSize: Int): Array[Float] = {
+    // Read Bytes
+    val vectorBuffer = Array.fill[Byte](4 * vectorSize)(0)
+    ds.read(vectorBuffer)
+
+    // Convert Bytes to Floats
+    WordEmbeddingsIndexer.fromBytes(vectorBuffer)
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/common/LruMap.scala b/src/main/scala/com/johnsnowlabs/nlp/util/LruMap.scala
similarity index 94%
rename from src/main/scala/com/johnsnowlabs/nlp/annotators/common/LruMap.scala
rename to src/main/scala/com/johnsnowlabs/nlp/util/LruMap.scala
index 46cbfa46b6601d..8e530b4b440c55 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/annotators/common/LruMap.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/util/LruMap.scala
@@ -1,4 +1,4 @@
-package com.johnsnowlabs.nlp.annotators.common
+package com.johnsnowlabs.nlp.util
 
 import scala.collection.mutable
 
@@ -35,4 +35,4 @@ class LruMap[TKey, TValue](maxCacheSize: Int) {
   object KeyPriorityOrdering extends Ordering[KeyPriority] {
     override def compare(x: KeyPriority, y: KeyPriority): Int = x.priority.compareTo(y.priority)
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/johnsnowlabs/nlp/util/SparkNlpConfigKeys.scala b/src/main/scala/com/johnsnowlabs/nlp/util/SparkNlpConfigKeys.scala
new file mode 100644
index 00000000000000..aaeb53fc2655a1
--- /dev/null
+++ b/src/main/scala/com/johnsnowlabs/nlp/util/SparkNlpConfigKeys.scala
@@ -0,0 +1,10 @@
+package com.johnsnowlabs.nlp.util
+
+/**
+  * Additional configuration options used by spark.nlp
+  */
+object SparkNlpConfigKeys {
+
+  /** Folder to store word embeddings */
+  val embeddingsFolder = "sparknlp.embeddings.folder"
+}
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
index b979ea5aa6742b..82a6454d35b980 100644
--- a/src/test/resources/log4j.properties
+++ b/src/test/resources/log4j.properties
@@ -6,4 +6,5 @@ log4j.appender.STDOUT.layout.ConversionPattern=[%5p] %m%n
 log4j.logger.RuleFactory=ERROR
 log4j.logger.PerceptronTraining=ERROR
 log4j.logger.PragmaticScorer=ERROR
-log4j.logger.NorvigApproach=ERROR
\ No newline at end of file
+log4j.logger.NorvigApproach=ERROR
+log4j.logger.CRF=INFO
\ No newline at end of file
diff --git a/src/test/resources/ner-corpus/test_embeddings.txt b/src/test/resources/ner-corpus/test_embeddings.txt
new file mode 100644
index 00000000000000..844cecf36dac47
--- /dev/null
+++ b/src/test/resources/ner-corpus/test_embeddings.txt
@@ -0,0 +1,2 @@
+hello 0.1 0.6 0.0
+world 0 1 0
\ No newline at end of file
diff --git a/src/test/resources/ner-corpus/test_ner_dataset.txt b/src/test/resources/ner-corpus/test_ner_dataset.txt
index 77c9d9827a2d4f..2f77cc28f6013b 100644
--- a/src/test/resources/ner-corpus/test_ner_dataset.txt
+++ b/src/test/resources/ner-corpus/test_ner_dataset.txt
@@ -5,4 +5,4 @@ works VBZ I-VP O
 at IN I-PP O
 Airbus NNP I-NP ORG
 Germany NNP B-NP LOC
-. . O O
+. . O O
\ No newline at end of file
diff --git a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala
index 662632e86f4aa3..4347f579061c33 100644
--- a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala
+++ b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala
@@ -1,11 +1,7 @@
 package com.johnsnowlabs.ml.crf
 
-import java.io.File
-
-import com.johnsnowlabs.nlp.annotators.common.{TaggedSentence, WordEmbeddings, WordEmbeddingsIndexer}
-import com.johnsnowlabs.nlp.annotators.ner.crf.{DictionaryFeatures, FeatureGenerator}
-import com.johnsnowlabs.nlp.AnnotatorType
-import com.johnsnowlabs.nlp.datasets.CoNLL
+import com.johnsnowlabs.nlp.datasets.{CoNLL, CoNLL2003NerReader}
+import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsFormat
 
 import scala.collection.mutable
 
@@ -27,39 +23,19 @@ object CoNLL2003CrfTest extends App {
   val embeddingsDims = 100
   val embeddingsFile = folder + s"glove.6B.${embeddingsDims}d.txt"
-  val wordEmbeddingsDb = folder + s"embeddings.${embeddingsDims}d.db"
-
-  var wordEmbeddings: Option[WordEmbeddings] = None
-
-  val time = System.nanoTime()
-  if (new File(embeddingsFile).exists() && !new File(wordEmbeddingsDb).exists()) {
-    WordEmbeddingsIndexer.indexGloveToLevelDb(embeddingsFile, wordEmbeddingsDb)
-  }
-
-  if (new File(wordEmbeddingsDb).exists()) {
-    wordEmbeddings = Some(WordEmbeddings(wordEmbeddingsDb, embeddingsDims))
-  }
-
-  val nerReader = CoNLL(3, AnnotatorType.NAMED_ENTITY)
-  val posReader = CoNLL(1, AnnotatorType.POS)
-  val fg = FeatureGenerator(
-    DictionaryFeatures.read(Seq("src/main/resources/ner-corpus/dict.txt")),
-    wordEmbeddings
+  val reader = new CoNLL2003NerReader(
+    embeddingsFile,
+    embeddingsDims,
+    WordEmbeddingsFormat.Text,
+    "/ner-corpus/dict.txt"
   )
 
-  def readDataset(file: String): Seq[(TextSentenceLabels, TaggedSentence)] = {
-    val labels = nerReader.readDocs(file).flatMap(_._2)
-      .map(sentence => TextSentenceLabels(sentence.tags))
-
-    val posTaggedSentences = posReader.readDocs(file).flatMap(_._2)
-    labels.zip(posTaggedSentences)
-  }
-
   def trainModel(file: String): LinearChainCrfModel = {
     System.out.println("Dataset Reading")
     val time = System.nanoTime()
-    val lines = readDataset(file)
-    val dataset = fg.generateDataset(lines)
+    val dataset = reader.readNerDataset(file)
+
     System.out.println(s"Done, ${(System.nanoTime() - time)/1e9}\n")
 
     System.out.println("Start fitting")
@@ -81,19 +57,20 @@ object CoNLL2003CrfTest extends App {
     val started = System.nanoTime()
 
     val predictedCorrect = mutable.Map[String, Int]()
-    val predicted = mutable.Map[String, Int]()
+    val predicted = mutable.Map[String, Int]()
     val correct = mutable.Map[String, Int]()
 
-    val testInstances = readDataset(file)
+    val testDataset = reader.readNerDataset(file, Some(model.metadata))
 
-    for ((labels, sentence) <- testInstances) {
-      val instance = fg.generate(sentence, model.metadata)
+    for ((labels, sentence) <- testDataset.instances) {
 
-      val predictedLabels = model.predict(instance)
+      val predictedLabels = model.predict(sentence)
         .labels
         .map(l => model.metadata.labels(l))
+      val correctLabels =
+        labels.labels.map{l => if (l >= 0) model.metadata.labels(l) else "unknown"}
 
-      for ((lCorrect, lPredicted) <- labels.labels.zip(predictedLabels)) {
+      for ((lCorrect, lPredicted) <- correctLabels.zip(predictedLabels)) {
         correct(lCorrect) = correct.getOrElseUpdate(lCorrect, 0) + 1
         predicted(lPredicted) = predicted.getOrElse(lPredicted, 0) + 1
 
@@ -106,7 +83,7 @@ object CoNLL2003CrfTest extends App {
     System.out.println("label\tprec\trec\tf1")
 
     val totalCorrect = correct.filterKeys(label => label != "O").values.sum
-    val totalPredicted = correct.filterKeys(label => label != "O").values.sum
+    val totalPredicted = predicted.filterKeys(label => label != "O").values.sum
     val totalPredictedCorrect = predictedCorrect.filterKeys(label => label != "O").values.sum
 
     val rec = totalPredictedCorrect.toFloat / totalCorrect
@@ -137,4 +114,3 @@ object CoNLL2003CrfTest extends App {
   System.out.println("\n\nQuality on test B data")
   testDataset(testFileB, model)
 }
-
diff --git a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala
index 492b8bffe3f0fb..61ff42fa293060 100644
--- a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala
+++ b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala
@@ -8,6 +8,7 @@ import com.johnsnowlabs.nlp.annotators.ner.crf.NerCrfApproach
 import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach
 import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel
 import com.johnsnowlabs.nlp.datasets.CoNLL
+import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsFormat
 import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
 import org.apache.spark.sql.DataFrame
 
@@ -60,6 +61,7 @@ object CoNLL2003PipelineTest extends App {
       .setRandomSeed(100)
       .setMaxEpochs(20)
       .setOutputCol("ner")
+      .setEmbeddingsSource("glove.6B.100d.txt", 100, WordEmbeddingsFormat.Text)
 
     getPosStages() :+ nerTagger
   }
diff --git a/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala b/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala
index 20b847e5b6ab71..bd08ae31385ca3 100644
--- a/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala
+++ b/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala
@@ -8,6 +8,7 @@ import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel
 import com.johnsnowlabs.nlp.annotators.sda.pragmatic.SentimentDetectorModel
 import com.johnsnowlabs.nlp.annotators.sda.vivekn.ViveknSentimentApproach
 import com.johnsnowlabs.nlp.annotators.spell.norvig.NorvigSweetingApproach
+import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsFormat
 import org.apache.spark.sql.{Dataset, Row}
 import org.scalatest._
 
@@ -152,6 +153,7 @@ object AnnotatorBuilder extends FlatSpec { this: Suite =>
       .setMinEpochs(1)
      .setMaxEpochs(3)
       .setDatsetPath("src/test/resources/ner-corpus/test_ner_dataset.txt")
+      .setEmbeddingsSource("src/test/resources/ner-corpus/test_embeddings.txt", 3, WordEmbeddingsFormat.Text)
       .setC0(34)
       .setL2(3.0)
       .setOutputCol("ner")
diff --git a/src/test/scala/com/johnsnowlabs/nlp/SparkAccessor.scala b/src/test/scala/com/johnsnowlabs/nlp/SparkAccessor.scala
index 9e400a2b3f347b..faa0ebc8d81c5d 100644
--- a/src/test/scala/com/johnsnowlabs/nlp/SparkAccessor.scala
+++ b/src/test/scala/com/johnsnowlabs/nlp/SparkAccessor.scala
@@ -8,7 +8,7 @@ object SparkAccessor {
     .builder()
     .appName("test")
     .master("local[*]")
-    .config("spark.driver.memory","4G")
+    .config("spark.driver.memory","8G")
     .config("spark.kryoserializer.buffer.max","200M")
     .getOrCreate()
 }
\ No newline at end of file
diff --git a/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala b/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala
index 8739ec15c5ff7c..8cfe7e79d359ba 100644
--- a/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala
+++ b/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala
@@ -76,4 +76,4 @@ class NerCrfApproachSpec extends FlatSpec {
     assert(tags == Seq("PER", "PER", "LOC"))
   }
 
-}
+}
\ No newline at end of file
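
Usage note (appended for reviewers, not part of the patch): the sketch below wires the new embeddings API together end to end, using only methods introduced or exercised in this diff (setEmbeddingsSource, WordEmbeddingsFormat, WordEmbeddingsIndexer, WordEmbeddings). `trainingDataFrame` is a placeholder for a DataFrame already annotated by the upstream document/token/POS stages, and the /tmp index path is an arbitrary scratch location.

    import com.johnsnowlabs.nlp.annotators.ner.crf.NerCrfApproach
    import com.johnsnowlabs.nlp.embeddings.{WordEmbeddings, WordEmbeddingsFormat, WordEmbeddingsIndexer}

    // Training: fit() now runs beforeTraining (index the embeddings file into RocksDB and
    // copy it to the cluster), then train, then onTrained (which stores nDims and the
    // cluster index path on the resulting NerCrfModel).
    val nerTagger = new NerCrfApproach()
      .setDatsetPath("src/test/resources/ner-corpus/test_ner_dataset.txt")
      .setEmbeddingsSource("src/test/resources/ner-corpus/test_embeddings.txt", 3, WordEmbeddingsFormat.Text)
      .setMinEpochs(1)
      .setMaxEpochs(3)
      .setOutputCol("ner")
    val nerModel = nerTagger.fit(trainingDataFrame)

    // Low-level round trip: index a text embeddings file into RocksDB and look a word up.
    WordEmbeddingsIndexer.indexText("src/test/resources/ner-corpus/test_embeddings.txt", "/tmp/test_embeddings.idx")
    val embeddings = WordEmbeddings("/tmp/test_embeddings.idx", nDims = 3)
    val hello = embeddings.getEmbeddings("hello") // Array(0.1f, 0.6f, 0.0f); unknown words return a zero vector
    embeddings.close()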