Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Fixes #1254 - Add flag to define Marathon PORT prefix.
Browse files Browse the repository at this point in the history
  • Loading branch information
gkleiman committed Jul 15, 2015
1 parent 75d9b7e commit 3ce6aa9
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 36 deletions.
5 changes: 5 additions & 0 deletions docs/docs/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ The core functionality flags can be also set by environment variable `MARATHON_O
file containing the authentication secret
* `--marathon_store_timeout` (Optional. Default: 2000 (2 seconds)): Maximum time
in milliseconds, to wait for persistent storage operations to complete.
* <span class="label label-default">v0.10.0</span> `--env_vars_prefix` (Optional. Default: None):
The prefix to add to the name of task's environment variables created
automatically by Marathon.
_Note: This prefix will not be added to variables that are already prefixed,
such as `MESOS_TASK_ID` and `MARATHON_APP_ID`

## Tuning Flags for Offer Matching/Launching Tasks

Expand Down
9 changes: 9 additions & 0 deletions docs/docs/task-environment-vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ addition to those set by Mesos.
Each element of the `env` field of each task's app definition describes
an environment variable.

_Note: No custom prefix will be added to these variable names, even if one is
specified via the `--env_vars_prefix` command line flag.

## Host Ports

Host ports are the ports at which the application is reachable on the host running
Expand All @@ -30,13 +33,19 @@ port, then `PORT` has the same value as `PORT0`.
The `PORTS` variable contains a comma-separated list of all assigned
host ports.

_Note: It is possible to specify a custom prefix for these variable
names through the `--env_vars_prefix` command line flag.

## App Metadata

- `MARATHON_APP_ID` contains the complete path of the corresponding app
definition. For example `/httpServer`, or `/webshop/db`.
- `MARATHON_APP_VERSION` contains the version of the app definition which
was used to start this task. For example `2015-04-02T09:37:00.596Z`.

_Note: No custom prefix will be added to these variable names, even if one is
specified via the `--env_vars_prefix` command line flag.

## Task Metadata

- `MESOS_TASK_ID` contains the ID of this task as used by Mesos. For example
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/mesosphere/marathon/MarathonConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ trait MarathonConf extends ScallopConf with ZookeeperConf with IterativeOfferMat
noshort = true
)

lazy val envVarsPrefix = opt[String]("env_vars_prefix",
descr = "Prefix to use for environment variables",
noshort = true
)

//Internal settings, that are not intended for external use
lazy val internalStoreBackend = opt[String]("internal_store_backend",
descr = "The backend storage system to use. One of zk, mesos_zk, mem",
Expand Down
8 changes: 7 additions & 1 deletion src/main/scala/mesosphere/marathon/state/AppDefinition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,13 @@ case class AppDefinition(
}

def toProto: Protos.ServiceDefinition = {
val commandInfo = TaskBuilder.commandInfo(this, None, None, Seq.empty)
val commandInfo = TaskBuilder.commandInfo(
app = this,
taskId = None,
host = None,
ports = Seq.empty,
envPrefix = None
)
val cpusResource = ScalarResource(Resource.CPUS, cpus)
val memResource = ScalarResource(Resource.MEM, mem)
val diskResource = ScalarResource(Resource.DISK, disk)
Expand Down
20 changes: 16 additions & 4 deletions src/main/scala/mesosphere/mesos/TaskBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,18 @@ class TaskBuilder(app: AppDefinition,
containerWithPortMappings.toMesos()
}

val envPrefix: Option[String] = config.envVarsPrefix.get
executor match {
case CommandExecutor() =>
builder.setCommand(TaskBuilder.commandInfo(app, Some(taskId), host, ports))
builder.setCommand(TaskBuilder.commandInfo(app, Some(taskId), host, ports, envPrefix))
containerProto.foreach(builder.setContainer)

case PathExecutor(path) =>
val executorId = f"marathon-${taskId.getValue}" // Fresh executor
val executorPath = s"'$path'" // TODO: Really escape this.
val cmd = app.cmd orElse app.args.map(_ mkString " ") getOrElse ""
val shell = s"chmod ug+rx $executorPath && exec $executorPath $cmd"
val command = TaskBuilder.commandInfo(app, Some(taskId), host, ports).toBuilder.setValue(shell)
val command = TaskBuilder.commandInfo(app, Some(taskId), host, ports, envPrefix).toBuilder.setValue(shell)

val info = ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(executorId))
Expand Down Expand Up @@ -178,12 +179,16 @@ class TaskBuilder(app: AppDefinition,

object TaskBuilder {

def commandInfo(app: AppDefinition, taskId: Option[TaskID], host: Option[String], ports: Seq[Long]): CommandInfo = {
def commandInfo(app: AppDefinition,
taskId: Option[TaskID],
host: Option[String],
ports: Seq[Long],
envPrefix: Option[String]): CommandInfo = {
val containerPorts = for (pms <- app.portMappings) yield pms.map(_.containerPort)
val declaredPorts = containerPorts.getOrElse(app.ports)
val envMap: Map[String, String] =
taskContextEnv(app, taskId) ++
portsEnv(declaredPorts, ports) ++ host.map("HOST" -> _) ++
addPrefix(envPrefix, portsEnv(declaredPorts, ports) ++ host.map("HOST" -> _).toMap) ++
app.env

val builder = CommandInfo.newBuilder()
Expand Down Expand Up @@ -267,6 +272,13 @@ object TaskBuilder {
}
}

def addPrefix(envVarsPrefix: Option[String], env: Map[String, String]): Map[String, String] = {
envVarsPrefix match {
case Some(prefix) => env.map { case (key: String, value: String) => (prefix + key, value) }
case None => env
}
}

def taskContextEnv(app: AppDefinition, taskId: Option[TaskID]): Map[String, String] =
if (taskId.isEmpty)
Map.empty
Expand Down
4 changes: 3 additions & 1 deletion src/test/scala/mesosphere/marathon/MarathonTestHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ trait MarathonTestHelper {
maxTasksPerOffer: Int = 1,
maxTasksPerOfferCycle: Int = 10,
mesosRole: Option[String] = None,
acceptedResourceRoles: Option[Set[String]] = None): MarathonConf = {
acceptedResourceRoles: Option[Set[String]] = None,
envVarsPrefix: Option[String] = None): MarathonConf = {

var args = Seq(
"--master", "127.0.0.1:5050",
Expand All @@ -40,6 +41,7 @@ trait MarathonTestHelper {

mesosRole.foreach(args ++= Seq("--mesos_role", _))
acceptedResourceRoles.foreach(v => args ++= Seq("--default_accepted_resource_roles", v.mkString(",")))
envVarsPrefix.foreach(args ++ Seq("--env_vars_prefix", _))
makeConfig(args: _*)
}

Expand Down
115 changes: 87 additions & 28 deletions src/test/scala/mesosphere/mesos/TaskBuilderTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -565,14 +565,15 @@ class TaskBuilderTest extends MarathonSpec {
test("AppContextEnvironment") {
val command =
TaskBuilder.commandInfo(
AppDefinition(
app = AppDefinition(
id = "/test".toPath,
ports = Seq(8080, 8081),
version = Timestamp(0)
),
Some(TaskID("task-123")),
Some("host.mega.corp"),
Seq(1000, 1001)
taskId = Some(TaskID("task-123")),
host = Some("host.mega.corp"),
ports = Seq(1000, 1001),
envPrefix = None
)
val env: Map[String, String] =
command.getEnvironment.getVariablesList.asScala.toList.map(v => v.getName -> v.getValue).toMap
Expand All @@ -588,7 +589,7 @@ class TaskBuilderTest extends MarathonSpec {

val command =
TaskBuilder.commandInfo(
AppDefinition(
app = AppDefinition(
id = "/test".toPath,
ports = Seq(8080, 8081),
version = Timestamp(0),
Expand All @@ -601,9 +602,10 @@ class TaskBuilderTest extends MarathonSpec {
"PORT_8081" -> "port8081"
)
),
Some(TaskID("task-123")),
Some("host.mega.corp"),
Seq(1000, 1001)
taskId = Some(TaskID("task-123")),
host = Some("host.mega.corp"),
ports = Seq(1000, 1001),
envPrefix = None
)
val env: Map[String, String] =
command.getEnvironment.getVariablesList.asScala.toList.map(v => v.getName -> v.getValue).toMap
Expand All @@ -617,26 +619,77 @@ class TaskBuilderTest extends MarathonSpec {
}

test("PortsEnvWithOnlyPorts") {
val command =
TaskBuilder.commandInfo(
app = AppDefinition(
ports = Seq(8080, 8081)
),
taskId = Some(TaskID("task-123")),
host = Some("host.mega.corp"),
ports = Seq(1000, 1001),
envPrefix = None
)
val env: Map[String, String] =
command.getEnvironment.getVariablesList.asScala.toList.map(v => v.getName -> v.getValue).toMap

assert("1000" == env("PORT_8080"))
assert("1001" == env("PORT_8081"))
}

test("PortsEnvWithCustomPrefix") {
val command =
TaskBuilder.commandInfo(
AppDefinition(
ports = Seq(8080, 8081)
),
Some(TaskID("task-123")),
Some("host.mega.corp"),
Seq(1000, 1001)
Seq(1000, 1001),
Some("CUSTOM_PREFIX_")
)
val env: Map[String, String] =
command.getEnvironment.getVariablesList.asScala.toList.map(v => v.getName -> v.getValue).toMap

assert("1000" == env("PORT_8080"))
assert("1001" == env("PORT_8081"))
assert("1000,1001" == env("CUSTOM_PREFIX_PORTS"))

assert("1000" == env("CUSTOM_PREFIX_PORT"))

assert("1000" == env("CUSTOM_PREFIX_PORT0"))
assert("1000" == env("CUSTOM_PREFIX_PORT_8080"))

assert("1001" == env("CUSTOM_PREFIX_PORT1"))
assert("1001" == env("CUSTOM_PREFIX_PORT_8081"))

assert("host.mega.corp" == env("CUSTOM_PREFIX_HOST"))

assert(Seq("HOST", "PORTS", "PORT0", "PORT1").forall(k => !env.contains(k)))
assert(Seq("MESOS_TASK_ID", "MARATHON_APP_ID", "MARATHON_APP_VERSION").forall(env.contains))
}

test("PortsEnvWithOnlyMappings") {
test("OnlyWhitelistedUnprefixedVariablesWithCustomPrefix") {
val command =
TaskBuilder.commandInfo(
AppDefinition(
ports = Seq(8080, 8081)
),
Some(TaskID("task-123")),
Some("host.mega.corp"),
Seq(1000, 1001),
Some("P_")
)
val env: Map[String, String] =
command.getEnvironment.getVariablesList.asScala.toList.map(v => v.getName -> v.getValue).toMap

val nonPrefixedEnvVars = env.filterKeys(!_.startsWith("P_"))
val whiteList = Seq("MESOS_TASK_ID", "MARATHON_APP_ID", "MARATHON_APP_VERSION")

assert(nonPrefixedEnvVars.keySet.forall(whiteList.contains))
}

test("PortsEnvWithOnlyMappings") {
val command =
TaskBuilder.commandInfo(
app = AppDefinition(
container = Some(Container(
docker = Some(Docker(
network = Some(Network.BRIDGE),
Expand All @@ -647,9 +700,10 @@ class TaskBuilderTest extends MarathonSpec {
))
))
),
Some(TaskID("task-123")),
Some("host.mega.corp"),
Seq(1000, 1001)
taskId = Some(TaskID("task-123")),
host = Some("host.mega.corp"),
ports = Seq(1000, 1001),
envPrefix = None
)
val env: Map[String, String] =
command.getEnvironment.getVariablesList.asScala.toList.map(v => v.getName -> v.getValue).toMap
Expand All @@ -661,7 +715,7 @@ class TaskBuilderTest extends MarathonSpec {
test("PortsEnvWithBothPortsAndMappings") {
val command =
TaskBuilder.commandInfo(
AppDefinition(
app = AppDefinition(
ports = Seq(22, 23),
container = Some(Container(
docker = Some(Docker(
Expand All @@ -673,9 +727,10 @@ class TaskBuilderTest extends MarathonSpec {
))
))
),
Some(TaskID("task-123")),
Some("host.mega.corp"),
Seq(1000, 1001)
taskId = Some(TaskID("task-123")),
host = Some("host.mega.corp"),
ports = Seq(1000, 1001),
envPrefix = None
)
val env: Map[String, String] =
command.getEnvironment.getVariablesList.asScala.toList.map(v => v.getName -> v.getValue).toMap
Expand All @@ -691,19 +746,19 @@ class TaskBuilderTest extends MarathonSpec {

val command =
TaskBuilder.commandInfo(
AppDefinition(
app = AppDefinition(
id = "testApp".toPath,
cpus = 1.0,
mem = 64.0,
disk = 1.0,
executor = "//cmd",
uris = Seq("http://www.example.com", "http://www.example.com/test.tgz",
"example.tar.gz"),
uris = Seq("http://www.example.com", "http://www.example.com/test.tgz", "example.tar.gz"),
ports = Seq(8080, 8081)
),
Some(TaskID("task-123")),
Some("host.mega.corp"),
Seq(1000, 1001)
taskId = Some(TaskID("task-123")),
host = Some("host.mega.corp"),
ports = Seq(1000, 1001),
envPrefix = None
)

val uriinfo1 = command.getUris(0)
Expand All @@ -716,15 +771,19 @@ class TaskBuilderTest extends MarathonSpec {
}

def buildIfMatches(
offer: Offer, app: AppDefinition, mesosRole: Option[String] = None,
acceptedResourceRoles: Option[Set[String]] = None) = {
offer: Offer,
app: AppDefinition,
mesosRole: Option[String] = None,
acceptedResourceRoles: Option[Set[String]] = None,
envVarsPrefix: Option[String] = None) = {
val taskTracker = mock[TaskTracker]

val builder = new TaskBuilder(app,
s => TaskID(s.toString), taskTracker,
defaultConfig(
mesosRole = mesosRole,
acceptedResourceRoles = acceptedResourceRoles))
acceptedResourceRoles = acceptedResourceRoles,
envVarsPrefix = envVarsPrefix))

builder.buildIfMatches(offer)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package mesosphere.util.state
import mesosphere.marathon.StoreCommandFailedException
import mesosphere.marathon.integration.setup.IntegrationFunSuite
import org.scalatest.concurrent.ScalaFutures._
import org.scalatest.time.{Seconds, Span}
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.time.{ Seconds, Span }
import org.scalatest.{ BeforeAndAfter, Matchers }

/**
* Common tests for all persistent stores.
Expand Down

0 comments on commit 3ce6aa9

Please sign in to comment.