[SPARK-26420][k8s] Generate more unique IDs when creating k8s resource names. #23805

Closed · wants to merge 2 commits · Changes from 1 commit
@@ -194,8 +194,8 @@ private[spark] object KubernetesConf {
   }
 
   def getResourceNamePrefix(appName: String): String = {
-    val launchTime = System.currentTimeMillis()
-    s"$appName-$launchTime"
+    val id = KubernetesUtils.uniqueID()
+    s"$appName-$id"
       .trim
       .toLowerCase(Locale.ROOT)
       .replaceAll("\\s+", "-")
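As a hypothetical illustration (not part of the patch; the method's remaining transformations are elided by this hunk), the new prefix generation behaves like:

    // Sketch assuming uniqueID() returned "b805936936efbcd5" (the example
    // value from the review thread below).
    val appName = "My Spark App"
    val prefix = s"$appName-b805936936efbcd5"
      .trim
      .toLowerCase(java.util.Locale.ROOT)
      .replaceAll("\\s+", "-")
    // prefix == "my-spark-app-b805936936efbcd5"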
@@ -17,18 +17,22 @@
 package org.apache.spark.deploy.k8s
 
 import java.io.File
+import java.security.SecureRandom
 
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.commons.codec.binary.Hex
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
 private[spark] object KubernetesUtils extends Logging {
 
+  private lazy val RNG = new SecureRandom()
+
   /**
    * Extract and parse Spark configuration properties with a given name prefix and
    * return the result as a Map. Keys must not have more than one value.
@@ -205,4 +209,23 @@ private[spark] object KubernetesUtils extends Logging {
   def formatTime(time: String): String = {
     if (time != null) time else "N/A"
   }
+
+  /**
+   * Generates a unique ID to be used as part of identifiers. The returned ID is a hex string
+   * of a 64-bit value containing the 40 LSBs from the current time + 24 random bits from a
+   * cryptographically strong RNG. (40 bits gives about 30 years' worth of "unique" timestamps.)
+   *
+   * This avoids using a UUID for uniqueness (too long) and relying solely on the current time
+   * (not unique enough).
+   */
+  def uniqueID(): String = {
+    val random = new Array[Byte](3)
+    synchronized {
+      RNG.nextBytes(random)
+    }
+
+    val time = java.lang.Long.toHexString(System.currentTimeMillis() & 0xFFFFFFFFFFL)
+    Hex.encodeHexString(random) + time
+  }
+
 }
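A quick sanity check of the "30 years" claim in the comment above: 2^40 ms ≈ 1.1 × 10^12 ms ≈ 34.8 years, so the 40 low-order bits of the clock only wrap roughly every 35 years. The following standalone sketch (hypothetical, not part of the patch, but using the same SecureRandom and commons-codec Hex calls) reproduces the scheme outside of Spark:

    import java.security.SecureRandom
    import org.apache.commons.codec.binary.Hex

    object UniqueIdSketch {
      def main(args: Array[String]): Unit = {
        val rng = new SecureRandom()
        // 24 random bits: 3 bytes, hex-encoded to exactly 6 characters.
        val random = new Array[Byte](3)
        rng.nextBytes(random)
        // 40 low-order bits of the current time: up to 10 hex characters.
        val time = java.lang.Long.toHexString(System.currentTimeMillis() & 0xFFFFFFFFFFL)
        // e.g. "b80593" + "6936efbcd5" => "b805936936efbcd5"
        println(Hex.encodeHexString(random) + time)
      }
    }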
@@ -20,14 +20,11 @@ import scala.collection.JavaConverters._

 import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 
-import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.util.{Clock, SystemClock}
 
-private[spark] class DriverServiceFeatureStep(
-    kubernetesConf: KubernetesDriverConf,
-    clock: Clock = new SystemClock)
+private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverConf)
   extends KubernetesFeatureConfigStep with Logging {
   import DriverServiceFeatureStep._
 
@@ -42,7 +39,7 @@ private[spark] class DriverServiceFeatureStep(
   private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
     preferredServiceName
   } else {
-    val randomServiceId = clock.getTimeMillis()
+    val randomServiceId = KubernetesUtils.uniqueID()
     val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
     logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
       s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
@@ -26,7 +26,6 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 import org.apache.spark.internal.config._
-import org.apache.spark.util.ManualClock
 
 class DriverServiceFeatureStepSuite extends SparkFunSuite {
 
@@ -71,7 +70,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     val expectedServiceName = kconf.resourceNamePrefix + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
     val expectedHostName = s"$expectedServiceName.my-namespace.svc"
     val additionalProps = configurationStep.getAdditionalPodSystemProperties()
-    verifySparkConfHostNames(additionalProps, expectedHostName)
+    assert(additionalProps(DRIVER_HOST_ADDRESS.key) === expectedHostName)
   }
 
   test("Ports should resolve to defaults in SparkConf and in the service.") {
@@ -92,25 +91,22 @@
   }
 
   test("Long prefixes should switch to using a generated name.") {
-    val clock = new ManualClock()
-    clock.setTime(10000)
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_NAMESPACE, "my-namespace")
-    val configurationStep = new DriverServiceFeatureStep(
-      KubernetesTestConf.createDriverConf(
-        sparkConf = sparkConf,
-        resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
-        labels = DRIVER_LABELS),
-      clock)
+    val kconf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
+      labels = DRIVER_LABELS)
+    val configurationStep = new DriverServiceFeatureStep(kconf)
 
     val driverService = configurationStep
       .getAdditionalKubernetesResources()
       .head
       .asInstanceOf[Service]
-    val expectedServiceName = s"spark-10000${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}"
-    assert(driverService.getMetadata.getName === expectedServiceName)
-    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
+    assert(!driverService.getMetadata.getName.startsWith(kconf.resourceNamePrefix))
Review thread on this assertion:

@dongjoon-hyun (Member) commented on Feb 28, 2019:

Hi, @vanzin. Can we check the actual pattern you introduced here? Maybe we can use .startsWith and .endsWith:

    val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"

Negative assertions (at lines 106 and 109) look insufficient to check the new logic. I know this test case focuses only on the long-name case. If something happens to the pattern, this test doesn't catch the change at all. Previously, we checked the name pattern exactly in this test case.

Another test case does check the postfix pattern, but it's also partial because it uses kconf.resourceNamePrefix.

@vanzin (Contributor, Author) replied:

> If something happens to the pattern, this test doesn't catch the change at all.

I'm not sure what you mean by that.

It's not possible to have a positive test. The whole point of the change is to introduce randomness into the identifier being generated.

The tests set the resource prefix to something that does not start with spark-, which is the prefix used in the auto-generated name. So checking that the final identifier does not start with the requested prefix is enough to test that the change is occurring.

@dongjoon-hyun (Member) replied on Mar 1, 2019:

The randomness is not about the whole string, only about a part of it. Specifically, this PR changes this test case from spark-10000-driver-svc to spark-b805936936efbcd5-driver-svc.

> The tests set the resource prefix to something that does not start with spark-, which is the prefix used in the auto-generated name.

Previously, the randomness was determined by the time, 10000. Now, it has a better string, b805936936efbcd5.

What I asked for is adding the following two lines as a positive test case:

     assert(!driverService.getMetadata.getName.startsWith(kconf.resourceNamePrefix))
+    assert(driverService.getMetadata.getName.startsWith("spark-"))
+    assert(driverService.getMetadata.getName.endsWith("driver-svc"))

The rest of this PR looks good. I like this PR.

@dongjoon-hyun (Member) replied:

I totally agree with you; that's why I mentioned that I know this test case focuses only on the long-name case.

> So checking that the final identifier does not start with the requested prefix is enough to test that the change is occurring.

There was no other test case checking this new pattern, so I asked to keep some coverage.

@vanzin (Contributor, Author) replied:

What new pattern? The generated service name? Why is it interesting to test it?

The test checks that the long name is not used. It doesn't matter what name is used as long as it's not the long name, and that's already achieved by the test, as you've said yourself.

@vanzin (Contributor, Author) replied:

I changed the test to check uniqueness. But checking that the generated name follows a specific pattern is a rather pointless test.
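(The updated test lands in the second commit, which is not shown in this "Changes from 1 commit" view. A minimal sketch of what such a uniqueness check could look like, hypothetical and reusing the names from the test above:)

    // Build the step twice from the same conf; the randomly generated
    // service names should not collide.
    def generatedName(): String = new DriverServiceFeatureStep(kconf)
      .getAdditionalKubernetesResources()
      .head
      .asInstanceOf[Service]
      .getMetadata
      .getName
    assert(generatedName() != generatedName())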


     val additionalProps = configurationStep.getAdditionalPodSystemProperties()
-    verifySparkConfHostNames(additionalProps, expectedHostName)
+    assert(!additionalProps(DRIVER_HOST_ADDRESS.key).startsWith(kconf.resourceNamePrefix))
   }
 
   test("Disallow bind address and driver host to be set explicitly.") {
@@ -156,10 +152,4 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
     assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
   }
-
-  private def verifySparkConfHostNames(
-      driverSparkConf: Map[String, String], expectedHostName: String): Unit = {
-    assert(driverSparkConf(
-      org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName)
-  }
 }