5 changes: 5 additions & 0 deletions external/kafka-0-10-sql/pom.xml
@@ -106,6 +106,11 @@
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
74 changes: 74 additions & 0 deletions external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.common.config.SaslConfigs

import org.apache.spark.SparkEnv
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kafka

/**
* Class to conveniently update Kafka config params, while logging the changes
*/
private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String])
extends Logging {
private val map = new ju.HashMap[String, Object](kafkaParams.asJava)

def set(key: String, value: Object): this.type = {
map.put(key, value)
logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
this
}

def setIfUnset(key: String, value: Object): this.type = {
if (!map.containsKey(key)) {
map.put(key, value)
logDebug(s"$module: Set $key to $value")
}
this
}

def setAuthenticationConfigIfNeeded(): this.type = {
// There are multiple ways to log in, applied in the following order:
// - JVM global security provided -> try to log in with the JVM global security
//   configuration, which can be set for example with 'java.security.auth.login.config'.
//   For this no additional parameter is needed.
// - Token is provided -> try to log in with the SCRAM module using Kafka's dynamic JAAS
//   configuration.
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
logDebug("JVM global security configuration detected, using it for login.")
} else if (KafkaSecurityHelper.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),
"Delegation token works only with SCRAM mechanism.")
set(SaslConfigs.SASL_MECHANISM, mechanism)
}
this
}

def build(): ju.Map[String, Object] = map
}
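
For illustration, a minimal sketch of how the fluent API above is intended to be used; the parameter keys and values below are made up for the example:

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    // Hypothetical caller; only the KafkaConfigUpdater API itself comes from this PR.
    val kafkaParams = Map("bootstrap.servers" -> "broker:9092")
    val deserClassName = classOf[ByteArrayDeserializer].getName

    val consumerParams: java.util.Map[String, Object] =
      KafkaConfigUpdater("source", kafkaParams)
        .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
        .setIfUnset(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // applied only if unset
        .setAuthenticationConfigIfNeeded() // adds SASL/JAAS params when a token is present
        .build()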
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -24,13 +24,9 @@ import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

import org.apache.spark.SparkEnv
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
@@ -485,7 +481,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
}

def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] =
ConfigUpdater("source", specifiedKafkaParams)
KafkaConfigUpdater("source", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)

@@ -508,7 +504,7 @@
def kafkaParamsForExecutors(
specifiedKafkaParams: Map[String, String],
uniqueGroupId: String): ju.Map[String, Object] =
ConfigUpdater("executor", specifiedKafkaParams)
KafkaConfigUpdater("executor", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)

@@ -539,48 +535,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
}

/** Class to conveniently update Kafka config params, while logging the changes */
private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
private val map = new ju.HashMap[String, Object](kafkaParams.asJava)

def set(key: String, value: Object): this.type = {
map.put(key, value)
logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
this
}

def setIfUnset(key: String, value: Object): ConfigUpdater = {
if (!map.containsKey(key)) {
map.put(key, value)
logDebug(s"$module: Set $key to $value")
}
this
}

def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
// There are multiple possibilities to log in and applied in the following order:
// - JVM global security provided -> try to log in with JVM global security configuration
// which can be configured for example with 'java.security.auth.login.config'.
// For this no additional parameter needed.
// - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
// configuration.
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
logDebug("JVM global security configuration detected, using it for login.")
} else if (KafkaSecurityHelper.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),
"Delegation token works only with SCRAM mechanism.")
set(SaslConfigs.SASL_MECHANISM, mechanism)
}
this
}

def build(): ju.Map[String, Object] = map
}

private[kafka010] def kafkaParamsForProducer(
parameters: Map[String, String]): ju.Map[String, Object] = {
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
@@ -598,7 +552,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {

val specifiedKafkaParams = convertToSpecifiedParams(parameters)

ConfigUpdater("executor", specifiedKafkaParams)
KafkaConfigUpdater("executor", specifiedKafkaParams)
.set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
.set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
.setAuthenticationConfigIfNeeded()
113 changes: 113 additions & 0 deletions external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import org.apache.kafka.common.config.SaslConfigs

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config._

class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTest {
private val testModule = "testModule"
private val testKey = "testKey"
private val testValue = "testValue"
private val otherTestValue = "otherTestValue"

test("set should always set value") {
val params = Map.empty[String, String]

val updatedParams = KafkaConfigUpdater(testModule, params)
.set(testKey, testValue)
.build()

assert(updatedParams.size() === 1)
assert(updatedParams.get(testKey) === testValue)
}

test("setIfUnset without existing key should set value") {
val params = Map.empty[String, String]

val updatedParams = KafkaConfigUpdater(testModule, params)
.setIfUnset(testKey, testValue)
.build()

assert(updatedParams.size() === 1)
assert(updatedParams.get(testKey) === testValue)
}

test("setIfUnset with existing key should not set value") {
val params = Map[String, String](testKey -> testValue)

val updatedParams = KafkaConfigUpdater(testModule, params)
.setIfUnset(testKey, otherTestValue)
.build()

assert(updatedParams.size() === 1)
assert(updatedParams.get(testKey) === testValue)
}

test("setAuthenticationConfigIfNeeded with global security should not set values") {
val params = Map.empty[String, String]
setGlobalKafkaClientConfig()

val updatedParams = KafkaConfigUpdater(testModule, params)
.setAuthenticationConfigIfNeeded()
.build()

assert(updatedParams.size() === 0)
}

test("setAuthenticationConfigIfNeeded with token should set values") {
val params = Map.empty[String, String]
setSparkEnv(Map.empty)
addTokenToUGI()

val updatedParams = KafkaConfigUpdater(testModule, params)
.setAuthenticationConfigIfNeeded()
.build()

assert(updatedParams.size() === 2)
assert(updatedParams.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
assert(updatedParams.get(SaslConfigs.SASL_MECHANISM) ===
Kafka.TOKEN_SASL_MECHANISM.defaultValueString)
}

test("setAuthenticationConfigIfNeeded with token and invalid mechanism should throw exception") {
val params = Map.empty[String, String]
setSparkEnv(Map[String, String](Kafka.TOKEN_SASL_MECHANISM.key -> "INVALID"))
addTokenToUGI()

val e = intercept[IllegalArgumentException] {
KafkaConfigUpdater(testModule, params)
.setAuthenticationConfigIfNeeded()
.build()
}

assert(e.getMessage.contains("Delegation token works only with SCRAM mechanism."))
}

test("setAuthenticationConfigIfNeeded without security should not set values") {
Member:
Just a question: setAuthenticationConfigIfNeeded is intentionally a no-op in this case. Is it better to raise an IllegalArgumentException with a directional error message about the requirements?

Contributor:
It seems better to me too to let setAuthenticationConfigIfNeeded work only with a proper security config (we would need to drop the IfNeeded suffix then). It would fail fast and give us the chance to provide proper guidance, e.g. about missing authentication configuration.

But no strong opinion, since this boils down to providing convenience vs. being strict to help end users find a possible missing point, and both look reasonable.

Contributor Author:
This was the original implementation, which I was asked to change to the current form on other PRs (and I agree with it).
The reasoning behind it was to keep the fluent API and not end up with something like this (which is hard to test):

    val updater = KafkaConfigUpdater("executor", specifiedKafkaParams)
      .set(...)
      .set(...)

    if (!globalSecurity && tokenAvailable)
      updater.setAuthenticationConfig(...)
    else if (somethingNew)
      updater.set(...)

    updater.build()

There is no flag like kafka.securityEnabled, and Kafka can't tell us whether security is enabled either.

Is it better to raise an IllegalArgumentException with a directional error message about the requirements?

Security parameters shouldn't always be set, for example:

  • If the cluster is unsecured
  • If the user provided a JVM global JAAS configuration (this provides total freedom to users)

It would fail fast and give us the chance to provide proper guidance

  • No security => nothing to check or do
  • With global JAAS => it's not possible to parse the JAAS config and suggest changes
  • With DT => the user is warned in advance to provide a SCRAM-related sasl.mechanism, because the related Kafka error message is a bit cryptic.

I personally don't see any further possibilities to provide convenience for users.

Contributor:
I meant the case where end users intend to set up security, and so call setAuthenticationConfig* in KafkaConfigUpdater, but missed setting up the security config. I guess an error message will be provided on the Kafka side, but it might be cryptic, so I was exploring the possibility of providing a better message.
No strong opinion, and either way is fine because end users get an indication in any case. Just clarifying my opinion.
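
For reference, a minimal sketch of the fail-fast alternative discussed in this thread; it is hypothetical and not what the PR implements:

    // Hypothetical strict variant (not in this PR): fail fast when neither a JVM
    // global JAAS configuration nor a delegation token is available.
    def setAuthenticationConfig(): this.type = {
      if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
        logDebug("JVM global security configuration detected, using it for login.")
      } else if (KafkaSecurityHelper.isTokenAvailable()) {
        set(SaslConfigs.SASL_JAAS_CONFIG,
          KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf))
        set(SaslConfigs.SASL_MECHANISM, SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM))
      } else {
        throw new IllegalArgumentException("No JVM global JAAS configuration and no " +
          "delegation token found; configure 'java.security.auth.login.config' or " +
          "enable Kafka delegation tokens.")
      }
      this
    }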

val params = Map.empty[String, String]

val updatedParams = KafkaConfigUpdater(testModule, params)
.setAuthenticationConfigIfNeeded()
.build()

assert(updatedParams.size() === 0)
}
}
90 changes: 90 additions & 0 deletions external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.{util => ju}
import javax.security.auth.login.{AppConfigurationEntry, Configuration}

import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token
import org.mockito.Mockito.{doReturn, mock}
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier

/**
* Trait that provides functionality for Kafka delegation token related test suites.
*/
trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
self: SparkFunSuite =>

protected val tokenId = "tokenId" + ju.UUID.randomUUID().toString
protected val tokenPassword = "tokenPassword" + ju.UUID.randomUUID().toString

private class KafkaJaasConfiguration extends Configuration {
val entry =
new AppConfigurationEntry(
"DummyModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
ju.Collections.emptyMap[String, Object]()
)

override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
if (name.equals("KafkaClient")) {
Array(entry)
} else {
null
}
}
}

override def afterEach(): Unit = {
try {
Configuration.setConfiguration(null)
UserGroupInformation.setLoginUser(null)
SparkEnv.set(null)
} finally {
super.afterEach()
}
}

protected def setGlobalKafkaClientConfig(): Unit = {
Configuration.setConfiguration(new KafkaJaasConfiguration)
}

protected def addTokenToUGI(): Unit = {
val token = new Token[KafkaDelegationTokenIdentifier](
tokenId.getBytes,
tokenPassword.getBytes,
KafkaTokenUtil.TOKEN_KIND,
KafkaTokenUtil.TOKEN_SERVICE
)
val creds = new Credentials()
creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
UserGroupInformation.getCurrentUser.addCredentials(creds)
}

protected def setSparkEnv(settings: Traversable[(String, String)]): Unit = {
val conf = new SparkConf().setAll(settings)
val env = mock(classOf[SparkEnv])
doReturn(conf).when(env).conf
SparkEnv.set(env)
}
}
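
A minimal sketch of how a suite picks the trait up (mirroring KafkaConfigUpdaterSuite above; the suite name and assertion are illustrative):

    class ExampleTokenSuite extends SparkFunSuite with KafkaDelegationTokenTest {
      test("delegation token becomes visible after addTokenToUGI") {
        setSparkEnv(Map.empty)  // installs a mocked SparkEnv so SparkEnv.get.conf works
        addTokenToUGI()         // puts a Kafka delegation token into the current UGI
        assert(KafkaSecurityHelper.isTokenAvailable())
        // afterEach() in the trait resets the JAAS config, UGI login user and SparkEnv
      }
    }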