diff --git a/app/org/elastic4play/database/DBCreate.scala b/app/org/elastic4play/database/DBCreate.scala index 9d3032e..4dc2fdf 100644 --- a/app/org/elastic4play/database/DBCreate.scala +++ b/app/org/elastic4play/database/DBCreate.scala @@ -1,29 +1,24 @@ package org.elastic4play.database -import javax.inject.{ Inject, Singleton } - -import scala.concurrent.{ ExecutionContext, Future } -import scala.util.{ Failure, Success, Try } +import javax.inject.{Inject, Singleton} import akka.stream.scaladsl.Sink - -import play.api.Logger -import play.api.libs.json.{ JsNull, JsObject, JsString, JsValue } -import play.api.libs.json.JsValue.jsValueToJsLookup - -import org.elasticsearch.action.index.IndexResponse -import org.elasticsearch.transport.RemoteTransportException - -import com.sksamuel.elastic4s.ElasticDsl.{ bulk, index } +import com.sksamuel.elastic4s.ElasticDsl.{bulk, index} import com.sksamuel.elastic4s.IndexAndTypes.apply import com.sksamuel.elastic4s.IndexDefinition import com.sksamuel.elastic4s.source.JsonDocumentSource import com.sksamuel.elastic4s.streams.RequestBuilder - -import org.elastic4play.{ CreateError, Timed } import org.elastic4play.models.BaseEntity +import org.elastic4play.{ConflictError, CreateError, InternalError} +import org.elasticsearch.action.index.IndexResponse import org.elasticsearch.index.engine.DocumentAlreadyExistsException -import org.elastic4play.ConflictError +import org.elasticsearch.transport.RemoteTransportException +import play.api.Logger +import play.api.libs.json.JsValue.jsValueToJsLookup +import play.api.libs.json.{JsNull, JsObject, JsString, JsValue} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} /** * Service lass responsible for entity creation @@ -171,14 +166,15 @@ class DBCreate @Inject() ( * Class used to build index definition based on model name and attributes * This class is used by sink (ElasticSearch reactive stream) */ - private class AttributeRequestBuilder(modelName: String) extends RequestBuilder[JsObject] { + private class AttributeRequestBuilder() extends RequestBuilder[JsObject] { override def request(attributes: JsObject): IndexDefinition = { val docSource = JsonDocumentSource(JsObject(attributes.fields.filterNot(_._1.startsWith("_"))).toString) val id = (attributes \ "_id").asOpt[String] val parent = (attributes \ "_parent").asOpt[String] val routing = (attributes \ "_routing").asOpt[String] orElse parent orElse id + val modelName = (attributes \ "_type").asOpt[String].getOrElse(throw InternalError("The entity doesn't contain _type attribute")) addId(id).andThen(addParent(parent)).andThen(addRouting(routing)) { - index into db.indexName → modelName doc docSource update true + index.into(db.indexName → modelName).doc(docSource).update(true) } } } @@ -186,5 +182,5 @@ class DBCreate @Inject() ( /** * build a akka stream sink that create entities */ - def sink(modelName: String): Sink[JsObject, Future[Unit]] = db.sink(new AttributeRequestBuilder(modelName)) + def sink(): Sink[JsObject, Future[Unit]] = db.sink(new AttributeRequestBuilder()) } \ No newline at end of file diff --git a/app/org/elastic4play/services/MigrationSrv.scala b/app/org/elastic4play/services/MigrationSrv.scala index 3da0a8d..34c3d8b 100644 --- a/app/org/elastic4play/services/MigrationSrv.scala +++ b/app/org/elastic4play/services/MigrationSrv.scala @@ -1,21 +1,21 @@ package org.elastic4play.services -import javax.inject.{ Inject, Singleton } +import javax.inject.{Inject, Singleton} import akka.NotUsed import akka.stream.Materializer -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.scaladsl.{Sink, Source} import com.sksamuel.elastic4s.ElasticDsl.search import com.sksamuel.elastic4s.IndexesAndTypes.apply import org.elastic4play.InternalError import org.elastic4play.database._ import play.api.Logger import play.api.libs.json.JsValue.jsValueToJsLookup -import play.api.libs.json._ import play.api.libs.json.Json.toJsFieldJsValueWrapper +import play.api.libs.json._ -import scala.concurrent.{ ExecutionContext, Future } -import scala.util.{ Failure, Success } +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} case class MigrationEvent(modelName: String, current: Long, total: Long) extends EventMessage case object EndOfMigrationEvent extends EventMessage @@ -118,7 +118,7 @@ class MigrationSrv @Inject() ( eventSrv.publish(MigrationEvent(modelName, current.toLong, total)) entity } - .runWith(dbcreate.sink(modelName)) + .runWith(dbcreate.sink()) r.onComplete { x ⇒ println(s"migrateEntity($modelName) has finished : $x") } @@ -164,6 +164,7 @@ class MigrationSrv @Inject() ( } def isMigrating: Boolean = !migrationProcess.isCompleted + def isReady: Boolean = dbindex.indexStatus && !migrationProcess.isCompleted } /* Operation applied to the previous state of the database to get next version */ trait Operation extends ((String ⇒ Source[JsObject, NotUsed]) ⇒ (String ⇒ Source[JsObject, NotUsed]))