From 3c30c4aae5ea5c210967ae30c52a68544db2d048 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 24 Apr 2020 17:38:56 +0200 Subject: [PATCH 1/4] [SPARK-31337][SQL]Support MS SQL Kerberos login in JDBC connector --- external/docker-integration-tests/pom.xml | 1 - .../jdbc/MsSqlServerIntegrationSuite.scala | 2 +- pom.xml | 6 ++ sql/core/pom.xml | 5 + .../jdbc/connection/ConnectionProvider.scala | 4 + .../connection/MSSQLConnectionProvider.scala | 94 +++++++++++++++++++ .../MariaDBConnectionProvider.scala | 2 +- .../MSSQLConnectionProviderSuite.scala | 51 ++++++++++ 8 files changed, 162 insertions(+), 3 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml index 7f9e92f58516..298e3d36c145 100644 --- a/external/docker-integration-tests/pom.xml +++ b/external/docker-integration-tests/pom.xml @@ -166,7 +166,6 @@ com.microsoft.sqlserver mssql-jdbc - 7.2.1.jre8 test diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala index 42d64873c44d..6c633af1fde8 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.tags.DockerTest @DockerTest class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite { override val db = new DatabaseOnDocker { - override val imageName = "mcr.microsoft.com/mssql/server:2017-GA-ubuntu" + override val imageName = 
"mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04" override val env = Map( "SA_PASSWORD" -> "Sapass123", "ACCEPT_EULA" -> "Y" diff --git a/pom.xml b/pom.xml index fd4cebcd3731..e98bcc033f5f 100644 --- a/pom.xml +++ b/pom.xml @@ -970,6 +970,12 @@ 11.5.0.0 test + + com.microsoft.sqlserver + mssql-jdbc + 8.2.2.jre8 + test + org.apache.curator curator-recipes diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 7c5fcba9c213..47f11f6e593f 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -145,6 +145,11 @@ jcc test + + com.microsoft.sqlserver + mssql-jdbc + test + org.apache.parquet parquet-avro diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index 73e73e59be57..6c310ced3788 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ -60,6 +60,10 @@ private[jdbc] object ConnectionProvider extends Logging { logDebug("DB2 connection provider found") new DB2ConnectionProvider(driver, options) + case MSSQLConnectionProvider.driverClass => + logDebug("MS SQL connection provider found") + new MSSQLConnectionProvider(driver, options) + case _ => throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " + "Kerberos authentication") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala new file mode 100644 index 000000000000..6d1f8f0e3c3f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala @@ -0,0 +1,94 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.jdbc.connection + +import java.security.PrivilegedExceptionAction +import java.sql.{Connection, Driver} +import java.util.Properties + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions + +private[sql] class MSSQLConnectionProvider( + driver: Driver, + options: JDBCOptions, + parserMethod: String = "parseAndMergeProperties" + ) extends SecureConnectionProvider(driver, options) { + override val appEntry: String = { + val configName = "jaasConfigurationName" + val appEntryDefault = "SQLJDBCDriver" + + val parseURL = try { + val m = driver.getClass.getDeclaredMethod(parserMethod, classOf[String], classOf[Properties]) + m.setAccessible(true) + Some(m) + } catch { + case _: NoSuchMethodException => None + } + + parseURL match { + case Some(m) => + logDebug("Property parser method found, using it") + m.invoke(driver, options.url, null).asInstanceOf[Properties] + .getProperty(configName, appEntryDefault) + + case None => + logDebug("Property parser method not found, using custom parsing mechanism") + options.url.split(';').map(_.split('=')) + .find(kv => kv.length 
== 2 && kv(0) == configName) + .getOrElse(Array(configName, appEntryDefault))(1) + } + } + + override def getConnection(): Connection = { + setAuthenticationConfigIfNeeded() + UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( + new PrivilegedExceptionAction[Connection]() { + override def run(): Connection = { + MSSQLConnectionProvider.super.getConnection() + } + } + ) + } + + override def getAdditionalProperties(): Properties = { + val result = new Properties() + result.put("integratedSecurity", "true") + result.put("authenticationScheme", "JavaKerberos") + result + } + + override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { + val (parent, configEntry) = getConfigWithAppEntry() + /** + * Couple of things to mention here (v8.2.2 client): + * 1. MS SQL supports JAAS application name configuration + * 2. MS SQL sets a default JAAS config if "java.security.auth.login.config" is not set + */ + val entryUsesKeytab = configEntry != null && + configEntry.exists(_.getOptions().get("useKeyTab") == "true") + if (configEntry == null || configEntry.isEmpty || !entryUsesKeytab) { + setAuthenticationConfig(parent) + } + } +} + +private[sql] object MSSQLConnectionProvider { + val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala index 8e3381077cbb..589f13cf6ad5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala @@ -30,7 +30,7 @@ private[jdbc] class MariaDBConnectionProvider(driver: Driver, options: JDBCOptio override def 
setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { val (parent, configEntry) = getConfigWithAppEntry() /** - * Couple of things to mention here: + * Couple of things to mention here (v2.5.4 client): * 1. MariaDB doesn't support JAAS application name configuration * 2. MariaDB sets a default JAAS config if "java.security.auth.login.config" is not set */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala new file mode 100644 index 000000000000..c1f2ea4e4f08 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.jdbc.connection + +class MSSQLConnectionProviderSuite extends ConnectionProviderSuiteBase { + test("setAuthenticationConfigIfNeeded default parser must set authentication if not set") { + val driver = registerDriver(MSSQLConnectionProvider.driverClass) + val defaultProvider = new MSSQLConnectionProvider( + driver, options("jdbc:sqlserver://localhost/mssql")) + val customProvider = new MSSQLConnectionProvider( + driver, options(s"jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) + + testProviders(defaultProvider, customProvider) + } + + test("setAuthenticationConfigIfNeeded custom parser must set authentication if not set") { + val parserMethod = "IntentionallyNotExistingMethod" + val driver = registerDriver(MSSQLConnectionProvider.driverClass) + val defaultProvider = new MSSQLConnectionProvider( + driver, options("jdbc:sqlserver://localhost/mssql"), parserMethod) + val customProvider = new MSSQLConnectionProvider( + driver, + options(s"jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql"), + parserMethod) + + testProviders(defaultProvider, customProvider) + } + + private def testProviders( + defaultProvider: SecureConnectionProvider, + customProvider: SecureConnectionProvider) = { + assert(defaultProvider.appEntry !== customProvider.appEntry) + testSecureConnectionProvider(defaultProvider) + testSecureConnectionProvider(customProvider) + } +} From 88306f00cf6aea7636f432e3c9a04c0f44137770 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 25 May 2020 13:24:36 +0200 Subject: [PATCH 2/4] Fix deps --- dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 1 - 1 file changed, 1 deletion(-) diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index 3c3ce2dcdd6d..b5a10b5dba37 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -183,7 +183,6 @@ metrics-jmx/4.1.1//metrics-jmx-4.1.1.jar 
metrics-json/4.1.1//metrics-json-4.1.1.jar metrics-jvm/4.1.1//metrics-jvm-4.1.1.jar minlog/1.3.0//minlog-1.3.0.jar -mssql-jdbc/6.2.1.jre7//mssql-jdbc-6.2.1.jre7.jar netty-all/4.1.47.Final//netty-all-4.1.47.Final.jar nimbus-jose-jwt/4.41.1//nimbus-jose-jwt-4.41.1.jar objenesis/2.5.1//objenesis-2.5.1.jar From 16687d657c9641436ca16bb0f44fe95b4d20e9e4 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 25 May 2020 17:16:01 +0200 Subject: [PATCH 3/4] Review fix --- .../jdbc/connection/MSSQLConnectionProviderSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala index c1f2ea4e4f08..249f1e36347e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala @@ -23,7 +23,7 @@ class MSSQLConnectionProviderSuite extends ConnectionProviderSuiteBase { val defaultProvider = new MSSQLConnectionProvider( driver, options("jdbc:sqlserver://localhost/mssql")) val customProvider = new MSSQLConnectionProvider( - driver, options(s"jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) + driver, options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) testProviders(defaultProvider, customProvider) } @@ -35,7 +35,7 @@ class MSSQLConnectionProviderSuite extends ConnectionProviderSuiteBase { driver, options("jdbc:sqlserver://localhost/mssql"), parserMethod) val customProvider = new MSSQLConnectionProvider( driver, - options(s"jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql"), + options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql"), parserMethod) 
testProviders(defaultProvider, customProvider) From 0a718490ac5e8394674a5b8f64d9f8be382805fe Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 4 Jun 2020 13:03:35 +0200 Subject: [PATCH 4/4] Review fix --- .../datasources/jdbc/connection/MSSQLConnectionProvider.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala index 6d1f8f0e3c3f..2950aa9b4db9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala @@ -35,6 +35,8 @@ private[sql] class MSSQLConnectionProvider( val appEntryDefault = "SQLJDBCDriver" val parseURL = try { + // The default parser method signature is the following: + // private Properties parseAndMergeProperties(String Url, Properties suppliedProperties) val m = driver.getClass.getDeclaredMethod(parserMethod, classOf[String], classOf[Properties]) m.setAccessible(true) Some(m) @@ -69,6 +71,7 @@ private[sql] class MSSQLConnectionProvider( override def getAdditionalProperties(): Properties = { val result = new Properties() + // These props are needed to reach the internal Kerberos authentication in the JDBC driver result.put("integratedSecurity", "true") result.put("authenticationScheme", "JavaKerberos") result