Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Added Yin's test case
  • Loading branch information
rxin committed Nov 4, 2015
commit 0c7e1cd60ede0306a54549847c7c3e30493a17a1
32 changes: 22 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,9 @@ class DataFrame private[sql](

/**
* Returns a new [[DataFrame]] with each partition sorted by the given expressions.
*
* This is the same operation as "SORT BY" in SQL (Hive QL).
*
* @group dfops
* @since 1.6.0
*/
Expand All @@ -619,6 +622,9 @@ class DataFrame private[sql](

/**
* Returns a new [[DataFrame]] with each partition sorted by the given expressions.
*
* This is the same operation as "SORT BY" in SQL (Hive QL).
*
* @group dfops
* @since 1.6.0
*/
Expand Down Expand Up @@ -1504,22 +1510,28 @@ class DataFrame private[sql](
}

/**
* Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
* `numPartitions`. The resulting DataFrame is hash partitioned.
* @group dfops
* @since 1.6.0
*/
* Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
* `numPartitions`. The resulting DataFrame is hash partitioned.
*
* This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
*
* @group dfops
* @since 1.6.0
*/
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = {
  // Unwrap each Column to its underlying expression; these are the partitioning keys.
  val partitioningKeys = partitionExprs.map(column => column.expr)
  // An explicit partition count is supplied here (the sibling overload passes None instead).
  val requestedPartitions = Some(numPartitions)
  RepartitionByExpression(partitioningKeys, logicalPlan, requestedPartitions)
}

/**
* Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
* the existing number of partitions. The resulting DataFrame is hash partitioned.
* @group dfops
* @since 1.6.0
*/
* Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
* the existing number of partitions. The resulting DataFrame is hash partitioned.
*
* This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
*
* @group dfops
* @since 1.6.0
*/
@scala.annotation.varargs
def repartition(partitionExprs: Column*): DataFrame = {
RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,5 +411,17 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
sqlContext.uncacheTable("t2")
sqlContext.dropTempTable("t1")
sqlContext.dropTempTable("t2")

// One side of join is not partitioned in the desired way. Need to shuffle.
testData.repartition(6, $"value").registerTempTable("t1")
testData2.repartition(6, $"a").registerTempTable("t2")
sqlContext.cacheTable("t1")
sqlContext.cacheTable("t2")

verifyNumExchanges(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key = t2.a"), 2)
sqlContext.uncacheTable("t1")
sqlContext.uncacheTable("t2")
sqlContext.dropTempTable("t1")
sqlContext.dropTempTable("t2")
}
}