Closed
Changes from 1 commit
37 commits
ed932d2
Temporarily renames Dataset to DS
liancheng Mar 1, 2016
e59e940
Renames DataFrame to Dataset[T]
liancheng Mar 1, 2016
b357371
Fixes Java API compilation failures
liancheng Mar 1, 2016
3783e31
Fixes styling issues
liancheng Mar 1, 2016
a02a922
Fixes compilation failure introduced while rebasing
liancheng Mar 1, 2016
3db81f8
Temporarily disables MiMA check for convenience
liancheng Mar 1, 2016
f67f497
Fixes infinite recursion in Dataset constructor
liancheng Mar 1, 2016
f921583
Fixes test failures
liancheng Mar 3, 2016
fa22261
Migrates encoder stuff to the new Dataset
liancheng Mar 3, 2016
8cf5672
Makes some shape-keeping operations typed
liancheng Mar 5, 2016
712ee19
Adds collectRows() for Java API
liancheng Mar 6, 2016
c73b91f
Migrates joinWith operations
liancheng Mar 6, 2016
54cb36a
Migrates typed select
liancheng Mar 7, 2016
cbd7519
Renames typed groupBy to groupByKey
liancheng Mar 7, 2016
f1a2903
Migrates typed groupBy
liancheng Mar 7, 2016
15b4193
Migrates functional transformers
liancheng Mar 7, 2016
9aff0e2
Removes the old DS class and gets everything compiled
liancheng Mar 7, 2016
f053852
Fixes compilation error
liancheng Mar 7, 2016
3a7aff4
Row encoder should produce GenericRowWithSchema
liancheng Mar 8, 2016
9f8e0ad
Fixes compilation error after rebasing
liancheng Mar 8, 2016
bc081a9
Migrated Dataset.showString should handle typed objects
liancheng Mar 8, 2016
6b69f43
MapObjects should also handle DecimalType and DateType
liancheng Mar 8, 2016
29cb70f
Always use eager analysis
liancheng Mar 9, 2016
ba86e09
Fixes compilation error after rebasing
liancheng Mar 10, 2016
5727f48
Fixes compilation error after rebasing
liancheng Mar 10, 2016
cd63810
Temporarily ignores Python UDT test cases
liancheng Mar 10, 2016
4c8d139
fix python
cloud-fan Mar 10, 2016
cf0c112
Merge pull request #9 from cloud-fan/ds-to-df
liancheng Mar 10, 2016
91942cf
fix pymllib
cloud-fan Mar 10, 2016
736fbcb
Merge pull request #10 from cloud-fan/ds-to-df
liancheng Mar 10, 2016
488a82e
MIMA
yhuai Mar 10, 2016
343c611
Fix typo...
yhuai Mar 10, 2016
63d4d69
MIMA: Exclude DataFrame class.
yhuai Mar 10, 2016
f6bcd50
Revert "MIMA: Exclude DataFrame class."
yhuai Mar 10, 2016
49c6fc2
Revert "Fix typo..."
yhuai Mar 10, 2016
d52ce17
Revert "MIMA"
yhuai Mar 10, 2016
7d29c06
Merge remote-tracking branch 'upstream/master' into ds-to-df
yhuai Mar 11, 2016
Always use eager analysis
liancheng committed Mar 10, 2016
commit 29cb70fa85e5e5dc632a9f7d86fff6f2bdcf0f14
@@ -18,6 +18,7 @@
package org.apache.spark.sql

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan


// TODO: don't swallow original stack trace if it exists
@@ -30,7 +31,8 @@ import org.apache.spark.annotation.DeveloperApi
class AnalysisException protected[sql] (
val message: String,
val line: Option[Int] = None,
val startPosition: Option[Int] = None)
val startPosition: Option[Int] = None,
val plan: Option[LogicalPlan] = None)
extends Exception with Serializable {

def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException = {
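For orientation, a minimal sketch of the widened constructor in use. This is illustrative, not part of the diff, and the helper name `wrapWithPlan` is hypothetical; since the constructor is `protected[sql]`, such a helper would have to live in the `org.apache.spark.sql` package.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object AnalysisErrors {
  // Re-wrap a resolution failure so it carries the offending plan.
  // Existing call sites compile unchanged because `plan` defaults to None.
  def wrapWithPlan(e: AnalysisException, analyzed: LogicalPlan): AnalysisException =
    new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
}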
21 changes: 4 additions & 17 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -44,7 +44,6 @@ import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -128,23 +127,13 @@ class Dataset[T] private[sql](
encoder: Encoder[T])
extends Queryable with Serializable {

Contributor Author: This file should be renamed to Dataset.scala. Didn't do this yet for better diff view.

queryExecution.assertAnalyzed()

// Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
// you wrap it with `withNewExecutionId` if this action doesn't call another action.

/**
* A constructor that automatically analyzes the logical plan.
*
* This reports error eagerly as the [[DataFrame]] is constructed, unless
* [[SQLConf.dataFrameEagerAnalysis]] is turned off.
*/
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
this(sqlContext, {
val qe = sqlContext.executePlan(logicalPlan)
if (sqlContext.conf.dataFrameEagerAnalysis) {
qe.assertAnalyzed() // This should force analysis and throw errors if there are any
}
qe
}, encoder)
this(sqlContext, sqlContext.executePlan(logicalPlan), encoder)
}

@transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match {
@@ -164,9 +153,7 @@
* same object type (that will be possibly resolved to a different schema).
*/
private[sql] implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(encoder)
if (sqlContext.conf.dataFrameEagerAnalysis) {
unresolvedTEncoder.validate(logicalPlan.output)
}
unresolvedTEncoder.validate(logicalPlan.output)

/** The encoder for this [[Dataset]] that has been resolved to its output schema. */
private[sql] val resolvedTEncoder: ExpressionEncoder[T] =
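To make the behavioral change concrete, a hedged sketch (assuming a live `sqlContext` is in scope): with the conf check gone, an unresolvable column now fails when the Dataset is defined, not when an action runs.

import org.apache.spark.sql.AnalysisException

val df = sqlContext.range(10) // DataFrame with a single "id" column
try {
  df.select("nonExistentName") // now throws here, at definition time
} catch {
  case ae: AnalysisException =>
    println(s"Caught during analysis, before any action: ${ae.message}")
}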
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}

@@ -31,7 +31,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
*/
class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {

def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)
def assertAnalyzed(): Unit = try sqlContext.analyzer.checkAnalysis(analyzed) catch {
case e: AnalysisException =>
throw new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
Contributor: need to also pass the stack trace over

Contributor: the simplest way is to make AnalysisException's plan field mutable

}

lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)

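Picking up the two review comments above, a hedged sketch of one way to carry the trace over using only the standard `Throwable` API; the eventual upstream fix may differ (e.g. by making `plan` mutable, as suggested).

def assertAnalyzed(): Unit =
  try sqlContext.analyzer.checkAnalysis(analyzed) catch {
    case e: AnalysisException =>
      val withPlan = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
      withPlan.setStackTrace(e.getStackTrace) // preserve where the failure originated
      throw withPlan
  }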
26 changes: 9 additions & 17 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -38,23 +38,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
import testImplicits._

test("analysis error should be eagerly reported") {
// Eager analysis.
withSQLConf(SQLConf.DATAFRAME_EAGER_ANALYSIS.key -> "true") {
intercept[Exception] { testData.select('nonExistentName) }
intercept[Exception] {
testData.groupBy('key).agg(Map("nonExistentName" -> "sum"))
}
intercept[Exception] {
testData.groupBy("nonExistentName").agg(Map("key" -> "sum"))
}
intercept[Exception] {
testData.groupBy($"abcd").agg(Map("key" -> "sum"))
}
intercept[Exception] { testData.select('nonExistentName) }
intercept[Exception] {
testData.groupBy('key).agg(Map("nonExistentName" -> "sum"))
}

// No more eager analysis once the flag is turned off
withSQLConf(SQLConf.DATAFRAME_EAGER_ANALYSIS.key -> "false") {
testData.select('nonExistentName)
intercept[Exception] {
testData.groupBy("nonExistentName").agg(Map("key" -> "sum"))
}
intercept[Exception] {
testData.groupBy($"abcd").agg(Map("key" -> "sum"))
}
}

@@ -72,7 +64,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
Row(1, 1) :: Nil)
}

test("invalid plan toString, debug mode") {
ignore("invalid plan toString, debug mode") {
// Turn on debug mode so we can see invalid query plans.
import org.apache.spark.sql.execution.debug._

22 changes: 11 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -123,17 +123,17 @@ abstract class QueryTest extends PlanTest {
protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
val analyzedDF = try df catch {
case ae: AnalysisException =>
val currentValue = sqlContext.conf.dataFrameEagerAnalysis
sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
val partiallyAnalzyedPlan = df.queryExecution.analyzed
sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, currentValue)
fail(
s"""
|Failed to analyze query: $ae
|$partiallyAnalzyedPlan
|
|${stackTraceToString(ae)}
|""".stripMargin)
if (ae.plan.isDefined) {
fail(
s"""
|Failed to analyze query: $ae
|${ae.plan.get}
|
|${stackTraceToString(ae)}
|""".stripMargin)
} else {
throw ae
}
}

checkJsonFormat(analyzedDF)
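As a closing illustration of the consumer side, a hedged, self-contained sketch of the new failure path: the partially analyzed plan now travels on the exception, so the old conf toggling is unnecessary. `reportAnalysisFailure` is hypothetical, and `stackTraceToString` only approximates the QueryTest helper.

import java.io.{PrintWriter, StringWriter}
import org.apache.spark.sql.AnalysisException

def stackTraceToString(t: Throwable): String = {
  val sw = new StringWriter()
  t.printStackTrace(new PrintWriter(sw, true))
  sw.toString
}

// Build the same report checkAnswer now produces, or rethrow when no plan is attached.
def reportAnalysisFailure(ae: AnalysisException): String =
  ae.plan match {
    case Some(plan) =>
      s"""
         |Failed to analyze query: $ae
         |$plan
         |
         |${stackTraceToString(ae)}
         |""".stripMargin
    case None => throw ae
  }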