Skip to content

Commit

Permalink
#409 Handle concurrent access to observable mini-reports
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jan 23, 2018
1 parent 0d8a45e commit c07a066
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 66 deletions.
44 changes: 28 additions & 16 deletions thehive-backend/app/services/AlertSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import models._

import org.elastic4play.InternalError
import org.elastic4play.controllers.{ Fields, FileInputValue }
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services.JsonFormat.attachmentFormat
import org.elastic4play.services.QueryDSL.{ groupByField, parent, selectCount, withId }
import org.elastic4play.services._
Expand Down Expand Up @@ -125,29 +126,40 @@ class AlertSrv(
}

def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Alert] =
updateSrv[AlertModel, Alert](alertModel, id, fields)
update(id, fields, ModifyConfig.default)

def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Alert] =
updateSrv[AlertModel, Alert](alertModel, id, fields, modifyConfig)

def update(alert: Alert, fields: Fields)(implicit authContext: AuthContext): Future[Alert] =
updateSrv(alert, fields)
update(alert, fields, ModifyConfig.default)

def bulkUpdate(ids: Seq[String], fields: Fields)(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] = {
updateSrv[AlertModel, Alert](alertModel, ids, fields)
}
def update(alert: Alert, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Alert] =
updateSrv(alert, fields, modifyConfig)

def bulkUpdate(ids: Seq[String], fields: Fields)(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] =
bulkUpdate(ids, fields, ModifyConfig.default)

def bulkUpdate(ids: Seq[String], fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] =
updateSrv[AlertModel, Alert](alertModel, ids, fields, modifyConfig)

def bulkUpdate(updates: Seq[(Alert, Fields)])(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] =
updateSrv[Alert](updates)
bulkUpdate(updates, ModifyConfig.default)

def bulkUpdate(updates: Seq[(Alert, Fields)], modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] =
updateSrv[Alert](updates, modifyConfig)

def markAsRead(alert: Alert)(implicit authContext: AuthContext): Future[Alert] = {
def markAsRead(alert: Alert, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Alert] = {
alert.caze() match {
case Some(_) updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "Imported"))
case None updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "Ignored"))
case Some(_) updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "Imported"), modifyConfig)
case None updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "Ignored"), modifyConfig)
}
}

def markAsUnread(alert: Alert)(implicit authContext: AuthContext): Future[Alert] = {
def markAsUnread(alert: Alert, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Alert] = {
alert.caze() match {
case Some(_) updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "Updated"))
case None updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "New"))
case Some(_) updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "Updated"), modifyConfig)
case None updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "New"), modifyConfig)
}
}

Expand Down Expand Up @@ -259,8 +271,8 @@ class AlertSrv(
updatedCase
}

def setCase(alert: Alert, caze: Case)(implicit authContext: AuthContext): Future[Alert] = {
updateSrv(alert, Fields(Json.obj("case" caze.id, "status" AlertStatus.Imported)))
def setCase(alert: Alert, caze: Case, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Alert] = {
updateSrv(alert, Fields(Json.obj("case" caze.id, "status" AlertStatus.Imported)), modifyConfig)
}

def delete(id: String)(implicit Context: AuthContext): Future[Alert] =
Expand All @@ -272,8 +284,8 @@ class AlertSrv(

def stats(queryDef: QueryDef, aggs: Seq[Agg]): Future[JsObject] = findSrv(alertModel, queryDef, aggs: _*)

def setFollowAlert(alertId: String, follow: Boolean)(implicit authContext: AuthContext): Future[Alert] = {
updateSrv[AlertModel, Alert](alertModel, alertId, Fields(Json.obj("follow" follow)))
def setFollowAlert(alertId: String, follow: Boolean, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Alert] = {
updateSrv[AlertModel, Alert](alertModel, alertId, Fields(Json.obj("follow" follow)), modifyConfig)
}

def similarCases(alert: Alert): Future[Seq[CaseSimilarity]] = {
Expand Down
21 changes: 15 additions & 6 deletions thehive-backend/app/services/ArtifactSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import models.{ CaseResolutionStatus, CaseStatus, _ }

import org.elastic4play.ConflictError
import org.elastic4play.controllers.Fields
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services._
import org.elastic4play.utils.{ RichFuture, RichOr }

Expand Down Expand Up @@ -43,13 +44,21 @@ class ArtifactSrv @Inject() (
}
}

private def updateIfDeleted(caze: Case, fields: Fields)(implicit authContext: AuthContext): Future[Artifact] = {
private def updateIfDeleted(caze: Case, fields: Fields, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Artifact] = {
fieldsSrv.parse(fields, artifactModel).toFuture.flatMap { attrs
val updatedArtifact = for {
id artifactModel.computeId(caze, attrs)
artifact getSrv[ArtifactModel, Artifact](artifactModel, id)
if artifact.status() == ArtifactStatus.Deleted
updatedArtifact updateSrv[ArtifactModel, Artifact](artifactModel, artifact.id, fields.unset("data").unset("dataType").unset("attachment").set("status", "Ok"))
updatedArtifact updateSrv[ArtifactModel, Artifact](
artifactModel,
artifact.id,
fields
.unset("data")
.unset("dataType")
.unset("attachment")
.set("status", "Ok"),
modifyConfig)
} yield updatedArtifact
updatedArtifact.recoverWith {
case _ Future.failed(ConflictError("Artifact already exists", attrs))
Expand Down Expand Up @@ -77,11 +86,11 @@ class ArtifactSrv @Inject() (
getSrv[ArtifactModel, Artifact](artifactModel, id)
}

def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Artifact] =
updateSrv[ArtifactModel, Artifact](artifactModel, id, fields)
def update(id: String, fields: Fields, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Artifact] =
updateSrv[ArtifactModel, Artifact](artifactModel, id, fields, modifyConfig)

def bulkUpdate(ids: Seq[String], fields: Fields)(implicit authContext: AuthContext): Future[Seq[Try[Artifact]]] = {
updateSrv.apply[ArtifactModel, Artifact](artifactModel, ids, fields)
def bulkUpdate(ids: Seq[String], fields: Fields, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Seq[Try[Artifact]]] = {
updateSrv.apply[ArtifactModel, Artifact](artifactModel, ids, fields, modifyConfig)
}

def delete(id: String)(implicit Context: AuthContext): Future[Artifact] =
Expand Down
15 changes: 11 additions & 4 deletions thehive-backend/app/services/CaseSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import models._

import org.elastic4play.InternalError
import org.elastic4play.controllers.Fields
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services._

@Singleton
Expand Down Expand Up @@ -101,13 +102,19 @@ class CaseSrv(
getSrv[CaseModel, Case](caseModel, id)

def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Case] =
updateSrv[CaseModel, Case](caseModel, id, fields)
update(id, fields, ModifyConfig.default)

def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Case] =
updateSrv[CaseModel, Case](caseModel, id, fields, modifyConfig)

def update(caze: Case, fields: Fields)(implicit authContext: AuthContext): Future[Case] =
updateSrv(caze, fields)
update(caze, fields, ModifyConfig.default)

def update(caze: Case, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Case] =
updateSrv(caze, fields, modifyConfig)

def bulkUpdate(ids: Seq[String], fields: Fields)(implicit authContext: AuthContext): Future[Seq[Try[Case]]] = {
updateSrv[CaseModel, Case](caseModel, ids, fields)
def bulkUpdate(ids: Seq[String], fields: Fields, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Seq[Try[Case]]] = {
updateSrv[CaseModel, Case](caseModel, ids, fields, modifyConfig)
}

def delete(id: String)(implicit Context: AuthContext): Future[Case] =
Expand Down
6 changes: 5 additions & 1 deletion thehive-backend/app/services/CaseTemplateSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import models.{ CaseTemplate, CaseTemplateModel }

import org.elastic4play.NotFoundError
import org.elastic4play.controllers.Fields
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services._

@Singleton
Expand Down Expand Up @@ -39,7 +40,10 @@ class CaseTemplateSrv @Inject() (
}

def update(id: String, fields: Fields)(implicit Context: AuthContext): Future[CaseTemplate] =
updateSrv[CaseTemplateModel, CaseTemplate](caseTemplateModel, id, fields)
update(id, fields, ModifyConfig.default)

def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit Context: AuthContext): Future[CaseTemplate] =
updateSrv[CaseTemplateModel, CaseTemplate](caseTemplateModel, id, fields, modifyConfig)

def delete(id: String)(implicit Context: AuthContext): Future[Unit] =
deleteSrv.realDelete[CaseTemplateModel, CaseTemplate](caseTemplateModel, id)
Expand Down
11 changes: 9 additions & 2 deletions thehive-backend/app/services/DashboardSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import akka.stream.scaladsl.Source
import models._

import org.elastic4play.controllers.Fields
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services._

@Singleton
Expand All @@ -34,10 +35,16 @@ class DashboardSrv @Inject() (
getSrv[DashboardModel, Dashboard](dashboardModel, id)

def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Dashboard] =
updateSrv[DashboardModel, Dashboard](dashboardModel, id, fields)
update(id, fields, ModifyConfig.default)

def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Dashboard] =
updateSrv[DashboardModel, Dashboard](dashboardModel, id, fields, modifyConfig)

def update(dashboard: Dashboard, fields: Fields)(implicit authContext: AuthContext): Future[Dashboard] =
updateSrv(dashboard, fields)
update(dashboard, fields, ModifyConfig.default)

def update(dashboard: Dashboard, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Dashboard] =
updateSrv(dashboard, fields, modifyConfig)

def delete(id: String)(implicit Context: AuthContext): Future[Dashboard] =
deleteSrv[DashboardModel, Dashboard](dashboardModel, id)
Expand Down
6 changes: 5 additions & 1 deletion thehive-backend/app/services/LogSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.stream.scaladsl.Source
import models.{ Log, LogModel, Task, TaskModel }

import org.elastic4play.controllers.Fields
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services._

@Singleton
Expand All @@ -36,7 +37,10 @@ class LogSrv @Inject() (
getSrv[LogModel, Log](logModel, id)

def update(id: String, fields: Fields)(implicit Context: AuthContext): Future[Log] =
updateSrv[LogModel, Log](logModel, id, fields)
update(id, fields, ModifyConfig.default)

def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit Context: AuthContext): Future[Log] =
updateSrv[LogModel, Log](logModel, id, fields, modifyConfig)

def delete(id: String)(implicit Context: AuthContext): Future[Log] =
deleteSrv[LogModel, Log](logModel, id)
Expand Down
17 changes: 12 additions & 5 deletions thehive-backend/app/services/TaskSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import akka.stream.scaladsl.{ Sink, Source }
import models._

import org.elastic4play.controllers.Fields
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services._

@Singleton
Expand Down Expand Up @@ -44,20 +45,26 @@ class TaskSrv @Inject() (
def get(id: String): Future[Task] =
getSrv[TaskModel, Task](taskModel, id)

def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Task] = {
def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Task] =
update(id, fields, ModifyConfig.default)

def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Task] = {
getSrv[TaskModel, Task](taskModel, id)
.flatMap { task update(task, fields) }
.flatMap { task update(task, fields, modifyConfig) }
}

def update(task: Task, fields: Fields)(implicit authContext: AuthContext): Future[Task] = {
def update(task: Task, fields: Fields)(implicit authContext: AuthContext): Future[Task] =
update(task, fields, ModifyConfig.default)

def update(task: Task, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Task] = {
// if update status from waiting to something else and owner is not set, then set owner to user
val f = if (task.status() == TaskStatus.Waiting &&
!fields.getString("status").forall(_ == TaskStatus.Waiting.toString) &&
!fields.contains("owner") &&
task.owner().isEmpty)
fields.set("owner", authContext.userId)
else fields
updateSrv(task, f)
updateSrv(task, f, modifyConfig)
}

def closeTasksOfCase(caseIds: String*)(implicit authContext: AuthContext): Future[Seq[Try[Task]]] = {
Expand All @@ -74,7 +81,7 @@ class TaskSrv @Inject() (
case task (task, completeTask)
}
.runWith(Sink.seq)
.flatMap { taskUpdate updateSrv(taskUpdate) }
.flatMap { taskUpdate updateSrv(taskUpdate, ModifyConfig.default) }
}

def delete(id: String)(implicit authContext: AuthContext): Future[Task] =
Expand Down
18 changes: 11 additions & 7 deletions thehive-backend/app/services/UserSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.stream.scaladsl.Source
import models.{ Roles, User, UserModel, UserStatus }

import org.elastic4play.controllers.Fields
import org.elastic4play.database.DBIndex
import org.elastic4play.database.{ DBIndex, ModifyConfig }
import org.elastic4play.services._
import org.elastic4play.utils.Instance
import org.elastic4play.{ AuthenticationError, AuthorizationError }
Expand Down Expand Up @@ -69,13 +69,17 @@ class UserSrv @Inject() (

override def get(id: String): Future[User] = getSrv[UserModel, User](userModel, id)

def update(id: String, fields: Fields)(implicit Context: AuthContext): Future[User] = {
updateSrv[UserModel, User](userModel, id, fields)
}
def update(id: String, fields: Fields)(implicit Context: AuthContext): Future[User] =
update(id, fields, ModifyConfig.default)

def update(user: User, fields: Fields)(implicit Context: AuthContext): Future[User] = {
updateSrv(user, fields)
}
def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit Context: AuthContext): Future[User] =
updateSrv[UserModel, User](userModel, id, fields, modifyConfig)

def update(user: User, fields: Fields)(implicit Context: AuthContext): Future[User] =
update(user, fields, ModifyConfig.default)

def update(user: User, fields: Fields, modifyConfig: ModifyConfig)(implicit Context: AuthContext): Future[User] =
updateSrv(user, fields, modifyConfig)

def delete(id: String)(implicit Context: AuthContext): Future[User] =
deleteSrv[UserModel, User](userModel, id)
Expand Down
39 changes: 27 additions & 12 deletions thehive-cortex/app/connectors/cortex/services/CortexSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import scala.concurrent.duration.DurationInt
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

import org.elastic4play.database.ModifyConfig

object CortexConfig {
def getCortexClient(name: String, configuration: Configuration, ws: CustomWSAPI): Option[CortexClient] = {
val url = configuration.getOptional[String]("url").getOrElse(sys.error("url is missing")).replaceFirst("/*$", "")
Expand Down Expand Up @@ -131,13 +133,17 @@ class CortexSrv @Inject() (
createSrv[JobModel, Job, Artifact](jobModel, artifact, fields.set("artifactId", artifact.id))
}

private[CortexSrv] def update(jobId: String, fields: Fields)(implicit Context: AuthContext): Future[Job] = {
getJob(jobId).flatMap(job update(job, fields))
}
private[CortexSrv] def update(jobId: String, fields: Fields)(implicit Context: AuthContext): Future[Job] =
update(jobId, fields, ModifyConfig.default)

private[CortexSrv] def update(job: Job, fields: Fields)(implicit Context: AuthContext): Future[Job] = {
updateSrv[Job](job, fields)
}
private[CortexSrv] def update(jobId: String, fields: Fields, modifyConfig: ModifyConfig)(implicit Context: AuthContext): Future[Job] =
getJob(jobId).flatMap(job update(job, fields, modifyConfig))

private[CortexSrv] def update(job: Job, fields: Fields)(implicit Context: AuthContext): Future[Job] =
update(job, fields, ModifyConfig.default)

private[CortexSrv] def update(job: Job, fields: Fields, modifyConfig: ModifyConfig)(implicit Context: AuthContext): Future[Job] =
updateSrv[Job](job, fields, modifyConfig)

def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[Job, NotUsed], Future[Long]) = {
findSrv[JobModel, Job](jobModel, queryDef, range, sortBy)
Expand Down Expand Up @@ -202,6 +208,12 @@ class CortexSrv @Inject() (
getSrv[JobModel, Job](jobModel, jobId)
}

def retryIf[A](f: Throwable Boolean, maxRetry: Int)(body: Future[A]): Future[A] = {
body.recoverWith {
case e if maxRetry > 0 && f(e) retryIf(f, maxRetry - 1)(body)
}
}

def updateJobWithCortex(jobId: String, cortexJobId: String, cortex: CortexClient)(implicit authContext: AuthContext): Unit = {
logger.debug(s"Requesting status of job $cortexJobId in cortex ${cortex.name} in order to update job $jobId")
cortex.waitReport(cortexJobId, 1.minute) andThen {
Expand All @@ -225,14 +237,17 @@ class CortexSrv @Inject() (
.toOption
.flatMap(r (r \ "summary").asOpt[JsObject])
.getOrElse(JsObject.empty)
for {
artifact artifactSrv.get(job.artifactId())
reports = Try(Json.parse(artifact.reports()).asOpt[JsObject]).toOption.flatten.getOrElse(JsObject.empty)
newReports = reports + (job.analyzerId() jobSummary)
} artifactSrv.update(job.artifactId(), Fields.empty.set("reports", newReports.toString))
retryIf(_ true, 5) {
for {
artifact artifactSrv.get(job.artifactId())
reports = Try(Json.parse(artifact.reports()).asOpt[JsObject]).toOption.flatten.getOrElse(JsObject.empty)
newReports = reports + (job.analyzerId() jobSummary)
} yield artifactSrv.update(job.artifactId(), Fields.empty.set("reports", newReports.toString), ModifyConfig(retryOnConflict = 0, version = Some(artifact.version)))
}
.recover {
case t logger.warn(s"Unable to insert summary report in artifact", t)
}

}
}
.onComplete {
Expand All @@ -256,7 +271,7 @@ class CortexSrv @Inject() (
def submitJob(cortexId: Option[String], analyzerId: String, artifactId: String)(implicit authContext: AuthContext): Future[Job] = {
val cortexClient = cortexId match {
case Some(id) Future.successful(cortexConfig.instances.find(_.name == id))
case None if (cortexConfig.instances.size <= 1) Future.successful(cortexConfig.instances.headOption)
case None if (cortexConfig.instances.lengthCompare(1) <= 0) Future.successful(cortexConfig.instances.headOption)
else {
Future // If there are several cortex, select the first which has the analyzer
.traverse(cortexConfig.instances)(c c.getAnalyzer(analyzerId).map(_ Some(c)).recover { case _ None })
Expand Down
Loading

0 comments on commit c07a066

Please sign in to comment.