This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Optimize v2/groups request performance #5973

Merged · 8 commits · Feb 9, 2018
Changes from 6 commits
14 changes: 9 additions & 5 deletions src/main/scala/mesosphere/marathon/core/CoreModuleImpl.scala
@@ -35,9 +35,10 @@ import mesosphere.marathon.core.task.termination.TaskTerminationModule
import mesosphere.marathon.core.task.tracker.InstanceTrackerModule
import mesosphere.marathon.core.task.update.TaskStatusUpdateProcessor
import mesosphere.marathon.storage.StorageModule
import mesosphere.util.NamedExecutionContext
import mesosphere.util.state.MesosLeaderInfo
import scala.concurrent.ExecutionContext

import scala.concurrent.ExecutionContext
import scala.util.Random

/**
@@ -81,7 +82,7 @@ class CoreModuleImpl @Inject() (
)

// TASKS

val storageExecutionContext = NamedExecutionContext.fixedThreadPoolExecutionContext(8, "storage-module")
override lazy val taskTrackerModule =
new InstanceTrackerModule(clock, marathonConf, leadershipModule,
storageModule.instanceRepository, instanceUpdateSteps)(actorsModule.materializer)
@@ -90,7 +91,7 @@ marathonConf,
marathonConf,
lifecycleState)(
actorsModule.materializer,
ExecutionContexts.global,
storageExecutionContext,
actorSystem.scheduler,
actorSystem)

@@ -199,10 +200,11 @@

// GROUP MANAGER

val groupManagerExecutionContext = NamedExecutionContext.fixedThreadPoolExecutionContext(8, "group-manager-module")
Contributor:
Why 8 threads? Did we measure that 8 is a meaningful number? I'm not trying to overcomplicate things, but it seems to me that making these configurable would allow us to also easily run different tests to verify that the number makes sense.

Contributor Author (@zen-dog, Feb 7, 2018):
8 seems like a low enough number that it shouldn't matter. More important is the fact that we separate the thread pools from each other.
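
(Aside: the helper used throughout this PR, NamedExecutionContext.fixedThreadPoolExecutionContext(size, name), can be pictured roughly as the sketch below: a fixed pool whose threads carry the module name, so each module's work stays isolated and is easy to spot in thread dumps. This is only an illustration; the actual implementation in mesosphere.util may differ.)

```scala
import java.util.concurrent.{ Executors, ThreadFactory }
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.ExecutionContext

// Illustrative sketch only; the real mesosphere.util.NamedExecutionContext may differ.
object NamedExecutionContextSketch {
  def fixedThreadPoolExecutionContext(poolSize: Int, name: String): ExecutionContext = {
    val counter = new AtomicLong(0L)
    val threadFactory = new ThreadFactory {
      override def newThread(runnable: Runnable): Thread = {
        // Name the threads after the owning module so the pool shows up clearly in thread dumps.
        val thread = new Thread(runnable, s"$name-${counter.incrementAndGet()}")
        thread.setDaemon(true)
        thread
      }
    }
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize, threadFactory))
  }
}
```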

override lazy val groupManagerModule: GroupManagerModule = new GroupManagerModule(
marathonConf,
scheduler,
storageModule.groupRepository)(ExecutionContexts.global, eventStream, authModule.authorizer)
storageModule.groupRepository)(groupManagerExecutionContext, eventStream, authModule.authorizer)

// PODS

@@ -259,13 +261,15 @@
// to inject it in provideSchedulerActor(...) method.
//
// TODO: this can be removed when MarathonSchedulerActor becomes a core component

val schedulerActionsExecutionContext = NamedExecutionContext.fixedThreadPoolExecutionContext(8, "scheduler-actions")
override lazy val schedulerActions: SchedulerActions = new SchedulerActions(
storageModule.groupRepository,
healthModule.healthCheckManager,
taskTrackerModule.instanceTracker,
appOfferMatcherModule.launchQueue,
eventStream,
taskTerminationModule.taskKillService)(ExecutionContexts.global)
taskTerminationModule.taskKillService)(schedulerActionsExecutionContext)

override lazy val marathonScheduler: MarathonScheduler = new MarathonScheduler(eventStream, launcherModule.offerProcessor, taskStatusUpdateProcessor, storageModule.frameworkIdRepository, mesosLeaderInfo, marathonConf)

@@ -4,12 +4,12 @@ package core.appinfo
import java.time.Clock

import com.google.inject.Inject
import mesosphere.marathon.MarathonSchedulerService
import mesosphere.marathon.core.appinfo.impl.{ AppInfoBaseData, DefaultInfoService }
import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.core.health.HealthCheckManager
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.storage.repository.TaskFailureRepository
import mesosphere.util.NamedExecutionContext

/**
* Provides a service to query information related to apps.
@@ -21,8 +21,11 @@ class AppInfoModule @Inject() (
healthCheckManager: HealthCheckManager,
marathonSchedulerService: MarathonSchedulerService,
taskFailureRepository: TaskFailureRepository) {

val ec = NamedExecutionContext.fixedThreadPoolExecutionContext(8, "app-info-module")
Contributor:
ec? Are we trying to stay within a linter line-length rule :)

Could we make this more readable, please?

Contributor Author:
There are a few Akka/Scala-specific and accepted "short names", e.g. ec for ExecutionContext, mat for Materializer, etc. It falls into the same category as the loop counter in for(i=0,..).

Contributor:
Should this thread pool size ideally be related to the number of available threads?

Should we make it configurable, even if only via an undocumented environment variable?

It seems like we should ideally create the execution context in the module and share it with the pods status controller?

Contributor:
ec is fine for me for the reason @zen-dog pointed out.
I agree with @timcharper that the number of threads could also be made configurable.
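
(Aside: a minimal sketch of the configurability being asked for, assuming a system property as the knob; the property name, default, and wiring are hypothetical and not part of this PR.)

```scala
import scala.concurrent.ExecutionContext
import mesosphere.util.NamedExecutionContext

// Hypothetical wiring: read the pool size from a system property so it can be
// tuned per deployment without a code change. The property name and default
// are illustrative only; this PR hard-codes 8.
val appInfoPoolSize: Int =
  sys.props.get("marathon.appinfo.threadpool.size").map(_.toInt).getOrElse(8)

val ec: ExecutionContext =
  NamedExecutionContext.fixedThreadPoolExecutionContext(appInfoPoolSize, "app-info-module")
```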


private[this] val appInfoBaseData = () => new AppInfoBaseData(
clock, taskTracker, healthCheckManager, marathonSchedulerService, taskFailureRepository, groupManager)
clock, taskTracker, healthCheckManager, marathonSchedulerService, taskFailureRepository, groupManager)(ec)

def appInfoService: AppInfoService = infoService
def groupInfoService: GroupInfoService = infoService
@@ -3,6 +3,7 @@ package core.appinfo.impl

import java.time.Clock

import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.appinfo.{ AppInfo, EnrichedTask, TaskCounts, TaskStatsByVersion }
import mesosphere.marathon.core.deployment.{ DeploymentPlan, DeploymentStepInfo }
import mesosphere.marathon.core.group.GroupManager
@@ -14,11 +15,10 @@ import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.raml.{ PodInstanceState, PodInstanceStatus, PodState, PodStatus, Raml }
import mesosphere.marathon.state._
import mesosphere.marathon.storage.repository.TaskFailureRepository
import org.slf4j.LoggerFactory

import scala.async.Async.{ async, await }
import scala.collection.immutable.{ Map, Seq }
import scala.concurrent.Future
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal

// TODO(jdef) pods rename this to something like ResourceInfoBaseData
@@ -28,12 +28,9 @@ class AppInfoBaseData(
healthCheckManager: HealthCheckManager,
deploymentService: DeploymentService,
taskFailureRepository: TaskFailureRepository,
groupManager: GroupManager) {
groupManager: GroupManager)(implicit ec: ExecutionContext) extends StrictLogging {

import AppInfoBaseData._
import mesosphere.marathon.core.async.ExecutionContexts.global

if (log.isDebugEnabled) log.debug(s"new AppInfoBaseData $this")
logger.debug(s"new AppInfoBaseData $this")

lazy val runningDeployments: Future[Seq[DeploymentStepInfo]] = deploymentService.listRunningDeployments()

@@ -48,7 +45,7 @@
}

lazy val runningDeploymentsByAppFuture: Future[Map[PathId, Seq[Identifiable]]] = {
log.debug("Retrieving running deployments")
logger.debug("Retrieving running deployments")

val allRunningDeploymentsFuture: Future[Seq[DeploymentPlan]] = runningDeployments.map(_.map(_.plan))

@@ -67,32 +64,31 @@
}

lazy val instancesByRunSpecFuture: Future[InstanceTracker.InstancesBySpec] = {
log.debug("Retrieve tasks")
logger.debug("Retrieve tasks")
instanceTracker.instancesBySpec()
}

def appInfoFuture(app: AppDefinition, embed: Set[AppInfo.Embed]): Future[AppInfo] = {
@SuppressWarnings(Array("OptionGet", "TryGet"))
def appInfoFuture(app: AppDefinition, embeds: Set[AppInfo.Embed]): Future[AppInfo] = async {
val appData = new AppData(app)
embed.foldLeft(Future.successful(AppInfo(app))) { (infoFuture, embed) =>
infoFuture.flatMap { info =>
embed match {
case AppInfo.Embed.Counts =>
appData.taskCountsFuture.map(counts => info.copy(maybeCounts = Some(counts)))
case AppInfo.Embed.Readiness =>
readinessChecksByAppFuture.map(checks => info.copy(maybeReadinessCheckResults = Some(checks(app.id))))
case AppInfo.Embed.Deployments =>
runningDeploymentsByAppFuture.map(deployments => info.copy(maybeDeployments = Some(deployments(app.id))))
case AppInfo.Embed.LastTaskFailure =>
appData.maybeLastTaskFailureFuture.map { maybeLastTaskFailure =>
info.copy(maybeLastTaskFailure = maybeLastTaskFailure)
}
case AppInfo.Embed.Tasks =>
appData.enrichedTasksFuture.map(tasks => info.copy(maybeTasks = Some(tasks)))
case AppInfo.Embed.TaskStats =>
appData.taskStatsFuture.map(taskStats => info.copy(maybeTaskStats = Some(taskStats)))
}
}
}

val taskCountsOpt: Option[TaskCounts] = if (embeds.contains(AppInfo.Embed.Counts)) Some(await(appData.taskCountsFuture)) else None
val readinessChecksByAppOpt: Option[Map[PathId, Seq[ReadinessCheckResult]]] = if (embeds.contains(AppInfo.Embed.Readiness)) Some(await(readinessChecksByAppFuture)) else None
val runningDeploymentsByAppOpt: Option[Map[PathId, Seq[Identifiable]]] = if (embeds.contains(AppInfo.Embed.Deployments)) Some(await(runningDeploymentsByAppFuture)) else None
val lastTaskFailureOpt: Option[TaskFailure] = if (embeds.contains(AppInfo.Embed.LastTaskFailure)) await(appData.maybeLastTaskFailureFuture) else None
val enrichedTasksOpt: Option[Seq[EnrichedTask]] = if (embeds.contains(AppInfo.Embed.Tasks)) Some(await(appData.enrichedTasksFuture)) else None
val taskStatsOpt: Option[TaskStatsByVersion] = if (embeds.contains(AppInfo.Embed.TaskStats)) Some(await(appData.taskStatsFuture)) else None

val appInfo = AppInfo(
app = app,
maybeTasks = enrichedTasksOpt,
maybeCounts = taskCountsOpt,
maybeDeployments = runningDeploymentsByAppOpt.map(_.apply(app.id)),
maybeReadinessCheckResults = readinessChecksByAppOpt.map(_.apply(app.id)),
maybeLastTaskFailure = lastTaskFailureOpt,
maybeTaskStats = taskStatsOpt
)
appInfo
}

/**
@@ -104,15 +100,11 @@
private[this] class AppData(app: AppDefinition) {
lazy val now: Timestamp = clock.now()

lazy val instancesByIdFuture: Future[Map[Instance.Id, Instance]] = instancesByRunSpecFuture.map(_.specInstances(app.id)
.foldLeft(Map.newBuilder[Instance.Id, Instance]) { (result, instance) => result += instance.instanceId -> instance }
.result()
)

lazy val instancesFuture: Future[Seq[Instance]] = instancesByIdFuture.map(_.values.to[Seq])
lazy val instancesFuture: Future[Vector[Instance]] = instancesByRunSpecFuture
.map(_.specInstances(app.id).toVector)

lazy val healthByInstanceIdFuture: Future[Map[Instance.Id, Seq[Health]]] = {
log.debug(s"retrieving health counts for app [${app.id}]")
logger.debug(s"retrieving health counts for app [${app.id}]")
healthCheckManager.statuses(app.id)
}.recover {
case NonFatal(e) => throw new RuntimeException(s"while retrieving health counts for app [${app.id}]", e)
Expand All @@ -128,7 +120,7 @@ class AppInfoBaseData(
}

lazy val taskCountsFuture: Future[TaskCounts] = {
log.debug(s"calculating task counts for app [${app.id}]")
logger.debug(s"calculating task counts for app [${app.id}]")
for {
tasks <- tasksForStats
} yield TaskCounts(tasks)
Expand All @@ -137,14 +129,14 @@ class AppInfoBaseData(
}

lazy val taskStatsFuture: Future[TaskStatsByVersion] = {
log.debug(s"calculating task stats for app [${app.id}]")
logger.debug(s"calculating task stats for app [${app.id}]")
for {
tasks <- tasksForStats
} yield TaskStatsByVersion(app.versionInfo, tasks)
}

lazy val enrichedTasksFuture: Future[Seq[EnrichedTask]] = {
log.debug(s"assembling rich tasks for app [${app.id}]")
logger.debug(s"assembling rich tasks for app [${app.id}]")
def statusesToEnrichedTasks(instances: Seq[Instance], statuses: Map[Instance.Id, collection.Seq[Health]]): Seq[EnrichedTask] = {
instances.map { instance =>
EnrichedTask(instance, instance.appTask, statuses.getOrElse(instance.instanceId, Nil).to[Seq])
@@ -160,45 +152,44 @@
}

lazy val maybeLastTaskFailureFuture: Future[Option[TaskFailure]] = {
log.debug(s"retrieving last task failure for app [${app.id}]")
logger.debug(s"retrieving last task failure for app [${app.id}]")
taskFailureRepository.get(app.id)
}.recover {
case NonFatal(e) => throw new RuntimeException(s"while retrieving last task failure for app [${app.id}]", e)
}
}

@SuppressWarnings(Array("all")) // async/await
def podStatus(podDef: PodDefinition): Future[PodStatus] =
async { // linter:ignore UnnecessaryElseBranch
val now = clock.now().toOffsetDateTime
val instances = await(instancesByRunSpecFuture).specInstances(podDef.id)
val specByVersion: Map[Timestamp, Option[PodDefinition]] = await(Future.sequence(
// TODO(jdef) if repositories ever support a bulk-load interface, use it here
instances.map(_.runSpecVersion).distinct.map { version =>
groupManager.podVersion(podDef.id, version.toOffsetDateTime).map(version -> _)
}
)).toMap
val instanceStatus = instances.flatMap { inst => podInstanceStatus(inst)(specByVersion.apply) }
val statusSince = if (instanceStatus.isEmpty) now else instanceStatus.map(_.statusSince).max
val state = await(podState(podDef.instances, instanceStatus, isPodTerminating(podDef.id)))

// TODO(jdef) pods need termination history
PodStatus(
id = podDef.id.toString,
spec = Raml.toRaml(podDef),
instances = instanceStatus,
status = state,
statusSince = statusSince,
lastUpdated = now,
lastChanged = statusSince
)
}
def podStatus(podDef: PodDefinition): Future[PodStatus] = async { // linter:ignore UnnecessaryElseBranch
val now = clock.now().toOffsetDateTime
val instances = await(instancesByRunSpecFuture).specInstances(podDef.id)
val specByVersion: Map[Timestamp, Option[PodDefinition]] = await(Future.sequence(
// TODO(jdef) if repositories ever support a bulk-load interface, use it here
instances.map(_.runSpecVersion).distinct.map { version =>
groupManager.podVersion(podDef.id, version.toOffsetDateTime).map(version -> _)
}
)).toMap
val instanceStatus = instances.flatMap { inst => podInstanceStatus(inst)(specByVersion.apply) }
val statusSince = if (instanceStatus.isEmpty) now else instanceStatus.map(_.statusSince).max
val state = await(podState(podDef.instances, instanceStatus, isPodTerminating(podDef.id)))

// TODO(jdef) pods need termination history
PodStatus(
id = podDef.id.toString,
spec = Raml.toRaml(podDef),
instances = instanceStatus,
status = state,
statusSince = statusSince,
lastUpdated = now,
lastChanged = statusSince
)
}

def podInstanceStatus(instance: Instance)(f: Timestamp => Option[PodDefinition]): Option[PodInstanceStatus] = {
val maybePodSpec: Option[PodDefinition] = f(instance.runSpecVersion)

if (maybePodSpec.isEmpty)
log.warn(s"failed to generate pod instance status for instance ${instance.instanceId}, " +
logger.warn(s"failed to generate pod instance status for instance ${instance.instanceId}, " +
s"pod version ${instance.runSpecVersion} failed to load from persistent store")

maybePodSpec.map { pod => Raml.toRaml(pod -> instance) }
@@ -228,7 +219,3 @@
state
}
}

object AppInfoBaseData {
private val log = LoggerFactory.getLogger(getClass)
}
@@ -1,14 +1,15 @@
package mesosphere.marathon
package core.appinfo.impl

import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.appinfo.AppInfo.Embed
import mesosphere.marathon.core.appinfo._
import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.core.pod.PodDefinition
import mesosphere.marathon.raml.PodStatus
import mesosphere.marathon.state._
import mesosphere.marathon.stream.Implicits._
import org.slf4j.LoggerFactory
import mesosphere.util.NamedExecutionContext

import scala.async.Async.{ async, await }
import scala.collection.immutable.Seq
@@ -17,15 +18,14 @@ import scala.concurrent.Future

private[appinfo] class DefaultInfoService(
groupManager: GroupManager,
newBaseData: () => AppInfoBaseData) extends AppInfoService with GroupInfoService with PodStatusService {
import mesosphere.marathon.core.async.ExecutionContexts.global
newBaseData: () => AppInfoBaseData) extends AppInfoService with GroupInfoService with PodStatusService with StrictLogging {

private[this] val log = LoggerFactory.getLogger(getClass)
implicit val ec = NamedExecutionContext.fixedThreadPoolExecutionContext(8, "default-info-service")
Contributor:
ditto

Contributor:
I wonder if this really has an impact if app info and the group manager have their own thread pools. I'd like to understand where we benefit from a fixed worker thread pool and where we should be using the fork-join thread pool.

Contributor Author:
I tested the scenario with a single thread pool and it adds roughly 20% to the response time. Not that much, but I think dividing them still makes sense - Dīvide et Imperā!
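
(Aside: an illustrative comparison, not code from this PR, of the shared global fork-join context versus a dedicated fixed pool, which is the trade-off being discussed.)

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// The global context is a fork-join pool shared by every caller in the JVM;
// one module's slow or bursty work competes with everything else scheduled on it.
val sharedEc: ExecutionContext = ExecutionContext.global

// A dedicated fixed pool caps this module's parallelism and confines any
// queueing delay to the module that caused it.
val dedicatedEc: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
```

With dedicated pools, a burst of app-info or group work queues on its own pool instead of competing with storage and scheduler work on a shared context.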


@SuppressWarnings(Array("all")) // async/await
override def selectPodStatus(id: PathId, selector: PodSelector): Future[Option[PodStatus]] =
async { // linter:ignore UnnecessaryElseBranch
log.debug(s"query for pod $id")
logger.debug(s"query for pod $id")
val maybePod = groupManager.pod(id)
maybePod.filter(selector.matches) match {
case Some(pod) => Some(await(newBaseData().podStatus(pod)))
@@ -34,7 +34,7 @@
}

override def selectApp(id: PathId, selector: AppSelector, embed: Set[AppInfo.Embed]): Future[Option[AppInfo]] = {
log.debug(s"queryForAppId $id")
logger.debug(s"queryForAppId $id")
groupManager.app(id) match {
case Some(app) if selector.matches(app) => newBaseData().appInfoFuture(app, embed).map(Some(_))
case None => Future.successful(None)
@@ -44,7 +44,7 @@
@SuppressWarnings(Array("all")) // async/await
override def selectAppsBy(selector: AppSelector, embed: Set[AppInfo.Embed]): Future[Seq[AppInfo]] =
async { // linter:ignore UnnecessaryElseBranch
log.debug("queryAll")
logger.debug("queryAll")
val rootGroup = groupManager.rootGroup()
val selectedApps: IndexedSeq[AppDefinition] = rootGroup.transitiveApps.filterAs(selector.matches)(collection.breakOut)
val infos = await(resolveAppInfos(selectedApps, embed))
@@ -56,7 +56,7 @@
embed: Set[AppInfo.Embed]): Future[Seq[AppInfo]] =

async { // linter:ignore UnnecessaryElseBranch
log.debug(s"queryAllInGroup $groupId")
logger.debug(s"queryAllInGroup $groupId")
val maybeGroup: Option[Group] = groupManager.group(groupId)
val maybeApps: Option[IndexedSeq[AppDefinition]] =
maybeGroup.map(_.transitiveApps.filterAs(selector.matches)(collection.breakOut))
@@ -164,7 +164,7 @@

private[this] def resolvePodInfos(
specs: Seq[RunSpec],
baseData: AppInfoBaseData = newBaseData()): Future[Seq[PodStatus]] = Future.sequence(specs.collect {
baseData: AppInfoBaseData): Future[Seq[PodStatus]] = Future.sequence(specs.collect {
case pod: PodDefinition =>
baseData.podStatus(pod)
})