Skip to content

Commit

Permalink
#44 Add parameter to update methods to manager concurrent access
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jan 22, 2018
1 parent ab30372 commit 06d45ec
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 47 deletions.
9 changes: 5 additions & 4 deletions app/org/elastic4play/controllers/Fields.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,11 @@ class Fields(private val fields: Map[String, InputValue]) {

def isEmpty: Boolean = fields.isEmpty

def addIfAbsent(name: String, value: String): Fields = getString(name) match {
case Some(_) this
case None set(name, value)
}
def addIfAbsent(name: String, value: String): Fields = getString(name).fold(set(name, value))(_ this)

def addIfAbsent(name: String, value: JsValue): Fields = getValue(name).fold(set(name, value))(_ this)

def addIfAbsent(name: String, value: InputValue): Fields = get(name).fold(set(name, value))(_ this)

def ++(other: GenTraversableOnce[(String, InputValue)]) = new Fields(fields ++ other)

Expand Down
5 changes: 3 additions & 2 deletions app/org/elastic4play/database/DBCreate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import scala.concurrent.{ ExecutionContext, Future }

import play.api.Logger
import play.api.libs.json.JsValue.jsValueToJsLookup
import play.api.libs.json.{ JsNull, JsObject, JsString, JsValue }
import play.api.libs.json._

import akka.stream.scaladsl.Sink
import com.sksamuel.elastic4s.ElasticDsl.indexInto
Expand Down Expand Up @@ -71,7 +71,8 @@ class DBCreate @Inject() (
("_type" JsString(modelName)) +
("_id" JsString(indexResponse.id)) +
("_parent" parentId.fold[JsValue](JsNull)(JsString)) +
("_routing" JsString(routing.getOrElse(indexResponse.id))),
("_routing" JsString(routing.getOrElse(indexResponse.id))) +
("_version" -> JsNumber(indexResponse.version)),
convertError(attributes, _))
}

Expand Down
17 changes: 2 additions & 15 deletions app/org/elastic4play/database/DBFind.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import scala.concurrent.duration.{ DurationLong, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

import play.api.libs.json.{ JsNull, JsObject, JsString, Json }
import play.api.libs.json._
import play.api.{ Configuration, Logger }

import akka.NotUsed
Expand Down Expand Up @@ -89,19 +89,6 @@ class DBFind(
(src, total)
}

/**
* Transform search hit into JsObject
* This function parses hit source add _type, _routing, _parent and _id attributes
*/
private[database] def hit2json(hit: RichSearchHit) = {
val id = JsString(hit.id)
Json.parse(hit.sourceAsString).as[JsObject] +
("_type" JsString(hit.`type`)) +
("_routing" hit.fields.get("_routing").map(r JsString(r.java.getValue[String])).getOrElse(id)) +
("_parent" hit.fields.get("_parent").map(r JsString(r.java.getValue[String])).getOrElse(JsNull)) +
("_id" id)
}

/**
* Search entities in ElasticSearch
*
Expand All @@ -124,7 +111,7 @@ class DBFind(
searchWithoutScroll(searchDefinition, offset, limit)
}

(src.map(hit2json), total)
(src.map(DBUtils.hit2json), total)
}

/**
Expand Down
18 changes: 13 additions & 5 deletions app/org/elastic4play/database/DBModify.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import org.elasticsearch.action.support.WriteRequest.RefreshPolicy

import org.elastic4play.models.BaseEntity

case class ModifyConfig(retryOnConflict: Int = 5, refreshPolicy: RefreshPolicy = RefreshPolicy.WAIT_UNTIL, version: Option[Long] = None)
object ModifyConfig {
def default: ModifyConfig = ModifyConfig(5, RefreshPolicy.WAIT_UNTIL, None)
}

@Singleton
class DBModify @Inject() (
db: DBConfiguration,
Expand Down Expand Up @@ -69,25 +74,28 @@ class DBModify @Inject() (
* @param entity entity to update
* @param updateAttributes contains attributes to update. JSON object contains key (attribute name) and value.
* Sub attribute can be updated using dot notation ("attr.subattribute").
* @param modifyConfig modification parameter (retryOnConflict and refresh policy)
* @return new version of the entity
*/
def apply(entity: BaseEntity, updateAttributes: JsObject): Future[BaseEntity] = {
def apply(entity: BaseEntity, updateAttributes: JsObject, modifyConfig: ModifyConfig): Future[BaseEntity] = {
db
.execute {
update(entity.id)
val updateDefinition = update(entity.id)
.in(db.indexName entity.model.modelName)
.routing(entity.routing)
.script(buildScript(entity, updateAttributes))
.fetchSource(true)
.retryOnConflict(5)
.refresh(RefreshPolicy.WAIT_UNTIL)
.retryOnConflict(modifyConfig.retryOnConflict)
.refresh(modifyConfig.refreshPolicy)
modifyConfig.version.fold(updateDefinition)(updateDefinition.version(_))
}
.map { updateResponse
entity.model(Json.parse(updateResponse.get.sourceAsString).as[JsObject] +
("_type" JsString(entity.model.modelName)) +
("_id" JsString(entity.id)) +
("_routing" JsString(entity.routing)) +
("_parent" entity.parentId.fold[JsValue](JsNull)(JsString)))
("_parent" entity.parentId.fold[JsValue](JsNull)(JsString)) +
("_version" -> JsNumber(updateResponse.version)))
}
}
}
18 changes: 11 additions & 7 deletions app/org/elastic4play/database/DBUtils.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.elastic4play.database

import play.api.libs.json.{ JsNull, JsObject, JsString, Json }
import play.api.libs.json._

import com.sksamuel.elastic4s.ElasticDsl.fieldSort
import com.sksamuel.elastic4s.searches.RichSearchHit
Expand All @@ -26,14 +26,18 @@ object DBUtils {
.map(_._2) :+ fieldSort("_uid").order(DESC)
}

def hit2json( /*fields: Option[Seq[Attribute[_]]], */ hit: RichSearchHit): JsObject = {
val fieldsValue = hit.fields
/**
* Transform search hit into JsObject
* This function parses hit source add _type, _routing, _parent, _id and _version attributes
*/
def hit2json(hit: RichSearchHit) = {
val id = JsString(hit.id)
Option(hit.sourceAsString).filterNot(_ == "").fold(JsObject.empty)(s Json.parse(s).as[JsObject]) +
Json.parse(hit.sourceAsString).as[JsObject] +
("_type" JsString(hit.`type`)) +
("_routing" fieldsValue.get("_routing").map(r JsString(r.java.getValue[String])).getOrElse(id)) +
("_parent" fieldsValue.get("_parent").map(r JsString(r.java.getValue[String])).getOrElse(JsNull)) +
("_id" id)
("_routing" hit.fields.get("_routing").map(r JsString(r.java.getValue[String])).getOrElse(id)) +
("_parent" hit.fields.get("_parent").map(r JsString(r.java.getValue[String])).getOrElse(JsNull)) +
("_id" id) +
("_version" -> JsNumber(hit.version))
}

@scala.annotation.tailrec
Expand Down
1 change: 1 addition & 0 deletions app/org/elastic4play/models/ModelDef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class BaseEntity(val model: BaseModelDef, val attributes: JsObject) {
val id = (attributes \ "_id").as[String]
val routing = (attributes \ "_routing").as[String]
lazy val parentId = (attributes \ "_parent").asOpt[String]
val version = (attributes \ "_version").as[Long]
def createdBy = (attributes \ "createdBy").as[String]
def createdAt = (attributes \ "createdAt").as[Date]
def updatedBy = (attributes \ "updatedBy").as[String]
Expand Down
4 changes: 2 additions & 2 deletions app/org/elastic4play/services/DeleteSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import scala.concurrent.{ ExecutionContext, Future }
import play.api.libs.json.JsObject

import org.elastic4play.NotFoundError
import org.elastic4play.database.DBRemove
import org.elastic4play.database.{ DBRemove, ModifyConfig }
import org.elastic4play.models.{ AbstractModelDef, EntityDef }

@Singleton
Expand All @@ -21,7 +21,7 @@ class DeleteSrv @Inject() (
def apply[M <: AbstractModelDef[M, E], E <: EntityDef[M, E]](model: M, id: String)(implicit authContext: AuthContext): Future[E] = {
for {
entity getSrv[M, E](model, id)
newEntity updateSrv.doUpdate(entity, model.removeAttribute)
newEntity updateSrv.doUpdate(entity, model.removeAttribute, ModifyConfig.default)
_ = eventSrv.publish(AuditOperation(newEntity, AuditableAction.Delete, JsObject.empty, authContext))
} yield newEntity
}
Expand Down
22 changes: 11 additions & 11 deletions app/org/elastic4play/services/UpdateSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.scalactic.{ Bad, One }

import org.elastic4play.JsonFormat.dateFormat
import org.elastic4play.controllers.Fields
import org.elastic4play.database.DBModify
import org.elastic4play.database.{ DBModify, ModifyConfig }
import org.elastic4play.models.{ AbstractModelDef, BaseEntity, BaseModelDef, EntityDef }
import org.elastic4play.utils.{ RichFuture, RichOr }
import org.elastic4play.{ AttributeCheckingError, UnknownAttributeError }
Expand Down Expand Up @@ -46,12 +46,12 @@ class UpdateSrv @Inject() (
.fold(attrs Future.successful(JsObject(attrs)), errors Future.failed(AttributeCheckingError(model.modelName, errors)))
}

private[services] def doUpdate[E <: BaseEntity](entity: E, attributes: JsObject)(implicit authContext: AuthContext): Future[E] = {
private[services] def doUpdate[E <: BaseEntity](entity: E, attributes: JsObject, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[E] = {
for {
attributesAfterHook entity.model.updateHook(entity, addMetaFields(attributes))
checkedAttributes checkAttributes(attributesAfterHook, entity.model)
attributesWithAttachment attachmentSrv(entity.model)(checkedAttributes)
newEntity dbModify(entity, attributesWithAttachment)
newEntity dbModify(entity, attributesWithAttachment, modifyConfig)
} yield newEntity.asInstanceOf[E]
}

Expand All @@ -63,33 +63,33 @@ class UpdateSrv @Inject() (

private[services] def removeMetaFields(attrs: JsObject): JsObject = attrs - "updatedBy" - "updatedAt"

def apply[M <: AbstractModelDef[M, E], E <: EntityDef[M, E]](model: M, id: String, fields: Fields)(implicit authContext: AuthContext): Future[E] = {
def apply[M <: AbstractModelDef[M, E], E <: EntityDef[M, E]](model: M, id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[E] = {
for {
entity getSrv[M, E](model, id)
newEntity apply[E](entity, fields)
newEntity apply[E](entity, fields, modifyConfig)
} yield newEntity
}

def apply[M <: AbstractModelDef[M, E], E <: EntityDef[M, E]](model: M, ids: Seq[String], fields: Fields)(implicit authContext: AuthContext): Future[Seq[Try[E]]] = {
def apply[M <: AbstractModelDef[M, E], E <: EntityDef[M, E]](model: M, ids: Seq[String], fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Seq[Try[E]]] = {
Future.sequence {
ids.map { id
getSrv[M, E](model, id)
.flatMap(entity apply[E](entity, fields).toTry)
.flatMap(entity apply[E](entity, fields, modifyConfig).toTry)
}
}
}

def apply[E <: BaseEntity](entity: E, fields: Fields)(implicit authContext: AuthContext): Future[E] = {
def apply[E <: BaseEntity](entity: E, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[E] = {
for {
attributes fieldsSrv.parse(fields, entity.model).toFuture
newEntity doUpdate(entity, attributes)
newEntity doUpdate(entity, attributes, modifyConfig)
_ = eventSrv.publish(AuditOperation(newEntity, AuditableAction.Update, removeMetaFields(attributes), authContext))
} yield newEntity
}

def apply[E <: BaseEntity](entitiesAttributes: Seq[(E, Fields)])(implicit authContext: AuthContext): Future[Seq[Try[E]]] = {
def apply[E <: BaseEntity](entitiesAttributes: Seq[(E, Fields)], modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Seq[Try[E]]] = {
Future.sequence(entitiesAttributes.map {
case (entity, fields) apply(entity, fields).toTry
case (entity, fields) apply(entity, fields, modifyConfig).toTry
})
}
}
2 changes: 1 addition & 1 deletion test/org/elastic4play/database/DBFindSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class DBFindSpec extends PlaySpecification with Mockito {

val db = mock[DBConfiguration]
val dbfind = new DBFind(pageSize, keepAlive, db, ec, mat)
dbfind.hit2json(hit) must_== (doc +
DBUtils.hit2json(hit) must_== (doc +
("_id" JsString(id)) +
("_parent" JsString(parent)) +
("_routing" JsString(routing)) +
Expand Down

0 comments on commit 06d45ec

Please sign in to comment.