Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Speed up v2/pods/::status
Browse files Browse the repository at this point in the history
Summary:
This replace the Akka stream with a simple `Future.sequence` and uses
the same `AppInfoBaseData` instance for each pod. This we utilize the
cached instances and not limit ourselves to eight parallel threads. To
not overload Marathon we use the fixed size thread pool from #5973.

However, even with #5973 we might allocate a lot of futures and
runnables.

JIRA issues: MARATHON-8563
  • Loading branch information
jeschkies committed Feb 8, 2019
1 parent 9e4a5a3 commit 167889a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 7 deletions.
9 changes: 3 additions & 6 deletions src/main/scala/mesosphere/marathon/api/v2/PodsResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import javax.ws.rs.core.{Context, MediaType, Response}

import akka.event.EventStream
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.scaladsl.Sink
import com.wix.accord.Validator
import mesosphere.marathon.api.v2.validation.PodsValidation
import mesosphere.marathon.api.v2.Validation.validateOrThrow
Expand All @@ -21,7 +21,6 @@ import mesosphere.marathon.core.appinfo.{PodSelector, PodStatusService, Selector
import mesosphere.marathon.core.event._
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.pod.{PodDefinition, PodManager}
import mesosphere.marathon.core.storage.repository.RepositoryConstants
import mesosphere.marathon.plugin.auth._
import mesosphere.marathon.raml.{Pod, Raml}
import mesosphere.marathon.state.{PathId, Timestamp, VersionInfo}
Expand Down Expand Up @@ -251,10 +250,8 @@ class PodsResource @Inject() (
@GET
@Path("::status")
def allStatus(@Context req: HttpServletRequest): Response = authenticated(req) { implicit identity =>
val future = Source(podSystem.ids()).mapAsync(RepositoryConstants.maxConcurrency) { id =>
podStatusService.selectPodStatus(id, authzSelector)
}.filter(_.isDefined).map(_.get).runWith(Sink.seq)

val ids = podSystem.ids()
val future = podStatusService.selectPodStatuses(ids, authzSelector)
ok(Json.stringify(Json.toJson(result(future))))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,9 @@ trait PodStatusService {
* @return the status of the pod at the given path, if such a pod exists
*/
def selectPodStatus(id: PathId, selector: PodSelector = Selector.all): Future[Option[PodStatus]]

/**
* @return the statuses of the pods at the given paths, if the pod exists
*/
def selectPodStatuses(ids: Set[PathId], selector: PodSelector = Selector.all): Future[Seq[PodStatus]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ private[appinfo] class DefaultInfoService(
}
}

override def selectPodStatuses(ids: Set[PathId], selector: PodSelector): Future[Seq[PodStatus]] = {
val baseData = newBaseData()

val pods = ids.toVector.flatMap(groupManager.pod(_)).filter(selector.matches)
resolvePodInfos(pods, baseData)
}

override def selectApp(id: PathId, selector: AppSelector, embed: Set[AppInfo.Embed]): Future[Option[AppInfo]] = {
logger.debug(s"queryForAppId $id")
groupManager.app(id) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ class PodsResourceTest extends AkkaUnitTest with Mockito with JerseyTest {
val f = Fixture()

podSystem.ids().returns(Set(PathId("mypod")))
podStatusService.selectPodStatus(any, any).returns(Future(Some(PodStatus("mypod", Pod("mypod", containers = Seq.empty), PodState.Stable, statusSince = OffsetDateTime.now(), lastUpdated = OffsetDateTime.now(), lastChanged = OffsetDateTime.now()))))
podStatusService.selectPodStatuses(any, any).returns(Future(Seq(PodStatus("mypod", Pod("mypod", containers = Seq.empty), PodState.Stable, statusSince = OffsetDateTime.now(), lastUpdated = OffsetDateTime.now(), lastChanged = OffsetDateTime.now()))))

val response = f.podsResource.allStatus(f.auth.request)

Expand Down

0 comments on commit 167889a

Please sign in to comment.