diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml index 7f9e92f58516..298e3d36c145 100644 --- a/external/docker-integration-tests/pom.xml +++ b/external/docker-integration-tests/pom.xml @@ -166,7 +166,6 @@ com.microsoft.sqlserver mssql-jdbc - 7.2.1.jre8 test diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala index 42d64873c44d..6c633af1fde8 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.tags.DockerTest @DockerTest class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite { override val db = new DatabaseOnDocker { - override val imageName = "mcr.microsoft.com/mssql/server:2017-GA-ubuntu" + override val imageName = "mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04" override val env = Map( "SA_PASSWORD" -> "Sapass123", "ACCEPT_EULA" -> "Y" diff --git a/pom.xml b/pom.xml index deaf87f15539..1b225bde6774 100644 --- a/pom.xml +++ b/pom.xml @@ -970,6 +970,12 @@ 11.5.0.0 test + + com.microsoft.sqlserver + mssql-jdbc + 8.2.2.jre8 + test + org.apache.curator curator-recipes diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 7c5fcba9c213..47f11f6e593f 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -145,6 +145,11 @@ jcc test + + com.microsoft.sqlserver + mssql-jdbc + test + org.apache.parquet parquet-avro diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index 73e73e59be57..6c310ced3788 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ -60,6 +60,10 @@ private[jdbc] object ConnectionProvider extends Logging { logDebug("DB2 connection provider found") new DB2ConnectionProvider(driver, options) + case MSSQLConnectionProvider.driverClass => + logDebug("MS SQL connection provider found") + new MSSQLConnectionProvider(driver, options) + case _ => throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " + "Kerberos authentication") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala new file mode 100644 index 000000000000..2950aa9b4db9 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.jdbc.connection + +import java.security.PrivilegedExceptionAction +import java.sql.{Connection, Driver} +import java.util.Properties + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions + +private[sql] class MSSQLConnectionProvider( + driver: Driver, + options: JDBCOptions, + parserMethod: String = "parseAndMergeProperties" + ) extends SecureConnectionProvider(driver, options) { + override val appEntry: String = { + val configName = "jaasConfigurationName" + val appEntryDefault = "SQLJDBCDriver" + + val parseURL = try { + // The default parser method signature is the following: + // private Properties parseAndMergeProperties(String Url, Properties suppliedProperties) + val m = driver.getClass.getDeclaredMethod(parserMethod, classOf[String], classOf[Properties]) + m.setAccessible(true) + Some(m) + } catch { + case _: NoSuchMethodException => None + } + + parseURL match { + case Some(m) => + logDebug("Property parser method found, using it") + m.invoke(driver, options.url, null).asInstanceOf[Properties] + .getProperty(configName, appEntryDefault) + + case None => + logDebug("Property parser method not found, using custom parsing mechanism") + options.url.split(';').map(_.split('=')) + .find(kv => kv.length == 2 && kv(0) == configName) + .getOrElse(Array(configName, appEntryDefault))(1) + } + } + + override def getConnection(): Connection = { + setAuthenticationConfigIfNeeded() + UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( + new PrivilegedExceptionAction[Connection]() { + override def run(): Connection = { + MSSQLConnectionProvider.super.getConnection() + } + } + ) + } + + override def getAdditionalProperties(): Properties = { + val result = new Properties() + // These props needed to reach internal kerberos authentication in the JDBC driver + result.put("integratedSecurity", "true") + result.put("authenticationScheme", "JavaKerberos") + result + } + + override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { + val (parent, configEntry) = getConfigWithAppEntry() + /** + * Couple of things to mention here (v8.2.2 client): + * 1. MS SQL supports JAAS application name configuration + * 2. MS SQL sets a default JAAS config if "java.security.auth.login.config" is not set + */ + val entryUsesKeytab = configEntry != null && + configEntry.exists(_.getOptions().get("useKeyTab") == "true") + if (configEntry == null || configEntry.isEmpty || !entryUsesKeytab) { + setAuthenticationConfig(parent) + } + } +} + +private[sql] object MSSQLConnectionProvider { + val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala index 8e3381077cbb..589f13cf6ad5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala @@ -30,7 +30,7 @@ private[jdbc] class MariaDBConnectionProvider(driver: Driver, options: JDBCOptio override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { val (parent, configEntry) = getConfigWithAppEntry() /** - * Couple of things to mention here: + * Couple of things to mention here (v2.5.4 client): * 1. MariaDB doesn't support JAAS application name configuration * 2. MariaDB sets a default JAAS config if "java.security.auth.login.config" is not set */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala new file mode 100644 index 000000000000..249f1e36347e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.jdbc.connection + +class MSSQLConnectionProviderSuite extends ConnectionProviderSuiteBase { + test("setAuthenticationConfigIfNeeded default parser must set authentication if not set") { + val driver = registerDriver(MSSQLConnectionProvider.driverClass) + val defaultProvider = new MSSQLConnectionProvider( + driver, options("jdbc:sqlserver://localhost/mssql")) + val customProvider = new MSSQLConnectionProvider( + driver, options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) + + testProviders(defaultProvider, customProvider) + } + + test("setAuthenticationConfigIfNeeded custom parser must set authentication if not set") { + val parserMethod = "IntentionallyNotExistingMethod" + val driver = registerDriver(MSSQLConnectionProvider.driverClass) + val defaultProvider = new MSSQLConnectionProvider( + driver, options("jdbc:sqlserver://localhost/mssql"), parserMethod) + val customProvider = new MSSQLConnectionProvider( + driver, + options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql"), + parserMethod) + + testProviders(defaultProvider, customProvider) + } + + private def testProviders( + defaultProvider: SecureConnectionProvider, + customProvider: SecureConnectionProvider) = { + assert(defaultProvider.appEntry !== customProvider.appEntry) + testSecureConnectionProvider(defaultProvider) + testSecureConnectionProvider(customProvider) + } +}