diff --git a/build.gradle b/build.gradle
index af56dd3b6107c..3d525dd994152 100644
--- a/build.gradle
+++ b/build.gradle
@@ -355,7 +355,7 @@ gradle.taskGraph.whenReady { graph ->
     onlyIf { ! Boolean.getBoolean('skip.tests') }

     jvmArgs '-ea', '-XX:+HeapDumpOnOutOfMemoryError','-XX:+UseConcMarkSweepGC',
-        '-XX:+UseParNewGC', '-XX:+CMSClassUnloadingEnabled', '-XX:MaxPermSize=512m'
+        '-XX:+UseParNewGC', '-XX:+CMSClassUnloadingEnabled'
     minHeapSize '4g'
     maxHeapSize '4g'
     // disable assertions for hive tests as in Spark's pom.xml because HiveCompatibilitySuite currently fails (SPARK-4814)
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 5cd4935e225ee..d5d874e3375fb 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -39,7 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalLocale = Locale.getDefault
   private val originalColumnBatchSize = TestHive.conf.columnBatchSize
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
-  private val originalConvertMetastoreOrc = TestHive.sessionState.convertMetastoreOrc
+  private val originalConvertMetastoreOrc = TestHive.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled

   def testCases: Seq[(String, File)] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index e8fef0fa66ac8..aeb8c6e674e34 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -30,7 +30,8 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
@@ -38,7 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.CacheTableCommand
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.internal.{SharedState, SQLConf}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SQLConf, SessionState, SharedState}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.util.{ShutdownHookManager, Utils}

@@ -81,10 +83,10 @@ class TestHiveContext(
   }

   override def newSession(): TestHiveContext = {
-    new TestHiveContext(sparkSession.newSession())
+    new TestHiveContext(sparkSession.newSession().asInstanceOf[TestHiveSparkSession])
   }

-  override def sessionState: TestHiveSessionState = sparkSession.sessionState
+  override def sessionState: SessionState = sparkSession.sessionState

   def setCacheTables(c: Boolean): Unit = {
     sparkSession.setCacheTables(c)
@@ -112,18 +114,21 @@ class TestHiveContext(
  * @param loadTestTables if true, load the test tables. They can only be loaded when running
  *    in the JVM, i.e when calling from Python this flag has to be false.
  */
-private[hive] class TestHiveSparkSession(
-    @transient private val sc: SparkContext,
-    @transient private val existingSharedState: Option[SharedState],
-    private val loadTestTables: Boolean)
-  extends SparkSession(sc) with Logging { self =>
+trait TestHiveSparkSession extends SparkSession with Logging { self =>

-  def this(sc: SparkContext, loadTestTables: Boolean) {
-    this(
-      sc,
-      existingSharedState = None,
-      loadTestTables)
-  }
+  protected def sc: SparkContext
+
+  protected def existingSharedState: Option[SharedState]
+
+  protected def loadTestTables: Boolean
+
+  def hiveDefaultTableFilePath(name: TableIdentifier): String
+
+  def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan
+
+  def metadataHive: HiveClient
+
+  def reset(): Unit

   { // set the metastore temporary configuration
     val metastoreTempConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
@@ -144,16 +149,6 @@ private[hive] class TestHiveSparkSession(
     existingSharedState.getOrElse(new SharedState(sc))
   }

-  // TODO: Let's remove TestHiveSessionState. Otherwise, we are not really testing the reflection
-  // logic based on the setting of CATALOG_IMPLEMENTATION.
-  @transient
-  override lazy val sessionState: TestHiveSessionState =
-    new TestHiveSessionState(self)
-
-  override def newSession(): TestHiveSparkSession = {
-    new TestHiveSparkSession(sc, Some(sharedState), loadTestTables)
-  }
-
   private var cacheTables: Boolean = false

   def setCacheTables(c: Boolean): Unit = {
@@ -391,7 +386,7 @@ private[hive] class TestHiveSparkSession(
     hiveQTestUtilTables.foreach(registerTestTable)
   }

-  private val loadedTables = new collection.mutable.HashSet[String]
+  protected val loadedTables = new collection.mutable.HashSet[String]

   def loadTestTable(name: String) {
     if (!(loadedTables contains name)) {
@@ -413,6 +408,38 @@ private[hive] class TestHiveSparkSession(
    * tests.
    */
   protected val originalUDFs: JavaSet[String] = FunctionRegistry.getFunctionNames
+}
+
+private[hive] class TestHiveSparkSessionImpl(
+    @transient protected val sc: SparkContext,
+    @transient protected val existingSharedState: Option[SharedState],
+    protected val loadTestTables: Boolean)
+  extends SparkSession(sc) with TestHiveSparkSession {
+
+  def this(sc: SparkContext, loadTestTables: Boolean) {
+    this(
+      sc,
+      existingSharedState = None,
+      loadTestTables)
+  }
+
+  // TODO: Let's remove TestHiveSessionState. Otherwise, we are not really testing the reflection
+  // logic based on the setting of CATALOG_IMPLEMENTATION.
+  @transient
+  override lazy val sessionState: TestHiveSessionState =
+    new TestHiveSessionState(this)
+
+  override def hiveDefaultTableFilePath(name: TableIdentifier): String =
+    sessionState.catalog.hiveDefaultTableFilePath(name)
+
+  override def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan =
+    sessionState.catalog.getCachedDataSourceTable(table)
+
+  override def metadataHive: HiveClient = sessionState.metadataHive
+
+  override def newSession(): TestHiveSparkSession = {
+    new TestHiveSparkSessionImpl(sc, Some(sharedState), loadTestTables)
+  }

   /**
    * Resets the test instance by deleting any tables that have been created.
@@ -428,6 +455,7 @@ private[hive] class TestHiveSparkSession(
       }
     }

+    val sessionState = this.sessionState.asInstanceOf[TestHiveSessionState]
     sharedState.cacheManager.clearCache()
     loadedTables.clear()
     sessionState.catalog.clearTempTables()
@@ -513,7 +541,7 @@ private[hive] class TestHiveSessionState(
 }

-private[hive] object TestHiveContext {
+object TestHiveContext {

   /**
    * A map used to store all confs that need to be overridden in sql/hive unit tests.
@@ -524,15 +552,15 @@ private[hive] object TestHiveContext {
     SQLConf.SHUFFLE_PARTITIONS.key -> "5"
   )

-  private def newSparkSession(sc: SparkContext,
+  private def newSparkSession(sparkContext: SparkContext,
       loadTestTables: Boolean): TestHiveSparkSession = {
+    val sc = HiveUtils.withHiveExternalCatalog(sparkContext)
     try {
-      Utils.classForName("org.apache.spark.sql.test.TestHiveSnappySession")
+      Utils.classForName("org.apache.spark.sql.hive.TestHiveSnappySession")
          .getConstructor(classOf[SparkContext], classOf[Boolean])
          .newInstance(sc, Boolean.box(loadTestTables)).asInstanceOf[TestHiveSparkSession]
     } catch {
-      case _: Exception =>
-        new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables)
+      case _: Exception => new TestHiveSparkSessionImpl(sc, loadTestTables)
     }
   }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
index 9bf84ab1fb7a2..26cc435f92e91 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
@@ -24,8 +24,8 @@ import org.apache.spark.SparkFunSuite

 trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
-  protected val spark: SparkSession = TestHive.sparkSession
-  protected val hiveContext: TestHiveContext = TestHive
+  protected lazy val spark: SparkSession = TestHive.sparkSession
+  protected lazy val hiveContext: TestHiveContext = TestHive

   protected override def afterAll(): Unit = {
     try {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 0a280b495215c..7832f841eee44 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -115,7 +115,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))

         checkAnswer(table("t"), testDF)
-        assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
+        assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
       }
     }

@@ -147,7 +147,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))

         checkAnswer(table("t"), testDF)
-        assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") ===
+        assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
           Seq("1.1\t1", "2.1\t2"))
       }
     }
@@ -178,7 +178,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
          assert(columns.map(_.dataType) === Seq(IntegerType, StringType))

          checkAnswer(table("t"), Row(1, "val_1"))
-          assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
+          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
         }
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index deb40f0464016..eb7cdd6e618f7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -379,7 +379,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
          """.stripMargin)

       val expectedPath =
-        sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
+        sparkSession.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
       val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
       fs.delete(filesystemPath, true)
@@ -491,7 +491,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sql("DROP TABLE savedJsonTable")

         intercept[AnalysisException] {
           read.json(
-            sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
+            sparkSession.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
         }
       }
@@ -760,7 +760,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           serde = None,
           compressed = false,
           properties = Map(
-            "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
+            "path" -> sparkSession.hiveDefaultTableFilePath(TableIdentifier(tableName)))
         ),
         properties = Map(
           DATASOURCE_PROVIDER -> "json",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3b9437da372c2..bc83faaae65a4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -51,7 +51,7 @@ class HiveDDLSuite
       dbPath: Option[String] = None): Boolean = {
     val expectedTablePath =
       if (dbPath.isEmpty) {
-        hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
+        hiveContext.sparkSession.hiveDefaultTableFilePath(tableIdentifier)
       } else {
         new Path(new Path(dbPath.get), tableIdentifier.table).toString
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index fbb228e0873e3..9627fee78e931 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -455,7 +455,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test("Caching converted data source Parquet Relations") {
     def checkCached(tableIdentifier: TableIdentifier): Unit = {
       // Converted test_parquet should be cached.
-      sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match {
+      sparkSession.getCachedDataSourceTable(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
         case LogicalRelation(_: HadoopFsRelation, _, _) => // OK
         case other =>
@@ -483,14 +483,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default"))

     // First, make sure the converted test_parquet is not cached.
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(sparkSession.getCachedDataSourceTable(tableIdentifier) === null)
     // Table lookup will make the table cached.
     table("test_insert_parquet")
     checkCached(tableIdentifier)
     // For insert into non-partitioned table, we will do the conversion,
     // so the converted test_insert_parquet should be cached.
     sessionState.refreshTable("test_insert_parquet")
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(sparkSession.getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
        |INSERT INTO TABLE test_insert_parquet
@@ -503,7 +503,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       sql("select a, b from jt").collect())
     // Invalidate the cache.
     sessionState.refreshTable("test_insert_parquet")
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(sparkSession.getCachedDataSourceTable(tableIdentifier) === null)

     // Create a partitioned table.
     sql(
@@ -521,7 +521,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
      """.stripMargin)

     tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default"))
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(sparkSession.getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
        |INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -530,14 +530,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
      """.stripMargin)
     // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
     // So, we expect it is not cached.
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(sparkSession.getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
        |INSERT INTO TABLE test_parquet_partitioned_cache_test
        |PARTITION (`date`='2015-04-02')
        |select a, b from jt
      """.stripMargin)
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(sparkSession.getCachedDataSourceTable(tableIdentifier) === null)

     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
@@ -553,7 +553,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
      """.stripMargin).collect())

     sessionState.refreshTable("test_parquet_partitioned_cache_test")
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(sparkSession.getCachedDataSourceTable(tableIdentifier) === null)

     dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 2eafe18b85844..b65d849de4274 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -76,7 +76,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle

   def tableDir: File = {
     val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
-    new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
+    new File(URI.create(hiveContext.sparkSession.hiveDefaultTableFilePath(identifier)))
   }

   /**
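The heart of this patch is in TestHive.scala: TestHiveSparkSession becomes a trait that exposes hiveDefaultTableFilePath, getCachedDataSourceTable and metadataHive, TestHiveSparkSessionImpl supplies the default behaviour, and TestHiveContext.newSparkSession reflectively prefers org.apache.spark.sql.hive.TestHiveSnappySession, falling back to the impl when that class is absent. The standalone Scala sketch below illustrates only that lookup-with-fallback shape; the names DemoSession, DemoSessionImpl and demo.SnappyDemoSession are hypothetical stand-ins, and the real factory passes a SparkContext and a Boolean rather than a String.

    // Hypothetical, simplified illustration of the reflective-fallback pattern used by
    // TestHiveContext.newSparkSession above; these are not the actual Spark/Snappy classes.
    trait DemoSession {
      def describe(): String
    }

    class DemoSessionImpl(name: String) extends DemoSession {
      override def describe(): String = s"default session for $name"
    }

    object DemoSessionFactory {
      // Prefer an optional subclass when it is on the classpath; otherwise fall back to the
      // default implementation, mirroring the try/catch in newSparkSession.
      def create(name: String): DemoSession = {
        try {
          Class.forName("demo.SnappyDemoSession") // hypothetical optional class
            .getConstructor(classOf[String])
            .newInstance(name)
            .asInstanceOf[DemoSession]
        } catch {
          case _: Exception => new DemoSessionImpl(name) // graceful fallback
        }
      }
    }

    object DemoMain {
      def main(args: Array[String]): Unit = {
        // Prints the fallback description unless demo.SnappyDemoSession is available at runtime.
        println(DemoSessionFactory.create("test").describe())
      }
    }

Routing the test suites through the trait (sparkSession.hiveDefaultTableFilePath, sparkSession.getCachedDataSourceTable, sparkSession.metadataHive) instead of reaching into sessionState.catalog is what allows an alternative session implementation to back those calls with its own catalog.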