diff --git a/python/example/vivekn-sentiment/sentiment.ipynb b/python/example/vivekn-sentiment/sentiment.ipynb index 22653401696249..3aa72328ccce70 100644 --- a/python/example/vivekn-sentiment/sentiment.ipynb +++ b/python/example/vivekn-sentiment/sentiment.ipynb @@ -9,32 +9,15 @@ "outputs": [], "source": [ "#Imports\n", + "import time\n", "import sys\n", "sys.path.append('../../')\n", "\n", - "from pyspark.ml import Pipeline\n", + "from pyspark.ml import Pipeline, PipelineModel\n", "from sparknlp.annotator import *\n", "from sparknlp.base import DocumentAssembler, Finisher\n" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "from pyspark.sql import SparkSession\n", - "\n", - "spark = SparkSession.builder \\\n", - " .master(\"local[2]\") \\\n", - " .config(\"spark.jar\", \"lib/sparknlp.jar\") \\\n", - " .config(\"spark.driver.memory\", \"5g\")\\\n", - " .config(\"spark.dirver.maxResultSize\", \"2g\")\\\n", - " .getOrCreate()" - ] - }, { "cell_type": "code", "execution_count": null, @@ -137,9 +120,22 @@ "sentiment_detector = ViveknSentimentApproach() \\\n", " .setInputCols([\"spell\", \"sentence\"]) \\\n", " .setOutputCol(\"sentiment\") \\\n", + " .setPruneCorpus(False) \\\n", " .setPositiveSource(\"../../../src/test/resources/vivekn/positive\") \\\n", - " .setNegativeSource(\"../../../src/test/resources/vivekn/negative\") \\\n", - " .setPruneCorpus(False)\n" + " .setNegativeSource(\"../../../src/test/resources/vivekn/negative\") \\\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "pos = PerceptronApproach() \\\n", + " .setInputCols([\"sentence\", \"spell\"]) \\\n", + " .setOutputCol(\"pos\")" ] }, { @@ -168,11 +164,15 @@ " normalizer,\n", " spell_checker,\n", " sentiment_detector,\n", + " pos,\n", " finisher\n", "])\n", "\n", + "start = time.time()\n", "sentiment_data = pipeline.fit(data).transform(data)\n", - "sentiment_data.show()" + "sentiment_data.show()\n", + "end = time.time()\n", + "print(\"Time elapsed pipeline process: \" + str(end - start))" ] }, { @@ -188,24 +188,27 @@ { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, + "metadata": {}, "outputs": [], "source": [ + "start = time.time()\n", "pipeline.write().overwrite().save(\"./ps\")\n", - "pipeline.fit(data).write().overwrite().save(\"./ms\")" + "pipeline.fit(data).write().overwrite().save(\"./ms\")\n", + "end = time.time()\n", + "print(\"Time elapsed in write pipelines: \" + str(end - start))" ] }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, + "metadata": {}, "outputs": [], "source": [ - "from pyspark.ml import Pipeline,PipelineModel" + "start = time.time()\n", + "p = Pipeline.read().load(\"./ps\")\n", + "pm = PipelineModel.read().load(\"./ms\")\n", + "end = time.time()\n", + "print(\"Time elapsed in read pipelines: \" + str(end - start))" ] }, { @@ -214,8 +217,11 @@ "metadata": {}, "outputs": [], "source": [ - "Pipeline.read().load(\"./ps\")\n", - "PipelineModel.read().load(\"./ms\")" + "start = time.time()\n", + "pm.transform(data).where(\"finished_sentiment not like '%negative%'\").show()\n", + "print(pm.transform(data).count())\n", + "end = time.time()\n", + "print(\"Time elapsed in using loaded pipelines: \" + str(end - start))" ] }, { diff --git a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala index 
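The notebook hunk above drops the inline SparkSession bootstrap (the session is expected to come from whatever launches the notebook), adds wall-clock timing around fit/transform, adds a PerceptronApproach stage, and exercises the new pipeline persistence by saving to ./ps and ./ms and loading them back. The sketch below mirrors that save/load round trip with the stock Spark ML API in Scala; `pipeline` and `data` stand in for the notebook's objects and the paths simply reuse the notebook's.

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.DataFrame

// Same round trip the notebook runs in Python, written against the Scala API.
def roundTrip(pipeline: Pipeline, data: DataFrame): Unit = {
  pipeline.write.overwrite().save("./ps")              // unfitted Pipeline
  pipeline.fit(data).write.overwrite().save("./ms")    // fitted PipelineModel

  val p  = Pipeline.load("./ps")
  val pm = PipelineModel.load("./ms")

  // finished_sentiment is the Finisher output column the notebook filters on
  pm.transform(data).where("finished_sentiment not like '%negative%'").show()
}
```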
51b288451bf50c..d3ed36a2daff82 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala @@ -2,7 +2,6 @@ package com.johnsnowlabs.nlp import org.apache.spark.ml.Model import org.apache.spark.ml.param.ParamMap -import org.apache.spark.ml.util.DefaultParamsWritable import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.types._ @@ -15,7 +14,7 @@ import org.apache.spark.sql.functions.{array, udf} */ abstract class AnnotatorModel[M <: Model[M]] extends Model[M] - with DefaultParamsWritable + with ParamsAndFeaturesWritable with HasAnnotatorType with HasInputAnnotationCols with HasOutputAnnotationCol { diff --git a/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala new file mode 100644 index 00000000000000..b8738116d7a385 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala @@ -0,0 +1,35 @@ +package com.johnsnowlabs.nlp + +import com.johnsnowlabs.nlp.serialization.{ArrayFeature, Feature, MapFeature, StructFeature} + +import scala.collection.mutable.ArrayBuffer + +trait HasFeatures { + + val features: ArrayBuffer[Feature[_, _, _]] = ArrayBuffer.empty + + protected def set[T](feature: ArrayFeature[T], value: Array[T]): this.type = {feature.setValue(Some(value)); this} + + protected def set[K, V](feature: MapFeature[K, V], value: Map[K, V]): this.type = {feature.setValue(Some(value)); this} + + protected def set[T](feature: StructFeature[T], value: T): this.type = {feature.setValue(Some(value)); this} + + protected def setDefault[T](feature: ArrayFeature[T], value: Array[T]): this.type = {feature.setValue(Some(value)); this} + + protected def setDefault[K, V](feature: MapFeature[K, V], value: Map[K, V]): this.type = {feature.setValue(Some(value)); this} + + protected def setDefault[T](feature: StructFeature[T], value: T): this.type = {feature.setValue(Some(value)); this} + + protected def get[T](feature: ArrayFeature[T]): Option[Array[T]] = feature.get + + protected def get[K, V](feature: MapFeature[K, V]): Option[Map[K, V]] = feature.get + + protected def get[T](feature: StructFeature[T]): Option[T] = feature.get + + protected def $$[T](feature: ArrayFeature[T]): Array[T] = feature.getValue + + protected def $$[K, V](feature: MapFeature[K, V]): Map[K, V] = feature.getValue + + protected def $$[T](feature: StructFeature[T]): T = feature.getValue + +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala new file mode 100644 index 00000000000000..6194d743ad7ff1 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala @@ -0,0 +1,31 @@ +package com.johnsnowlabs.nlp + +import org.apache.spark.ml.util.{DefaultParamsReadable, MLReader} +import org.apache.spark.sql.SparkSession + +class FeaturesReader[T <: HasFeatures](baseReader: MLReader[T], onRead: (T, String, SparkSession) => Unit) extends MLReader[T] { + + override def load(path: String): T = { + + val instance = baseReader.load(path) + + for (feature <- instance.features) { + val value = feature.deserialize(sparkSession, path, feature.name) + feature.setValue(value) + } + + onRead(instance, path, sparkSession) + + instance + } +} + +trait ParamsAndFeaturesReadable[T <: HasFeatures] extends DefaultParamsReadable[T] { + + def onRead(instance: T, path: String, spark: SparkSession): Unit = 
{} + + override def read: MLReader[T] = new FeaturesReader( + super.read, + (instance: T, path: String, spark: SparkSession) => onRead(instance, path, spark) + ) +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala new file mode 100644 index 00000000000000..aac623b487c02d --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala @@ -0,0 +1,32 @@ +package com.johnsnowlabs.nlp + +import org.apache.spark.ml.param.Params +import org.apache.spark.ml.util.{DefaultParamsWritable, MLWriter} +import org.apache.spark.sql.SparkSession + +class FeaturesWriter[T](annotatorWithFeatures: HasFeatures, baseWriter: MLWriter, onWritten: (String, SparkSession) => Unit) + extends MLWriter with HasFeatures { + + override protected def saveImpl(path: String): Unit = { + baseWriter.save(path) + + for (feature <- annotatorWithFeatures.features) { + feature.serializeInfer(sparkSession, path, feature.name, feature.getValue) + } + + onWritten(path, sparkSession) + + } +} + +trait ParamsAndFeaturesWritable extends DefaultParamsWritable with Params with HasFeatures { + + def onWritten(path: String, spark: SparkSession): Unit = {} + + override def write: MLWriter = new FeaturesWriter( + this, + super.write, + (path: String, spark: SparkSession) => onWritten(path, spark) + ) + +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala index 3673e5564e25e5..efdb6d6adc0961 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala @@ -1,12 +1,12 @@ package com.johnsnowlabs.nlp.annotators -import com.johnsnowlabs.nlp.annotators.common.StringMapParam +import com.johnsnowlabs.nlp.serialization.MapFeature import com.johnsnowlabs.nlp.util.io.ResourceHelper -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} +import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable} import com.typesafe.config.Config import com.johnsnowlabs.nlp.util.ConfigHelper import org.apache.spark.ml.param.Param -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} +import org.apache.spark.ml.util.Identifiable import scala.collection.JavaConverters._ @@ -25,7 +25,7 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { private val config: Config = ConfigHelper.retrieve - val lemmaDict: StringMapParam = new StringMapParam(this, "lemmaDict", "provide a lemma dictionary") + val lemmaDict: MapFeature[String, String] = new MapFeature(this, "lemmaDict") val lemmaFormat: Param[String] = new Param[String](this, "lemmaFormat", "TXT or TXTDS for reading dictionary as dataset") @@ -52,15 +52,23 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { def this() = this(Identifiable.randomUID("LEMMATIZER")) - def getLemmaDict: Map[String, String] = $(lemmaDict) + def getLemmaDict: Map[String, String] = $$(lemmaDict) + protected def getLemmaFormat: String = $(lemmaFormat) + protected def getLemmaKeySep: String = $(lemmaKeySep) + protected def getLemmaValSep: String = $(lemmaValSep) def setLemmaDict(dictionary: String): this.type = { set(lemmaDict, Lemmatizer.retrieveLemmaDict(dictionary, $(lemmaFormat), $(lemmaKeySep), $(lemmaValSep))) } - def setLemmaDictHMap(dictionary: java.util.HashMap[String, String]): this.type = { set(lemmaDict, dictionary.asScala.toMap) } + def 
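The three new files above (HasFeatures, ParamsAndFeaturesReadable with its FeaturesReader, and ParamsAndFeaturesWritable with its FeaturesWriter) replace per-annotator custom MLWriter/MLReader code: any state declared as a Feature registers itself in HasFeatures.features and is persisted and restored automatically. Below is a minimal hypothetical annotator showing the pattern; ToyLookupModel and its lookup feature are invented for illustration, and the sketch assumes the usual AnnotatorModel contract (annotatorType, requiredAnnotatorTypes, annotate).

```scala
import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable}
import com.johnsnowlabs.nlp.serialization.MapFeature
import org.apache.spark.ml.util.Identifiable

class ToyLookupModel(override val uid: String) extends AnnotatorModel[ToyLookupModel] {
  import com.johnsnowlabs.nlp.AnnotatorType._

  def this() = this(Identifiable.randomUID("TOY_LOOKUP"))

  override val annotatorType: AnnotatorType = TOKEN
  override val requiredAnnotatorTypes: Array[AnnotatorType] = Array(TOKEN)

  // Registered in HasFeatures.features at construction, so FeaturesWriter/FeaturesReader
  // persist it under <path>/fields/lookup without any bespoke serialization code.
  val lookup: MapFeature[String, String] = new MapFeature(this, "lookup")
  def setLookup(value: Map[String, String]): this.type = set(lookup, value)

  override def annotate(annotations: Seq[Annotation]): Seq[Annotation] = {
    val dict = $$(lookup)  // feature values are read back through $$, as in the annotators below
    annotations            // pass-through; a real annotator would rewrite results using dict
  }
}

// read() now returns a FeaturesReader that restores every registered Feature.
object ToyLookupModel extends ParamsAndFeaturesReadable[ToyLookupModel]
```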
setLemmaDictMap(dictionary: Map[String, String]): this.type = { + set(lemmaDict, dictionary) + } + def setLemmaFormat(value: String): this.type = set(lemmaFormat, value) + def setLemmaKeySep(value: String): this.type = set(lemmaKeySep, value) + def setLemmaValSep(value: String): this.type = set(lemmaValSep, value) /** * @return one to one annotation from token to a lemmatized word, if found on dictionary or leave the word as is @@ -72,19 +80,14 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { annotatorType, tokenAnnotation.begin, tokenAnnotation.end, - $(lemmaDict).getOrElse(token, token), + $$(lemmaDict).getOrElse(token, token), tokenAnnotation.metadata ) } } } -object Lemmatizer extends DefaultParamsReadable[Lemmatizer] { - - /** - * Retrieves Lemma dictionary from configured compiled source set in configuration - * @return a Dictionary for lemmas - */ +object Lemmatizer extends ParamsAndFeaturesReadable[Lemmatizer] { protected def retrieveLemmaDict( lemmaFilePath: String, lemmaFormat: String, diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index 8a9c179326f385..d940f6b96fedb1 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -1,15 +1,14 @@ package com.johnsnowlabs.nlp.annotators.ner.crf -import com.johnsnowlabs.ml.crf.{LinearChainCrfModel, SerializedLinearChainCrfModel} +import com.johnsnowlabs.ml.crf.LinearChainCrfModel import com.johnsnowlabs.nlp.AnnotatorType._ import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence} import com.johnsnowlabs.nlp.annotators.common.Annotated.{NerTaggedSentence, PosTaggedSentence} -import com.johnsnowlabs.nlp.embeddings.ModelWithWordEmbeddings -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} -import org.apache.hadoop.fs.Path +import com.johnsnowlabs.nlp.serialization.{MapFeature, StructFeature} +import com.johnsnowlabs.nlp.embeddings.{EmbeddingsReadable, ModelWithWordEmbeddings} +import com.johnsnowlabs.nlp.Annotation import org.apache.spark.ml.param.StringArrayParam import org.apache.spark.ml.util._ -import org.apache.spark.sql.{Encoders, Row} /* @@ -20,18 +19,13 @@ class NerCrfModel(override val uid: String) extends ModelWithWordEmbeddings[NerC def this() = this(Identifiable.randomUID("NER")) val entities = new StringArrayParam(this, "entities", "List of Entities to recognize") - var model: Option[LinearChainCrfModel] = None - var dictionaryFeatures = DictionaryFeatures(Seq.empty) + val model: StructFeature[LinearChainCrfModel] = new StructFeature[LinearChainCrfModel](this, "crfModel") + val dictionaryFeatures: MapFeature[String, String] = new MapFeature[String, String](this, "dictionaryFeatures") - def setModel(crf: LinearChainCrfModel): NerCrfModel = { - model = Some(crf) - this - } + def setModel(crf: LinearChainCrfModel): NerCrfModel = set(model, crf) - def setDictionaryFeatures(dictFeatures: DictionaryFeatures) = { - dictionaryFeatures = dictFeatures - this - } + def setDictionaryFeatures(dictFeatures: DictionaryFeatures): this.type = set(dictionaryFeatures, dictFeatures.dict) + setDefault(dictionaryFeatures, Map.empty[String, String]) def setEntities(toExtract: Array[String]): NerCrfModel = set(entities, toExtract) @@ -41,11 +35,11 @@ class NerCrfModel(override val uid: String) extends ModelWithWordEmbeddings[NerC * @return 
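With lemmaDict converted from StringMapParam to MapFeature and the companion switched to ParamsAndFeaturesReadable, a saved Lemmatizer now carries its dictionary with it instead of relying on a configured source file at load time. A hedged round-trip sketch; the path and dictionary entries are illustrative.

```scala
import com.johnsnowlabs.nlp.annotators.Lemmatizer

val lemmatizer = new Lemmatizer()
  .setInputCols(Array("token"))
  .setOutputCol("lemma")
  .setLemmaDictMap(Map("feet" -> "foot", "mice" -> "mouse"))  // new setter from this change

// ParamsAndFeaturesWritable: params as before, lemmaDict under <path>/fields/lemmaDict
lemmatizer.write.overwrite().save("/tmp/lemmatizer_model")

// ParamsAndFeaturesReadable: the FeaturesReader restores the dictionary before returning
val restored = Lemmatizer.read.load("/tmp/lemmatizer_model")
assert(restored.getLemmaDict == lemmatizer.getLemmaDict)
```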
sentences with recognized Named Entities */ def tag(sentences: Seq[PosTaggedSentence]): Seq[NerTaggedSentence] = { - require(model.isDefined, "model must be set before tagging") + require(model.isSet, "model must be set before tagging") - val crf = model.get + val crf = $$(model) - val fg = FeatureGenerator(dictionaryFeatures, embeddings) + val fg = FeatureGenerator(new DictionaryFeatures($$(dictionaryFeatures)), embeddings) sentences.map{sentence => val instance = fg.generate(sentence, crf.metadata) val labelIds = crf.predict(instance) @@ -72,76 +66,12 @@ class NerCrfModel(override val uid: String) extends ModelWithWordEmbeddings[NerC NerTagged.pack(taggedSentences) } - def shrink(minW: Float): NerCrfModel = { - model = model.map(m => m.shrink(minW)) - this - } + def shrink(minW: Float): NerCrfModel = set(model, $$(model).shrink(minW)) override val requiredAnnotatorTypes = Array(DOCUMENT, TOKEN, POS) override val annotatorType: AnnotatorType = NAMED_ENTITY - override def write: MLWriter = new NerCrfModel.NerCrfModelWriter(this, super.write) } -object NerCrfModel extends DefaultParamsReadable[NerCrfModel] { - implicit val crfEncoder = Encoders.kryo[SerializedLinearChainCrfModel] - - override def read: MLReader[NerCrfModel] = new NerCrfModelReader(super.read) - - class NerCrfModelReader(baseReader: MLReader[NerCrfModel]) extends MLReader[NerCrfModel] { - override def load(path: String): NerCrfModel = { - val instance = baseReader.load(path) - - val dataPath = new Path(path, "data").toString - val loaded = sparkSession.sqlContext.read.format("parquet").load(dataPath) - val crfModel = loaded.as[SerializedLinearChainCrfModel].head - - val dictPath = new Path(path, "dict").toString - val dictLoaded = sparkSession.sqlContext.read.format("parquet") - .load(dictPath) - .collect - .head - - val lines = dictLoaded.asInstanceOf[Row].getAs[Seq[String]](0) - - val dict = lines - .map {line => - val items = line.split(":") - (items(0), items(1)) - } - .toMap - - val dictFeatures = new DictionaryFeatures(dict) - - instance - .setModel(crfModel.deserialize) - .setDictionaryFeatures(dictFeatures) - - instance.deserializeEmbeddings(path, sparkSession.sparkContext) - instance - } - } - - class NerCrfModelWriter(model: NerCrfModel, baseWriter: MLWriter) extends MLWriter { - - override protected def saveImpl(path: String): Unit = { - require(model.model.isDefined, "Crf Model must be defined before serialization") - - baseWriter.save(path) - - val spark = sparkSession - import spark.sqlContext.implicits._ - - val toStore = model.model.get.serialize - val dataPath = new Path(path, "data").toString - Seq(toStore).toDS.write.mode("overwrite").parquet(dataPath) - - val dictPath = new Path(path, "dict").toString - val dictLines = model.dictionaryFeatures.dict.toSeq.map(p => p._1 + ":" + p._2) - Seq(dictLines).toDS.write.mode("overwrite").parquet(dictPath) - - model.serializeEmbeddings(path, sparkSession.sparkContext) - } - } -} +object NerCrfModel extends EmbeddingsReadable[NerCrfModel] diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala index 8b614548fbce84..2a90cb45e6cd14 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala @@ -1,9 +1,9 @@ package com.johnsnowlabs.nlp.annotators.pos.perceptron import com.johnsnowlabs.nlp.annotators.common._ -import 
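NerCrfModel loses its hand-written NerCrfModelReader/NerCrfModelWriter: the CRF model becomes a StructFeature, the dictionary a MapFeature, and embeddings restoration moves into the EmbeddingsReadable companion. A sketch of inspecting a loaded model through the new accessors; the path is illustrative and the label set depends on the training corpus (the values shown are the ones asserted in the test spec below).

```scala
import com.johnsnowlabs.nlp.annotators.ner.crf.NerCrfModel

val ner = NerCrfModel.read.load("/tmp/ner_crf_model")   // EmbeddingsReadable also restores the embeddings index

require(ner.model.isSet, "crfModel feature should be restored from <path>/fields/crfModel")
val crf = ner.model.getValue                              // StructFeature accessor
println(crf.metadata.labels.mkString(", "))               // e.g. @#Start, PER, O, ORG, LOC
println(ner.dictionaryFeatures.getValue.size)             // MapFeature accessor
```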
com.johnsnowlabs.nlp.annotators.param.AnnotatorParam -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} +import com.johnsnowlabs.nlp.serialization.StructFeature +import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable} +import org.apache.spark.ml.util.Identifiable /** * Part of speech tagger that might use different approaches @@ -18,8 +18,8 @@ class PerceptronModel(override val uid: String) extends AnnotatorModel[Perceptro /** Internal structure for target sentences holding their range information which is used for annotation */ private case class SentenceToBeTagged(tokenizedSentence: TokenizedSentence, start: Int, end: Int) - val model: AnnotatorParam[AveragedPerceptron, SerializedPerceptronModel] = - new AnnotatorParam[AveragedPerceptron, SerializedPerceptronModel](this, "POS Model", "POS Tagging approach") + val model: StructFeature[AveragedPerceptron] = + new StructFeature[AveragedPerceptron](this, "POS Model") override val annotatorType: AnnotatorType = POS @@ -35,16 +35,16 @@ class PerceptronModel(override val uid: String) extends AnnotatorModel[Perceptro */ def tag(tokenizedSentences: Array[TokenizedSentence]): Array[TaggedSentence] = { logger.debug(s"PREDICTION: Tagging:\nSENT: <<${tokenizedSentences.map(_.condense).mkString(">>\nSENT<<")}>> model weight properties in 'bias' " + - s"feature:\nPREDICTION: ${$(model).getWeights("bias").mkString("\nPREDICTION: ")}") + s"feature:\nPREDICTION: ${$$(model).getWeights("bias").mkString("\nPREDICTION: ")}") var prev = START(0) var prev2 = START(1) tokenizedSentences.map(sentence => { val context: Array[String] = START ++: sentence.tokens.map(normalized) ++: END sentence.indexedTokens.zipWithIndex.map { case (IndexedToken(word, begin, end), i) => - val tag = $(model).getTagBook.find(_.word == word.toLowerCase).map(_.tag).getOrElse( + val tag = $$(model).getTagBook.find(_.word == word.toLowerCase).map(_.tag).getOrElse( { val features = getFeatures(i, word, context, prev, prev2) - $(model).predict(features) + $$(model).predict(features) } ) prev2 = prev @@ -56,7 +56,7 @@ class PerceptronModel(override val uid: String) extends AnnotatorModel[Perceptro def this() = this(Identifiable.randomUID("POS")) - def getModel: AveragedPerceptron = $(model) + def getModel: AveragedPerceptron = $$(model) def setModel(targetModel: AveragedPerceptron): this.type = set(model, targetModel) @@ -68,4 +68,4 @@ class PerceptronModel(override val uid: String) extends AnnotatorModel[Perceptro } } -object PerceptronModel extends DefaultParamsReadable[PerceptronModel] \ No newline at end of file +object PerceptronModel extends ParamsAndFeaturesReadable[PerceptronModel] \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala index 213f8e9017a1e0..cf60d22f167d74 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala @@ -1,16 +1,18 @@ package com.johnsnowlabs.nlp.annotators.sda.vivekn -import com.johnsnowlabs.nlp.annotators.common.{IntStringMapParam, Tokenized, TokenizedSentence} -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.spark.ml.param.{IntParam, StringArrayParam} -import 
org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} +import com.johnsnowlabs.nlp.annotators.common.{Tokenized, TokenizedSentence} +import com.johnsnowlabs.nlp.serialization.{ArrayFeature, MapFeature} +import com.johnsnowlabs.nlp.util.ConfigHelper +import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable} +import com.typesafe.config.Config +import org.apache.spark.ml.param.IntParam +import org.apache.spark.ml.util.Identifiable class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[ViveknSentimentModel] { import com.johnsnowlabs.nlp.AnnotatorType._ - private val config: Config = ConfigFactory.load + private val config: Config = ConfigHelper.retrieve private val importantFeatureRatio = config.getDouble("nlp.viveknSentiment.importantFeaturesRatio") private val unimportantFeatureStep = config.getDouble("nlp.viveknSentiment.unimportantFeaturesStepRatio") private val featureLimit = config.getInt("nlp.viveknSentiment.featuresLimit") @@ -19,19 +21,24 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive override val requiredAnnotatorTypes: Array[AnnotatorType] = Array(TOKEN, DOCUMENT) - protected val positive: IntStringMapParam = new IntStringMapParam(this, "positive_sentences", "positive sentences trained") - protected val negative: IntStringMapParam = new IntStringMapParam(this, "negative_sentences", "negative sentences trained") - protected val features: StringArrayParam = new StringArrayParam(this, "words", "unique words trained") + protected val positive: MapFeature[String, Int] = new MapFeature(this, "positive_sentences") + protected val negative: MapFeature[String, Int] = new MapFeature(this, "negative_sentences") + protected val words: ArrayFeature[String] = new ArrayFeature(this, "words") + protected val positiveTotals: IntParam = new IntParam(this, "positive_totals", "count of positive words") protected val negativeTotals: IntParam = new IntParam(this, "negative_totals", "count of negative words") def this() = this(Identifiable.randomUID("VIVEKN")) - private[vivekn] def setPositive(value: Map[String, Int]) = set(positive, value) - private[vivekn] def setNegative(value: Map[String, Int]) = set(negative, value) - private[vivekn] def setPositiveTotals(value: Int) = set(positiveTotals, value) - private[vivekn] def setNegativeTotals(value: Int) = set(negativeTotals, value) - private[vivekn] def setWords(value: Array[String]) = { + private[vivekn] def getPositive: Map[String, Int] = $$(positive) + private[vivekn] def getNegative: Map[String, Int] = $$(negative) + private[vivekn] def getFeatures: Array[String] = $$(words) + + private[vivekn] def setPositive(value: Map[String, Int]): this.type = set(positive, value) + private[vivekn] def setNegative(value: Map[String, Int]): this.type = set(negative, value) + private[vivekn] def setPositiveTotals(value: Int): this.type = set(positiveTotals, value) + private[vivekn] def setNegativeTotals(value: Int): this.type = set(negativeTotals, value) + private[vivekn] def setWords(value: Array[String]): this.type = { require(value.nonEmpty, "Word analysis for features cannot be empty. 
Set prune to false if training is small") val currentFeatures = scala.collection.mutable.Set.empty[String] val start = (value.length * importantFeatureRatio).ceil.toInt @@ -44,14 +51,15 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive Range(start, afterStart, step).foreach(k => { value.slice(k, k+step).foreach(currentFeatures.add) }) - set(features, currentFeatures.toArray) + + set(words, currentFeatures.toArray) } def classify(sentence: TokenizedSentence): Boolean = { - val words = ViveknSentimentApproach.negateSequence(sentence.tokens.toList).intersect($(features)).distinct - if (words.isEmpty) return true - val positiveProbability = words.map(word => scala.math.log(($(positive).getOrElse(word, 0) + 1.0) / (2.0 * $(positiveTotals)))).sum - val negativeProbability = words.map(word => scala.math.log(($(negative).getOrElse(word, 0) + 1.0) / (2.0 * $(negativeTotals)))).sum + val wordFeatures = ViveknSentimentApproach.negateSequence(sentence.tokens.toList).intersect($$(words)).distinct + if (wordFeatures.isEmpty) return true + val positiveProbability = wordFeatures.map(word => scala.math.log(($$(positive).getOrElse(word, 0) + 1.0) / (2.0 * $(positiveTotals)))).sum + val negativeProbability = wordFeatures.map(word => scala.math.log(($$(negative).getOrElse(word, 0) + 1.0) / (2.0 * $(negativeTotals)))).sum positiveProbability > negativeProbability } @@ -76,6 +84,7 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive ) }) } + } -object ViveknSentimentModel extends DefaultParamsReadable[ViveknSentimentModel] \ No newline at end of file +object ViveknSentimentModel extends ParamsAndFeaturesReadable[ViveknSentimentModel] \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala index 4767a67bc55cd0..e12e0899eedb64 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala @@ -1,9 +1,9 @@ package com.johnsnowlabs.nlp.annotators.spell.norvig -import com.johnsnowlabs.nlp.annotators.common.{IntStringMapParam, StringMapParam} -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} +import com.johnsnowlabs.nlp.serialization.MapFeature +import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable} import com.typesafe.config.{Config, ConfigFactory} -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} +import org.apache.spark.ml.util.Identifiable import org.slf4j.LoggerFactory import scala.collection.immutable.HashSet @@ -22,8 +22,8 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi private val alphabet = "abcdefghijjklmnopqrstuvwxyz".toCharArray private val vowels = "aeiouy".toCharArray - protected val wordCount: IntStringMapParam = new IntStringMapParam(this, "word_count", "word count") - protected val customDict: StringMapParam = new StringMapParam(this, "custom_dict", "custom dict") + protected val wordCount: MapFeature[String, Int] = new MapFeature(this, "wordCount") + protected val customDict: MapFeature[String, String] = new MapFeature(this, "customDict") private val logger = LoggerFactory.getLogger("NorvigApproach") private val config: Config = ConfigFactory.load @@ -36,15 +36,17 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi private val 
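ViveknSentimentModel.classify keeps the same smoothed log-likelihood scoring, only reading its counts through Feature accessors now: each class scores the sum, over the sentence's feature words, of log((count(word) + 1) / (2 * classTotal)), and the larger score wins. A standalone restatement of that score; the helper name is invented.

```scala
// Add-one smoothing over the per-class word counts, normalised by twice the class total,
// exactly as computed in ViveknSentimentModel.classify above.
def classScore(counts: Map[String, Int], classTotal: Int)(featureWords: Seq[String]): Double =
  featureWords.map(w => math.log((counts.getOrElse(w, 0) + 1.0) / (2.0 * classTotal))).sum

// classify then reduces to:
//   classScore(positiveCounts, positiveTotals)(words) > classScore(negativeCounts, negativeTotals)(words)
```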
vowelSwapLimit = config.getInt("nlp.norvigChecker.vowelSwapLimit") private lazy val allWords: HashSet[String] = { - if ($(caseSensitive)) HashSet($(wordCount).keys.toSeq:_*) else HashSet($(wordCount).keys.toSeq.map(_.toLowerCase):_*) + if ($(caseSensitive)) HashSet($$(wordCount).keys.toSeq:_*) else HashSet($$(wordCount).keys.toSeq.map(_.toLowerCase):_*) } def this() = this(Identifiable.randomUID("SPELL")) def setWordCount(value: Map[String, Int]): this.type = set(wordCount, value) - def setCustomDict(value: Map[String, String]): this.type = set(customDict, value) + protected def getWordCount: Map[String, Int] = $$(wordCount) + protected def getCustomDict: Map[String, String] = $$(customDict) + /** Utilities */ /** number of items duplicated in some text */ def cartesianProduct[T](xss: List[List[_]]): List[List[_]] = xss match { @@ -98,7 +100,7 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi wordCount.getOrElse(word, 0) } - private def compareFrequencies(value: String): Int = frequency(value, $(wordCount)) + private def compareFrequencies(value: String): Int = frequency(value, $$(wordCount)) private def compareHammers(input: String)(value: String): Int = hammingDistance(input, value) /** Posibilities analysis */ @@ -167,9 +169,9 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi if (allWords.contains(word)) { logger.debug("Word found in dictionary. No spell change") Some(word) - } else if ($(customDict).contains(word)) { + } else if ($$(customDict).contains(word)) { logger.debug("Word custom dictionary found. Replacing") - Some($(customDict)(word)) + Some($$(customDict)(word)) } else if (allWords.contains(word.distinct)) { logger.debug("Word as distinct found in dictionary") Some(word.distinct) @@ -244,4 +246,4 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi } } -object NorvigSweetingModel extends DefaultParamsReadable[NorvigSweetingModel] +object NorvigSweetingModel extends ParamsAndFeaturesReadable[NorvigSweetingModel] \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala new file mode 100644 index 00000000000000..3049abac384572 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala @@ -0,0 +1,10 @@ +package com.johnsnowlabs.nlp.embeddings + +import com.johnsnowlabs.nlp.ParamsAndFeaturesReadable +import org.apache.spark.sql.SparkSession + +trait EmbeddingsReadable[T <: ModelWithWordEmbeddings[_]] extends ParamsAndFeaturesReadable[T] { + override def onRead(instance: T, path: String, spark: SparkSession): Unit = { + instance.deserializeEmbeddings(path, spark.sparkContext) + } +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala index 8fe438d8a5d93f..4319c5899e0a7a 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala @@ -8,6 +8,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.ivy.util.FileUtil import org.apache.spark.{SparkContext, SparkFiles} import org.apache.spark.ml.param.{IntParam, Param} +import org.apache.spark.sql.SparkSession /** @@ -22,8 +23,8 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] val nDims = new IntParam(this, "nDims", 
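EmbeddingsReadable is just ParamsAndFeaturesReadable plus an onRead hook that re-attaches the word-embeddings index after params and features are restored; the same hook is open to any companion that needs post-load work. A hypothetical example (the reader object and its logging are invented):

```scala
import com.johnsnowlabs.nlp.ParamsAndFeaturesReadable
import com.johnsnowlabs.nlp.annotators.Lemmatizer
import org.apache.spark.sql.SparkSession

object VerboseLemmatizerReader extends ParamsAndFeaturesReadable[Lemmatizer] {
  // Runs after the FeaturesReader has restored params and features,
  // the same point where EmbeddingsReadable calls deserializeEmbeddings.
  override def onRead(instance: Lemmatizer, path: String, spark: SparkSession): Unit =
    println(s"Restored ${instance.uid} from $path with ${instance.getLemmaDict.size} lemma entries")
}

// Usage: VerboseLemmatizerReader.read.load("/tmp/lemmatizer_model")
```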
"Number of embedding dimensions") val indexPath = new Param[String](this, "indexPath", "File that stores Index") - def setDims(nDims: Int) = set(this.nDims, nDims).asInstanceOf[M] - def setIndexPath(path: String) = set(this.indexPath, path).asInstanceOf[M] + def setDims(nDims: Int): this.type = set(this.nDims, nDims) + def setIndexPath(path: String): this.type = set(this.indexPath, path) lazy val embeddings: Option[WordEmbeddings] = get(indexPath).map { path => // Have to copy file because RockDB changes it and Spark rises Exception @@ -54,7 +55,7 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] val src = getEmbeddingsSerializedPath(path) // 1. Copy to local file - val localPath = WordEmbeddingsClusterHelper.createLocalPath + val localPath = WordEmbeddingsClusterHelper.createLocalPath() if (fs.exists(src)) { fs.copyToLocalFile(src, new Path(localPath)) @@ -80,5 +81,10 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] } } - def getEmbeddingsSerializedPath(path: String) = Path.mergePaths(new Path(path), new Path("/embeddings")) + def getEmbeddingsSerializedPath(path: String): Path = Path.mergePaths(new Path(path), new Path("/embeddings")) + + override def onWritten(path: String, spark: SparkSession): Unit = { + deserializeEmbeddings(path, spark.sparkContext) + } + } diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala new file mode 100644 index 00000000000000..da916d90f31bf5 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala @@ -0,0 +1,110 @@ +package com.johnsnowlabs.nlp.serialization + +import java.io.File +import java.nio.file.{Files, Paths} + +import com.johnsnowlabs.nlp.HasFeatures +import com.johnsnowlabs.nlp.embeddings.{ModelWithWordEmbeddings, WordEmbeddings, WordEmbeddingsClusterHelper} +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.{SparkContext, SparkFiles} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.param.{ParamMap, Params} +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} + +import scala.reflect.ClassTag + +abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model: HasFeatures, val name: String)(implicit val sparkSession: SparkSession = SparkSession.builder().getOrCreate()) extends Serializable { + model.features.append(this) + + final protected var value: Option[Broadcast[TComplete]] = None + + def serialize(spark: SparkSession, path: String, field: String, value: TComplete): Unit + + final def serializeInfer(spark: SparkSession, path: String, field: String, value: Any): Unit = + serialize(spark, path, field, value.asInstanceOf[TComplete]) + + def deserialize(spark: SparkSession, path: String, field: String): Option[_] + + final protected def getFieldPath(path: String, field: String): Path = + Path.mergePaths(new Path(path), new Path("/fields/" + field)) + + final def get: Option[TComplete] = value.map(_.value) + final def getValue: TComplete = value.map(_.value).getOrElse(throw new Exception(s"feature $name is not set")) + final def setValue(v: Option[Any]): HasFeatures = { + if (isSet) value.get.destroy() + value = Some(sparkSession.sparkContext.broadcast[TComplete](v.get.asInstanceOf[TComplete])) + model + } + final def isSet: Boolean = value.isDefined + +} + +class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: String) + extends Feature[TValue, TValue, TValue](model, name) { + + implicit val 
encoder: Encoder[TValue] = Encoders.kryo[TValue] + + override def serialize(spark: SparkSession, path: String, field: String, value: TValue): Unit = { + val dataPath = getFieldPath(path, field) + spark.createDataset(Seq(value)).write.mode("overwrite").parquet(dataPath.toString) + } + + override def deserialize(spark: SparkSession, path: String, field: String): Option[TValue] = { + val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val dataPath = getFieldPath(path, field) + if (fs.exists(dataPath)) { + Some(spark.read.parquet(dataPath.toString).as[TValue].first) + } else { + None + } + } + +} + +class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override val name: String) + extends Feature[TKey, TValue, Map[TKey, TValue]](model, name) { + + implicit val encoder: Encoder[(TKey, TValue)] = Encoders.kryo[(TKey, TValue)] + + override def serialize(spark: SparkSession, path: String, field: String, value: Map[TKey, TValue]): Unit = { + import spark.implicits._ + val dataPath = getFieldPath(path, field) + value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString) + } + + + + override def deserialize(spark: SparkSession, path: String, field: String): Option[Map[TKey, TValue]] = { + val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val dataPath = getFieldPath(path, field) + if (fs.exists(dataPath)) { + Some(spark.read.parquet(dataPath.toString).as[(TKey, TValue)].collect.toMap) + } else { + None + } + } + +} + +class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: String) + extends Feature[TValue, TValue, Array[TValue]](model, name) { + + implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue] + + override def serialize(spark: SparkSession, path: String, field: String, value: Array[TValue]): Unit = { + val dataPath = getFieldPath(path, field) + spark.createDataset(value).write.mode("overwrite").parquet(dataPath.toString) + } + + override def deserialize(spark: SparkSession, path: String, field: String): Option[Array[TValue]] = { + val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val dataPath = getFieldPath(path, field) + if (fs.exists(dataPath)) { + Some(spark.read.parquet(dataPath.toString).as[TValue].collect) + } else { + None + } + } + +} + diff --git a/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala b/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala index 8cfe7e79d359ba..327f84e7d80738 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala @@ -16,14 +16,14 @@ class NerCrfApproachSpec extends FlatSpec { nerModel.write.overwrite.save("./test_crf_pipeline") val loadedNer = NerCrfModel.read.load("./test_crf_pipeline") - assert(nerModel.model.get.serialize == loadedNer.model.get.serialize) - assert(nerModel.dictionaryFeatures == loadedNer.dictionaryFeatures) + assert(nerModel.model.getValue.serialize == loadedNer.model.getValue.serialize) + assert(nerModel.dictionaryFeatures.getValue == loadedNer.dictionaryFeatures.getValue) } "NerCrfApproach" should "have correct set of labels" in { - assert(nerModel.model.isDefined) - val metadata = nerModel.model.get.metadata + assert(nerModel.model.isSet) + val metadata = nerModel.model.getValue.metadata assert(metadata.labels.toSeq == Seq("@#Start", "PER", "O", "ORG", "LOC")) } @@ -65,7 +65,7 @@ class NerCrfApproachSpec extends FlatSpec { 
"NerCrfModel" should "correctly handle entities param" in { val restrictedModel = new NerCrfModel() .setEntities(Array("PER", "LOC")) - .setModel(nerModel.model.get) + .setModel(nerModel.model.getValue) .setOutputCol(nerModel.getOutputCol) .setInputCols(nerModel.getInputCols)