Resolved conflict
sarutak committed Oct 28, 2016
commit 929f2a85a06795ce91f8aff588ab18ee4cb3d804
@@ -1601,7 +1601,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
val df = spark.createDataFrame(rdd, StructType(schemas), false)
assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
}

test("""SPARK-17154: df("column_name") should return correct result when we do self-join""") {
Member:

What happens when:

val joined = df.join(df, "inner")  // columns: col1, col2, col3, col1, col2, col3
val selected = joined.select(df("col1"))

Since there are two plans with the same plan id, the breadth-first search will pick one of them, so df("col1") will be resolved. However, I think in this case we should raise an ambiguity error instead.

Contributor:

This is a good question!

I'm also thinking about this. If a plan id matches more than one sub-tree in the logical plan, should we just fail the query instead of using BFS to pick the first one?
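To make the concern concrete, here is a minimal toy sketch of the failure mode under discussion. The `Plan` class and `findById` helper are hypothetical stand-ins, not Spark's actual `LogicalPlan` internals: the point is only that when two sub-trees carry the same plan id, a plain breadth-first search returns whichever node it visits first, so the ambiguity is silently hidden unless the caller checks for multiple matches.

```scala
object PlanIdAmbiguity {
  // Hypothetical stand-in for a logical plan node tagged with a plan id.
  case class Plan(id: Long, children: Plan*)

  // Breadth-first search collecting EVERY sub-tree with the given plan id.
  def findById(root: Plan, id: Long): Seq[Plan] = {
    var result = Vector.empty[Plan]
    var queue = Vector(root)
    while (queue.nonEmpty) {
      val p = queue.head
      queue = queue.tail
      if (p.id == id) result :+= p
      queue ++= p.children
    }
    result
  }

  def main(args: Array[String]): Unit = {
    // A direct self-join: both join children originate from the same df,
    // so both sub-trees carry plan id 1.
    val join = Plan(2, Plan(1), Plan(1))
    val hits = findById(join, 1)
    // A BFS that stops at the first hit would silently pick hits.head;
    // detecting hits.size > 1 is exactly where an "ambiguous reference"
    // error could be raised instead.
    println(hits.size) // prints 2
  }
}
```

Failing when `hits.size > 1` (rather than taking `hits.head`) corresponds to the "fail the query instead of using BFS to pick the first one" option raised above.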

Member Author:

Yeah, a direct self-join (where both child Datasets are the same) is still ambiguous.
In this case, df("column-name") will refer to the Dataset on the right side in the proposed implementation.

I'm wondering whether a direct self-join like df.join(df, , ) is similar to a query like the following:

SELECT ... FROM my_table df join my_table df on ;

Those queries should not be valid, so I also think we shouldn't allow users to join two identical Datasets, and should warn them to duplicate the Dataset if they intend a direct self-join.

Member Author:

I'm also thinking about this. If a plan id matches more than one sub-tree in the logical plan, should we just fail the query instead of using BFS to pick the first one?

If the logical plan on the right side is copied by dedupRight, there will be multiple logical plans with the same planId, so it may be better to fail the query in the direct-self-join case.

Member:

Although I can't immediately think of an actual use case for self-joining two identical Datasets, I'm still wondering whether we want to disallow it. Conceptually it should work: even if you can't select columns from it due to the ambiguity, you can still save it or apply other operators to it.

Contributor:

I don't think we should support a self-join of the same Dataset/DataFrame under the same name. That is,

df.join(df)

should be blocked. We can ask the user to express it as

df.join(df.as("df2"))

, which is clearer. We certainly must not support

df.join(df, df("col1") === df("col2"))

, which blindly binds "col1" and "col2" to the first df. @sarutak 's solution does change this behaviour to an error.

val df = Seq(
(1, "a", "A"),
@@ -1624,6 +1624,24 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
checkAnswer(selected3, Row(1) :: Row(2) :: Row(null) :: Row(4) :: Row(5) :: Nil)
}

test("SPARK-17409: Do Not Optimize Query in CTAS (Data source tables) More Than Once") {
  withTable("bar") {
    withTempView("foo") {
      withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "json") {
        sql("select 0 as id").createOrReplaceTempView("foo")
        val df = sql("select * from foo group by id")
        // If we optimize the query in CTAS more than once, the following saveAsTable will fail
        // with the error: `GROUP BY position 0 is not in select list (valid range is [1, 1])`
        df.write.mode("overwrite").saveAsTable("bar")
        checkAnswer(spark.table("bar"), Row(0) :: Nil)
        val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("bar"))
        assert(tableMetadata.provider == Some("json"),
          "the expected table is a data source table using json")
      }
    }
  }
}

test("copy results for sampling with replacement") {
val df = Seq((1, 0), (2, 0), (3, 0)).toDF("a", "b")
val sampleDf = df.sample(true, 2.00)