Skip to content

Commit 92eb16d

Browse files
author
Marcelo Vanzin
committed
[SPARK-26420][k8s] Generate more unique IDs when creating k8s resource names.
Using the current time as an ID is more prone to clashes than people generally realize. This change makes resource names more unique without resorting to a full UUID, which would consume too much space in the names. The implemented approach combines some bits from the current time with some random bits, which should be more resistant to clashes.
1 parent b6c6875 commit 92eb16d

File tree

4 files changed

+38
-28
lines changed

4 files changed

+38
-28
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ private[spark] object KubernetesConf {
194194
}
195195

196196
def getResourceNamePrefix(appName: String): String = {
197-
val launchTime = System.currentTimeMillis()
198-
s"$appName-$launchTime"
197+
val id = KubernetesUtils.uniqueID()
198+
s"$appName-$id"
199199
.trim
200200
.toLowerCase(Locale.ROOT)
201201
.replaceAll("\\s+", "-")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@
1717
package org.apache.spark.deploy.k8s
1818

1919
import java.io.File
20+
import java.security.SecureRandom
2021

2122
import scala.collection.JavaConverters._
2223

2324
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
2425
import io.fabric8.kubernetes.client.KubernetesClient
26+
import org.apache.commons.codec.binary.Hex
2527

2628
import org.apache.spark.{SparkConf, SparkException}
2729
import org.apache.spark.internal.Logging
2830
import org.apache.spark.util.Utils
2931

3032
private[spark] object KubernetesUtils extends Logging {
3133

34+
private lazy val RNG = new SecureRandom()
35+
3236
/**
3337
* Extract and parse Spark configuration properties with a given name prefix and
3438
* return the result as a Map. Keys must not have more than one value.
@@ -205,4 +209,23 @@ private[spark] object KubernetesUtils extends Logging {
205209
def formatTime(time: String): String = {
206210
if (time != null) time else "N/A"
207211
}
212+
213+
/**
214+
* Generates a unique ID to be used as part of identifiers. The returned ID is a hex string
215+
* of a 64-bit value containing the 40 LSBs from the current time + 24 random bits from a
216+
* cryptographically strong RNG. (40 bits gives about 35 years worth of "unique" timestamps.)
217+
*
218+
* This avoids using a UUID for uniqueness (too long), and relying solely on the current time
219+
* (not unique enough).
220+
*/
221+
def uniqueID(): String = {
222+
val random = new Array[Byte](3)
223+
synchronized {
224+
RNG.nextBytes(random)
225+
}
226+
227+
val time = java.lang.Long.toHexString(System.currentTimeMillis() & 0xFFFFFFFFFFL)
228+
Hex.encodeHexString(random) + time
229+
}
230+
208231
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@ import scala.collection.JavaConverters._
2020

2121
import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
2222

23-
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
23+
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
2424
import org.apache.spark.deploy.k8s.Constants._
2525
import org.apache.spark.internal.{config, Logging}
26-
import org.apache.spark.util.{Clock, SystemClock}
2726

28-
private[spark] class DriverServiceFeatureStep(
29-
kubernetesConf: KubernetesDriverConf,
30-
clock: Clock = new SystemClock)
27+
private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverConf)
3128
extends KubernetesFeatureConfigStep with Logging {
3229
import DriverServiceFeatureStep._
3330

@@ -42,7 +39,7 @@ private[spark] class DriverServiceFeatureStep(
4239
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
4340
preferredServiceName
4441
} else {
45-
val randomServiceId = clock.getTimeMillis()
42+
val randomServiceId = KubernetesUtils.uniqueID()
4643
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
4744
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
4845
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import org.apache.spark.deploy.k8s.Config._
2626
import org.apache.spark.deploy.k8s.Constants._
2727
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
2828
import org.apache.spark.internal.config._
29-
import org.apache.spark.util.ManualClock
3029

3130
class DriverServiceFeatureStepSuite extends SparkFunSuite {
3231

@@ -71,7 +70,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
7170
val expectedServiceName = kconf.resourceNamePrefix + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
7271
val expectedHostName = s"$expectedServiceName.my-namespace.svc"
7372
val additionalProps = configurationStep.getAdditionalPodSystemProperties()
74-
verifySparkConfHostNames(additionalProps, expectedHostName)
73+
assert(additionalProps(DRIVER_HOST_ADDRESS.key) === expectedHostName)
7574
}
7675

7776
test("Ports should resolve to defaults in SparkConf and in the service.") {
@@ -92,25 +91,22 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
9291
}
9392

9493
test("Long prefixes should switch to using a generated name.") {
95-
val clock = new ManualClock()
96-
clock.setTime(10000)
9794
val sparkConf = new SparkConf(false)
9895
.set(KUBERNETES_NAMESPACE, "my-namespace")
99-
val configurationStep = new DriverServiceFeatureStep(
100-
KubernetesTestConf.createDriverConf(
101-
sparkConf = sparkConf,
102-
resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
103-
labels = DRIVER_LABELS),
104-
clock)
96+
val kconf = KubernetesTestConf.createDriverConf(
97+
sparkConf = sparkConf,
98+
resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
99+
labels = DRIVER_LABELS)
100+
val configurationStep = new DriverServiceFeatureStep(kconf)
101+
105102
val driverService = configurationStep
106103
.getAdditionalKubernetesResources()
107104
.head
108105
.asInstanceOf[Service]
109-
val expectedServiceName = s"spark-10000${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}"
110-
assert(driverService.getMetadata.getName === expectedServiceName)
111-
val expectedHostName = s"$expectedServiceName.my-namespace.svc"
106+
assert(!driverService.getMetadata.getName.startsWith(kconf.resourceNamePrefix))
107+
112108
val additionalProps = configurationStep.getAdditionalPodSystemProperties()
113-
verifySparkConfHostNames(additionalProps, expectedHostName)
109+
assert(!additionalProps(DRIVER_HOST_ADDRESS.key).startsWith(kconf.resourceNamePrefix))
114110
}
115111

116112
test("Disallow bind address and driver host to be set explicitly.") {
@@ -156,10 +152,4 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
156152
assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
157153
assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
158154
}
159-
160-
private def verifySparkConfHostNames(
161-
driverSparkConf: Map[String, String], expectedHostName: String): Unit = {
162-
assert(driverSparkConf(
163-
org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName)
164-
}
165155
}

0 commit comments

Comments (0)