
Commit 72ec7a6

WeiWenda authored and beliefer committed
[CORE][TEST] Migrate some XSQL test classes to github #43
**What changes were proposed in this pull request?** Add `org.apache.spark.sql.xsql.test.SharedSparkSession` to provide a local `SparkSession` with `enableXSQLSupport` turned on by default, so that users can test `spark.sql()` simply by mixing `SharedSparkSession` into their ScalaTest suites.
1 parent 2ae2104 commit 72ec7a6
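
For context, a hypothetical suite (not part of this commit) that mixes in the new trait might look like the sketch below; the suite name and query are illustrative assumptions, not code from this change:

package org.apache.spark.sql.xsql.test

import org.apache.spark.sql.QueryTest

// Hypothetical suite shown only to illustrate the intended usage of SharedSparkSession.
class XSQLExampleQuerySuite extends QueryTest with SharedSparkSession {

  test("spark.sql runs against the XSQL-enabled session") {
    // `spark` comes from SharedSparkSession and already has XSQL support enabled.
    val df = spark.sql("SELECT 1 AS id")
    assert(df.collect().map(_.getInt(0)).sameElements(Array(1)))
  }
}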

File tree

2 files changed: +181 −0 lines changed
Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.xsql.test

import java.io.{File, FileInputStream}
import java.net.URLDecoder
import java.util.Properties

import scala.collection.JavaConverters._

import org.scalatest.Suite

import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.test.TestSparkSession
import org.apache.spark.sql.xsql.XSQLSessionCatalog
import org.apache.spark.sql.xsql.execution.command.{PushDownQueryCommand, ScanTableCommand}
import org.apache.spark.sql.xsql.util.Utils

/**
 * Helper trait for SQL test suites where all tests share a single [[XSQLTestSparkSession]].
 */
trait SharedSparkSession extends org.apache.spark.sql.test.SharedSparkSession { self: Suite =>

  def getResourceFile(path: String): File = {
    new File(Thread.currentThread().getContextClassLoader.getResource(path).getFile)
  }

  /**
   * Similar to [[org.apache.spark.sql.test.SQLTestUtilsBase]]'s activateDatabase, but takes an
   * additional `ds` parameter. Activates database `ds`.`db` before executing `f`, then switches
   * back to the previous database after `f` returns.
   */
  protected def activateDatabase(ds: String, db: String)(f: => Unit): Unit = {
    val catalog = spark.sessionState.catalog.asInstanceOf[XSQLSessionCatalog]
    val catalogDB = catalog.getCurrentCatalogDatabase
    val currentDS = catalogDB.get.dataSourceName
    val currentDB = catalogDB.get.name
    catalog.setCurrentDatabase(ds, db)
    try f
    finally catalog.setCurrentDatabase(currentDS, currentDB)
  }

  /**
   * Quick check that the [[LogicalPlan]] of a [[DataFrame]] contains a pushdown operation as a
   * subquery.
   */
  def assertSubQueryPushDown(df: DataFrame): Unit = {
    val analyzed = df.queryExecution.analyzed
    val expressions = analyzed.flatMap(_.subqueries)
    assert(
      expressions
        .exists(e => e.isInstanceOf[PushDownQueryCommand] || e.isInstanceOf[ScanTableCommand]))
  }

  /**
   * Quick check that the [[LogicalPlan]] of a [[DataFrame]] contains exactly `num` pushdown
   * operations.
   */
  def assertContainsPushDown(df: DataFrame, num: Int = 1): Unit = {
    val analyzed = df.queryExecution.analyzed
    val pds = analyzed.collect {
      case e: ScanTableCommand =>
        e
      case e: PushDownQueryCommand =>
        e
    }
    assert(pds.size == num)
  }

  /**
   * Quick check that the [[LogicalPlan]] of a [[DataFrame]] is wholly a pushdown operation.
   */
  def assertPushDown(df: DataFrame): Unit = {
    val analyzed = df.queryExecution.analyzed
    assert(analyzed.isInstanceOf[ScanTableCommand] || analyzed.isInstanceOf[PushDownQueryCommand])
  }

  /**
   * Quick check that the result of a [[DataFrame]] is non-empty.
   */
  def assertResultNonEmpty(df: DataFrame): Unit = {
    df.show()
    assert(df.count() > 0)
  }

  /**
   * Quick check that the result of a [[DataFrame]] is empty.
   */
  def assertResultEmpty(df: DataFrame): Unit = {
    df.show()
    assert(df.count() == 0)
  }

  /**
   * Start a local[2] [[org.apache.spark.sql.SparkSession]] with XSQL support, and load
   * configuration from `xsql.conf` found on the classpath.
   */
  override protected def createSparkSession: TestSparkSession = {
    val properties = new Properties()
    val path = Utils.getPropertiesFile(file = "xsql.conf")
    properties.load(new FileInputStream(URLDecoder.decode(path, "utf-8")))
    val conf = new SparkConf()
    val propertiesMap = properties
      .stringPropertyNames()
      .asScala
      .map(key => (key, properties.getProperty(key)))
      .toMap
    // Copy every `spark.*` entry from xsql.conf into the SparkConf; the third argument
    // (silent = false) keeps deprecation warnings for deprecated keys.
    for ((key, value) <- propertiesMap if key.startsWith("spark.")) {
      conf.set(key, value, false)
    }
    conf.set("spark.sql.caseSensitive", "true")
    new XSQLTestSparkSession(conf)
  }

  /**
   * Stop the underlying [[org.apache.spark.SparkContext]], if any.
   */
  protected override def afterAll(): Unit = {
    if (spark != null) {
      spark.stop()
    }
  }
}
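
The `activateDatabase` helper above scopes a datasource/database switch around a code block and restores the previous one afterwards, even if the block throws. A minimal sketch of using it together with the pushdown assertions; the datasource, database, and table names here are invented for illustration:

// Inside a Suite that mixes in SharedSparkSession.
// Hypothetical datasource "es_test" and database "logs"; not defined by this commit.
activateDatabase(ds = "es_test", db = "logs") {
  val df = spark.sql("SELECT level, count(*) AS cnt FROM events GROUP BY level")
  // Expect exactly one pushdown operator (ScanTableCommand or PushDownQueryCommand)
  // in the analyzed plan.
  assertContainsPushDown(df, num = 1)
}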
Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.xsql.test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.TestSparkSession
import org.apache.spark.sql.xsql.XSQLSessionStateBuilder

/**
 * A special `SparkSession` prepared for XSQL testing.
 */
class XSQLTestSparkSession(sc: SparkContext) extends TestSparkSession(sc) { self =>
  def this(sparkConf: SparkConf) {
    this(
      new SparkContext(
        "local[2]",
        "test-sql-context",
        sparkConf.set("spark.sql.testkey", "true").set(CATALOG_IMPLEMENTATION, "xsql")))
  }

  @transient
  override lazy val sessionState: SessionState = {
    new XSQLSessionStateBuilder(this, None).build()
  }
}
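
Given the auxiliary constructor, a test could in principle also build a standalone session straight from a `SparkConf`, outside the shared trait. A sketch, assuming a valid `xsql.conf` is on the classpath as `createSparkSession` above expects:

import org.apache.spark.SparkConf

// Sketch only: a local[2] session with the XSQL catalog implementation enabled.
val session = new XSQLTestSparkSession(new SparkConf().set("spark.sql.caseSensitive", "true"))
try {
  session.sql("SELECT 1").show()
} finally {
  session.stop()
}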

0 commit comments