-
Notifications
You must be signed in to change notification settings - Fork 840
Optimize v2/groups
request performance
#5973
Changes from 6 commits
035e3ad
93df16c
896aabf
d8338be
658e102
f3b44f4
2737722
157bf7a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,12 +4,12 @@ package core.appinfo | |
import java.time.Clock | ||
|
||
import com.google.inject.Inject | ||
import mesosphere.marathon.MarathonSchedulerService | ||
import mesosphere.marathon.core.appinfo.impl.{ AppInfoBaseData, DefaultInfoService } | ||
import mesosphere.marathon.core.group.GroupManager | ||
import mesosphere.marathon.core.health.HealthCheckManager | ||
import mesosphere.marathon.core.task.tracker.InstanceTracker | ||
import mesosphere.marathon.storage.repository.TaskFailureRepository | ||
import mesosphere.util.NamedExecutionContext | ||
|
||
/** | ||
* Provides a service to query information related to apps. | ||
|
@@ -21,8 +21,11 @@ class AppInfoModule @Inject() ( | |
healthCheckManager: HealthCheckManager, | ||
marathonSchedulerService: MarathonSchedulerService, | ||
taskFailureRepository: TaskFailureRepository) { | ||
|
||
val ec = NamedExecutionContext.fixedThreadPoolExecutionContext(8, "app-info-module") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
could we make this more readable pls? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are a few akka/scala specific and accepted "short names" e.g. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this thread pool be ideally related to the number of threads available? Should we make it configurable? (even if via an undocumented environment variable)? It seems like we should ideally create the execution context in the module and share it with the pods status controller? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
private[this] val appInfoBaseData = () => new AppInfoBaseData( | ||
clock, taskTracker, healthCheckManager, marathonSchedulerService, taskFailureRepository, groupManager) | ||
clock, taskTracker, healthCheckManager, marathonSchedulerService, taskFailureRepository, groupManager)(ec) | ||
|
||
def appInfoService: AppInfoService = infoService | ||
def groupInfoService: GroupInfoService = infoService | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,15 @@ | ||
package mesosphere.marathon | ||
package core.appinfo.impl | ||
|
||
import com.typesafe.scalalogging.StrictLogging | ||
import mesosphere.marathon.core.appinfo.AppInfo.Embed | ||
import mesosphere.marathon.core.appinfo._ | ||
import mesosphere.marathon.core.group.GroupManager | ||
import mesosphere.marathon.core.pod.PodDefinition | ||
import mesosphere.marathon.raml.PodStatus | ||
import mesosphere.marathon.state._ | ||
import mesosphere.marathon.stream.Implicits._ | ||
import org.slf4j.LoggerFactory | ||
import mesosphere.util.NamedExecutionContext | ||
|
||
import scala.async.Async.{ async, await } | ||
import scala.collection.immutable.Seq | ||
|
@@ -17,15 +18,14 @@ import scala.concurrent.Future | |
|
||
private[appinfo] class DefaultInfoService( | ||
groupManager: GroupManager, | ||
newBaseData: () => AppInfoBaseData) extends AppInfoService with GroupInfoService with PodStatusService { | ||
import mesosphere.marathon.core.async.ExecutionContexts.global | ||
newBaseData: () => AppInfoBaseData) extends AppInfoService with GroupInfoService with PodStatusService with StrictLogging { | ||
|
||
private[this] val log = LoggerFactory.getLogger(getClass) | ||
implicit val ec = NamedExecutionContext.fixedThreadPoolExecutionContext(8, "default-info-service") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this really has an impact if app info and the group manager have their own thread pools. I'd like to understand where we benefit from a fixed worker thread pool and where we should be using the fork join thread pool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tested the scenario with one thread pool and it adds roughly 20% response time. Not that much but I think dividing them still makes sense - Dīvide et Imperā! |
||
|
||
@SuppressWarnings(Array("all")) // async/await | ||
override def selectPodStatus(id: PathId, selector: PodSelector): Future[Option[PodStatus]] = | ||
async { // linter:ignore UnnecessaryElseBranch | ||
log.debug(s"query for pod $id") | ||
logger.debug(s"query for pod $id") | ||
val maybePod = groupManager.pod(id) | ||
maybePod.filter(selector.matches) match { | ||
case Some(pod) => Some(await(newBaseData().podStatus(pod))) | ||
|
@@ -34,7 +34,7 @@ private[appinfo] class DefaultInfoService( | |
} | ||
|
||
override def selectApp(id: PathId, selector: AppSelector, embed: Set[AppInfo.Embed]): Future[Option[AppInfo]] = { | ||
log.debug(s"queryForAppId $id") | ||
logger.debug(s"queryForAppId $id") | ||
groupManager.app(id) match { | ||
case Some(app) if selector.matches(app) => newBaseData().appInfoFuture(app, embed).map(Some(_)) | ||
case None => Future.successful(None) | ||
|
@@ -44,7 +44,7 @@ private[appinfo] class DefaultInfoService( | |
@SuppressWarnings(Array("all")) // async/await | ||
override def selectAppsBy(selector: AppSelector, embed: Set[AppInfo.Embed]): Future[Seq[AppInfo]] = | ||
async { // linter:ignore UnnecessaryElseBranch | ||
log.debug("queryAll") | ||
logger.debug("queryAll") | ||
val rootGroup = groupManager.rootGroup() | ||
val selectedApps: IndexedSeq[AppDefinition] = rootGroup.transitiveApps.filterAs(selector.matches)(collection.breakOut) | ||
val infos = await(resolveAppInfos(selectedApps, embed)) | ||
|
@@ -56,7 +56,7 @@ private[appinfo] class DefaultInfoService( | |
embed: Set[AppInfo.Embed]): Future[Seq[AppInfo]] = | ||
|
||
async { // linter:ignore UnnecessaryElseBranch | ||
log.debug(s"queryAllInGroup $groupId") | ||
logger.debug(s"queryAllInGroup $groupId") | ||
val maybeGroup: Option[Group] = groupManager.group(groupId) | ||
val maybeApps: Option[IndexedSeq[AppDefinition]] = | ||
maybeGroup.map(_.transitiveApps.filterAs(selector.matches)(collection.breakOut)) | ||
|
@@ -164,7 +164,7 @@ private[appinfo] class DefaultInfoService( | |
|
||
private[this] def resolvePodInfos( | ||
specs: Seq[RunSpec], | ||
baseData: AppInfoBaseData = newBaseData()): Future[Seq[PodStatus]] = Future.sequence(specs.collect { | ||
baseData: AppInfoBaseData): Future[Seq[PodStatus]] = Future.sequence(specs.collect { | ||
case pod: PodDefinition => | ||
baseData.podStatus(pod) | ||
}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why 8 threads? Did we measure that 8 is a meaningful number? I'm not trying to overcomplicate things, but it seems to me that making these configurable would allow us to also easily run different tests to verify that the number makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
8 seems like a low enough number that it shouldn't matter. More important is the fact that we separate the thread pools from each other.