Closed
Changes from all commits
38 commits
b2ceb0b
removed TestData use from CachedTableSuite
BenFradet Jun 21, 2015
551ebc5
removed TestData use from ColumnExpressionSuite
BenFradet Jul 1, 2015
38bfb74
removed TestData use from JoinSuite
BenFradet Jul 1, 2015
2aa8029
remove use of TestData in SQLQuerySuite
BenFradet Jul 1, 2015
2a06172
removed TestData use from DataFrameSuite
BenFradet Jul 1, 2015
bc47611
removed TestData use from InMemoryColumnarQuerySuite
BenFradet Jul 7, 2015
68c93d0
removed TestData use from PlannerSuite
BenFradet Jul 7, 2015
c6ff44f
removed TestData use from DebuggingSuite
BenFradet Jul 8, 2015
6a2423d
remove TestData use from JavaDataFrameSuite
BenFradet Jul 9, 2015
0501ec3
remove TestData use from DataFrameAggregateSuite
BenFradet Jul 9, 2015
5b6035d
remove TestData use from DataFrameFunctionsSuite
BenFradet Jul 9, 2015
b852cb5
remove TestData use from JsonSuite
BenFradet Jul 9, 2015
146f7f8
refactored ColumnExpressionSuite, rand still failing
BenFradet Jul 10, 2015
8ecac7b
removed TestData case classes in JoinSuite
BenFradet Jul 10, 2015
004eb5c
refactored some unit tests in SQLQuerySuite
BenFradet Jul 13, 2015
bb2cbe1
removed case classes from DataFrameSuite
BenFradet Jul 13, 2015
4b6f0f6
removed case classes from InMemoryColumnarQuerySuite
BenFradet Jul 13, 2015
d117740
removed TestData use from PartitionBatchPruningSuite
BenFradet Jul 13, 2015
6441cf7
removed case classes from PlannerSuite
BenFradet Jul 13, 2015
575b937
removed case classes from DebuggingSuite
BenFradet Jul 13, 2015
13ad56b
removed case classes from DataFrameAggregateSuite
BenFradet Jul 13, 2015
bb252d8
removed case classes from DataFrameFunctionsSuite
BenFradet Jul 13, 2015
7455262
removed TestData use from UDFSuite
BenFradet Jul 14, 2015
71a85bc
removed useless registering of tmp tables in DataFrameAggregateSuite
BenFradet Jul 14, 2015
f2f48d6
removed use of TestData case class in ColumnExpressionSuite
BenFradet Jul 14, 2015
af7a493
refactored test dfs in SQLQuerySuite
BenFradet Jul 14, 2015
2437d07
removed TestData case class from DataFrameJoinSuite
BenFradet Jul 14, 2015
df3d1e8
removed useless variable in UDFSuite
BenFradet Jul 14, 2015
2b89dad
fixed failing rand unit test in ColumnExpressionSuite
BenFradet Jul 14, 2015
262f70a
refactored test dataset in InMemoryColumnarQuerySuite
BenFradet Jul 14, 2015
453781b
removed useless registering of tmp tables in DataFrameSuite
BenFradet Jul 14, 2015
2edb8fd
removed useless case class in JoinSuite
BenFradet Jul 14, 2015
ba2aef9
removed unused variable from PlannerSuite
BenFradet Jul 14, 2015
8dfbab0
fixed scalastyle
BenFradet Jul 14, 2015
c3eeffa
fixed unit tests in DataFrameFunctionsSuite
BenFradet Aug 6, 2015
44c9630
fixed SortMergeJoin test in
BenFradet Aug 9, 2015
1f51144
remove TestData use from SQLMetricsSuite
BenFradet Aug 19, 2015
3a022ab
disabling of load test data in SQLTestUtils
BenFradet Aug 19, 2015
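The recurring change across these commits is one refactoring: each suite builds its test DataFrames inline with toDF instead of importing the shared TestData object, and registers a temp table only when a test queries it by name. A minimal sketch of the pattern (ExampleSuite is hypothetical; QueryTest, SharedSQLContext, and testImplicits all appear in the diffs below):

  class ExampleSuite extends QueryTest with SharedSQLContext {
    import testImplicits._  // brings the toDF implicits into scope

    // Inline fixture: built once, registered only because some tests
    // refer to it by name in SQL text.
    val testData = {
      val df = (1 to 100).map(i => (i, i.toString)).toDF("key", "value")
      df.registerTempTable("testData")
      df
    }
  }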
JavaDataFrameSuite.java
@@ -33,6 +33,7 @@
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.test.TestSQLContext;
@@ -234,7 +235,28 @@ public void testSampleBy() {
DataFrame df = context.range(0, 100, 1, 2).select(col("id").mod(3).as("key"));
DataFrame sampled = df.stat().<Integer>sampleBy("key", ImmutableMap.of(0, 0.1, 1, 0.2), 0L);
Row[] actual = sampled.groupBy("key").count().orderBy("key").collect();
- Row[] expected = new Row[] {RowFactory.create(0, 5), RowFactory.create(1, 8)};
+ Row[] expected = new Row[]{RowFactory.create(0, 5), RowFactory.create(1, 8)};
Assert.assertArrayEquals(expected, actual);
}

public static class TestData implements Serializable {
private int key;
private String value;

public int getKey() {
return key;
}

public void setKey(int key) {
this.key = key;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}
}
}
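The TestData bean above is presumably consumed through SQLContext.createDataFrame(rdd, beanClass), which turns bean properties into columns. A hedged usage sketch, assuming the suite holds a JavaSparkContext field jsc alongside the context field visible in this hunk (java.util.List/ArrayList imports assumed):

  List<TestData> records = new ArrayList<>();
  TestData d = new TestData();
  d.setKey(1);
  d.setValue("v1");
  records.add(d);
  // Bean properties map to the columns "key" and "value".
  DataFrame df = context.createDataFrame(jsc.parallelize(records), TestData.class);
  Assert.assertEquals(1L, df.filter(col("key").equalTo(1)).count());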
CachedTableSuite.scala
@@ -28,11 +28,15 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.storage.{StorageLevel, RDDBlockId}

private case class BigData(s: String)

class CachedTableSuite extends QueryTest with SharedSQLContext {
import testImplicits._

val testData = {
val df = (1 to 100).map(i => (i, i.toString)).toDF("key", "value")
df.registerTempTable("testData")
df
}

def rddIdOf(tableName: String): Int = {
val executedPlan = ctx.table(tableName).queryExecution.executedPlan
executedPlan.collect {
@@ -111,7 +115,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {

test("too big for memory") {
val data = "*" * 1000
- ctx.sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).toDF()
+ ctx.sparkContext.parallelize(1 to 200000, 1).map(_ => data).toDF("s")
.registerTempTable("bigData")
ctx.table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
assert(ctx.table("bigData").count() === 200000L)
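Because the inlined fixture registers its own temp table, the cache-oriented tests can keep addressing it by name. A small usage sketch against the ctx provided by SharedSQLContext (all standard SQLContext methods):

  ctx.cacheTable("testData")
  assert(ctx.isCached("testData"))
  ctx.uncacheTable("testData")
  assert(!ctx.isCached("testData"))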
ColumnExpressionSuite.scala
@@ -105,6 +105,12 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
Row("a") :: Nil)
}

val testData = (1 to 100).map(i => (i, i.toString)).toDF("key", "value")

val testData2 = (for { a <- 1 to 3; b <- 1 to 2 } yield (a, b)).toDF("a", "b")

test("alias") {
val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")
assert(df.select(df("a").as("b")).columns.head === "b")
@@ -190,7 +196,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
}

test("star qualified by data frame object") {
- val df = testData.toDF
+ val df = testData
val goldAnswer = df.collect().toSeq
checkAnswer(df.select(df("*")), goldAnswer)

@@ -259,15 +265,22 @@
testData2.collect().toSeq.map(r => Row(-r.getInt(0))))
}

case class TestData(key: Int, value: String)
case class ComplexData(m: Map[String, Int], s: TestData, a: Seq[Int], b: Boolean)
test("unary !") {
val complexData = Seq(
ComplexData(Map("1" -> 1), TestData(1, "1"), Seq(1), true),
ComplexData(Map("2" -> 2), TestData(2, "2"), Seq(2), false)
).toDF()
checkAnswer(
complexData.select(!$"b"),
complexData.collect().toSeq.map(r => Row(!r.getBoolean(3))))
}

val nullStrings = Seq((1, "abc"), (2, "ABC"), (3, null)).toDF("n", "s")
test("isNull") {
checkAnswer(
nullStrings.toDF.where($"s".isNull),
nullStrings.where($"s".isNull),
nullStrings.collect().toSeq.filter(r => r.getString(1) eq null))

checkAnswer(
@@ -277,7 +290,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {

test("isNotNull") {
checkAnswer(
nullStrings.toDF.where($"s".isNotNull),
nullStrings.where($"s".isNotNull),
nullStrings.collect().toSeq.filter(r => r.getString(1) ne null))

checkAnswer(
@@ -513,6 +526,10 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
}

test("upper") {
val lowerCaseData = ((1 to 4) zip ('a' to 'd'))
.map(t => (t._1, t._2.toString))
.toDF("n", "l")

checkAnswer(
lowerCaseData.select(upper('l)),
('a' to 'd').map(c => Row(c.toString.toUpperCase))
@@ -534,6 +551,10 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
}

test("lower") {
val upperCaseData = ((1 to 6) zip ('A' to 'F'))
.map(t => (t._1, t._2.toString))
.toDF("N", "L")

checkAnswer(
upperCaseData.select(lower('L)),
('A' to 'F').map(c => Row(c.toString.toLowerCase))
@@ -634,8 +655,10 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
// Because Rand function is not deterministic, the column rand is not deterministic.
// So, in the optimizer, we will not collapse Project [rand + 1 AS rand1, rand - 1 AS rand2]
// and Project [key, Rand 5 AS rand]. The final plan still has two Projects.
val localTestData = ctx.sparkContext.parallelize((1 to 100).map(i => (i, i.toString)))
.toDF("key", "value")
val dfWithTwoProjects =
-        testData
+        localTestData
.select($"key", (rand(5L) + 1).as("rand"))
.select(($"rand" + 1).as("rand1"), ($"rand" - 1).as("rand2"))
checkNumProjects(dfWithTwoProjects, 2)
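checkNumProjects is not shown in this hunk; a sketch of what such a helper plausibly looks like, assuming the physical Project operator from org.apache.spark.sql.execution (Spark 1.5 era):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.Project

  // Sketch: count Project nodes in the executed physical plan.
  private def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
    val projects = df.queryExecution.executedPlan.collect { case p: Project => p }
    assert(projects.size === expectedNumProjects)
  }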
DataFrameAggregateSuite.scala
@@ -21,10 +21,13 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.DecimalType


class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
import testImplicits._

val testData2 = (for { a <- 1 to 3; b <- 1 to 2 } yield (a, b)).toDF("a", "b")

test("groupBy") {
checkAnswer(
testData2.groupBy("a").agg(sum($"b")),
@@ -82,6 +85,11 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
}

test("average") {

val decimalData = (for { a <- 1 to 3; b <- 1 to 2 } yield (a, b))
  .map(t => (BigDecimal(t._1), BigDecimal(t._2)))
  .toDF("a", "b")

checkAnswer(
testData2.agg(avg('a)),
Row(2.0))
@@ -111,6 +119,8 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(6)) :: Nil)
}

val testData3 = Seq((1, None), (2, Some(2))).toDF("a", "b")

test("null average") {
checkAnswer(
testData3.agg(avg('b)),
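The expected answer for the null-average test is cut off by the hunk above; given SQL semantics, where avg skips NULLs rather than counting them as zero, the assertion would plausibly be:

  // b holds NULL in row 1 (from None) and 2 in row 2.
  checkAnswer(
    testData3.agg(avg('b)),
    Row(2.0))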
DataFrameFunctionsSuite.scala
@@ -115,6 +115,14 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(result, Seq(Row(Row("v", 5.0)), Row(Row("v", 5.0))))
}

val testData2 = {
  val df = (for { a <- 1 to 3; b <- 1 to 2 } yield (a, b)).toDF("a", "b")
  df.registerTempTable("testData2")
  df
}

test("constant functions") {
checkAnswer(
sql("SELECT E()"),
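The registerTempTable call above suggests some tests in this suite hit testData2 through SQL text, the way the constant-functions test uses sql(...). A hypothetical query of that shape (6 = 3 x 2 rows):

  checkAnswer(
    sql("SELECT count(a) FROM testData2"),
    Row(6L))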
DataFrameJoinSuite.scala
@@ -52,6 +52,11 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
}

test("join - self join") {
val testData = {
val df = (1 to 100).map(i => (i, i.toString)).toDF("key", "value")
df.registerTempTable("testData")
df
}
val df1 = testData.select(testData("key")).as('df1)
val df2 = testData.select(testData("key")).as('df2)

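The body of the self-join test is elided by the hunk above; a plausible continuation (not the PR's verbatim code) that exercises the df1/df2 aliases against the registered temp table:

  checkAnswer(
    df1.join(df2, $"df1.key" === $"df2.key"),
    sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key")
      .collect().toSeq)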
DataFrameSuite.scala
@@ -30,6 +30,16 @@ import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SharedSQLContext}
class DataFrameSuite extends QueryTest with SharedSQLContext {
import testImplicits._

val testData = {
val df = (1 to 100).map(i => (i, i.toString)).toDF("key", "value")
df.registerTempTable("testData")
df
}

val testData2 = (for { a <- 1 to 3; b <- 1 to 2 } yield (a, b)).toDF("a", "b")

test("analysis error should be eagerly reported") {
// Eager analysis.
withSQLConf(SQLConf.DATAFRAME_EAGER_ANALYSIS.key -> "true") {
@@ -80,7 +90,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
}

case class TestData(key: Int, value: String)
case class ComplexData(m: Map[String, Int], s: TestData, a: Seq[Int], b: Boolean)
test("access complex data") {
val complexData = Seq(
ComplexData(Map("1" -> 1), TestData(1, "1"), Seq(1), true),
ComplexData(Map("2" -> 2), TestData(2, "2"), Seq(2), false)).toDF()
assert(complexData.filter(complexData("a").getItem(0) === 2).count() == 1)
assert(complexData.filter(complexData("m").getItem("1") === 1).count() == 1)
assert(complexData.filter(complexData("s").getField("key") === 1).count() == 1)
@@ -182,7 +197,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

test("coalesce") {
- assert(testData.select('key).coalesce(1).rdd.partitions.size === 1)
+ assert(testData.select('key).coalesce(1).rdd.partitions.length === 1)

checkAnswer(
testData.select('key).coalesce(1).select('key),
@@ -233,6 +248,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
Row(6))
}


case class ArrayData(data: Seq[Int], nestedData: Seq[Seq[Int]])
val arrayData = Seq(ArrayData(1 to 3, Seq(1 to 3)), ArrayData(2 to 4, Seq(2 to 4)))

test("global sorting") {
checkAnswer(
testData2.orderBy('a.asc, 'b.asc),
@@ -271,6 +290,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
arrayData.toDF().collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq)
}

case class MapData(data: scala.collection.Map[Int, String])

test("limit") {
checkAnswer(
testData.limit(10),
@@ -280,11 +301,23 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
arrayData.toDF().limit(1),
arrayData.take(1).map(r => Row.fromSeq(r.productIterator.toSeq)))

val mapData = (5 to 1 by - 1)
.map(i => (1 to i) zip ('a' to ('a' + i).toChar).map(_.toString + (6 - i)))
.map(s => MapData(s.toMap))

checkAnswer(
mapData.toDF().limit(1),
mapData.take(1).map(r => Row.fromSeq(r.productIterator.toSeq)))
}

val upperCaseData = ((1 to 6) zip ('A' to 'F'))
.map(t => (t._1, t._2.toString))
.toDF("N", "L")

val lowerCaseData = ((1 to 4) zip ('a' to 'd'))
.map(t => (t._1, t._2.toString))
.toDF("n", "l")

test("except") {
checkAnswer(
lowerCaseData.except(upperCaseData),
@@ -395,7 +428,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
assert(df.schema.map(_.name) === Seq("key", "value"))
}


test("drop column after join with duplicate columns using column reference") {
val person = Seq((0, "mike", 30), (1, "jim", 20)).toDF("id", "name", "age")
val salary = Seq((0, 2000.0), (1, 1000.0)).toDF("personId", "salary")
val newSalary = salary.withColumnRenamed("personId", "id")
val col = newSalary("id")
// this join will result in duplicate "id" columns
@@ -603,6 +639,9 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

test("SPARK-6899: type should match when using codegen") {
val decimalData = (for { a <- 1 to 3; b <- 1 to 2 } yield (a, b))
  .map(t => (BigDecimal(t._1), BigDecimal(t._2)))
  .toDF("a", "b")
withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
checkAnswer(
decimalData.agg(avg('a)),
Expand All @@ -611,6 +650,9 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

test("SPARK-7133: Implement struct, array, and map field accessor") {
val complexData = Seq(
ComplexData(Map("1" -> 1), TestData(1, "1"), Seq(1), true),
ComplexData(Map("2" -> 2), TestData(2, "2"), Seq(2), false)).toDF()
assert(complexData.filter(complexData("a")(0) === 2).count() == 1)
assert(complexData.filter(complexData("m")("1") === 1).count() == 1)
assert(complexData.filter(complexData("s")("key") === 1).count() == 1)
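The apply-style accessors in this last test are shorthand for the getItem/getField Column methods exercised earlier in the suite; the paired spellings below should behave identically (sketch):

  complexData.filter(complexData("a")(0) === 2)          // sugar
  complexData.filter(complexData("a").getItem(0) === 2)  // explicit
  complexData.filter(complexData("s")("key") === 1)
  complexData.filter(complexData("s").getField("key") === 1)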