simplify the code about sql text in DF
LantaoJin committed Mar 15, 2018
commit 92293c6b0890eeb439dddf2157122bc850a8a711
64 changes: 32 additions & 32 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,18 +61,20 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils

private[sql] object Dataset {
def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
def apply[T: Encoder](sparkSession: SparkSession,
logicalPlan: LogicalPlan, sqlText: String = ""): Dataset[T] = {
val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]], sqlText)
// Eagerly bind the encoder so we verify that the encoder matches the underlying
// schema. The user will get an error if this is not the case.
dataset.deserializer
dataset
}

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
def ofRows(sparkSession: SparkSession,
logicalPlan: LogicalPlan, sqlText: String = ""): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema), sqlText)
}
}

@@ -166,31 +168,25 @@ private[sql] object Dataset
class Dataset[T] private[sql](
@transient val sparkSession: SparkSession,
@DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
encoder: Encoder[T])
encoder: Encoder[T],
val sqlText: String = "")
Contributor

What's the exact rule you defined to decide whether or not we should propagate the SQL text?

Contributor

And how does the SQL shell execute commands like SELECT * FROM ...? Does it display all the rows, or does it add a LIMIT before displaying? Generally we should not propagate the SQL text: a new DataFrame usually means the plan has changed, so the SQL text is no longer accurate.

Contributor Author
@LantaoJin Mar 20, 2018

Thanks for your review, I agree with this comment. Before the discussion, let me reproduce the scenario our company ran into. Team A developed a framework that submits an application with the SQL statements kept in a file:

spark-submit --master yarn-cluster --class com.ebay.SQLFramework -s biz.sql

The biz.sql file contains many SQL statements like:

create or replace temporary view view_a select xx from table ${old_db}.table_a where dt=${check_date};
insert overwrite table ${new_db}.table_a select xx from view_a join ${new_db}.table_b;
...

There is no case like:

val df = spark.sql("xxxxx")
spark.range(10).collect()
df.filter(..).count()

Team B (Platform) needs to capture the SQL statements that are actually executed across the whole cluster, because the SQL files from Team A contain many variables. A better way is to record the actual SQL statement in the event log.

OK, back to the discussion. The original purpose is to display the SQL statement the user typed in. spark.range(10).collect() isn't a SQL statement the user typed in, nor is df.filter(..).count(); only "xxxxx" is. So I have two proposals and a further thought.

  1. Change the display behavior: only show the SQL that triggers an action, such as "create table", "insert overwrite", etc., and ignore plain SELECT statements. Then the SQL text no longer needs to be propagated at all, the test case above would show nothing in the SQL UI, and the UI would be labeled "SQL text which triggers this execution" instead of "SQL text".
  2. Add a SQLCommandEvent and post an event carrying the SQL statement from SparkSession.sql(); the EventLoggingListener would then simply write it to the event log (see the sketch at the end of this comment). I am not sure whether we could still surface the SQL text in the UI this way.

Furthermore, what about opening another ticket to add a command-line option --sqlfile biz.sql to spark-submit? biz.sql would have to be a file consisting only of SQL statements. With that in place, pure SQL could be used not only in client mode but also in cluster mode.

What do you think? @cloud-fan
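
For illustration only, a minimal sketch of what proposal 2 could look like; SQLCommandEvent, SQLCommandListener and SQLCommandEventDemo are hypothetical names, not existing Spark classes, and the posting side would have to live inside SparkSession.sql() itself:

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession

// Hypothetical event carrying the substituted SQL statement.
case class SQLCommandEvent(sqlText: String) extends SparkListenerEvent

// Listener side, which works against the public API today. The posting side
// would be a one-line addition inside SparkSession.sql(), e.g.
//   sparkContext.listenerBus.post(SQLCommandEvent(substitutor.substitute(sqlText)))
// (listenerBus is only reachable from inside the org.apache.spark package), and
// EventLoggingListener would write the event to the event log in the same way.
class SQLCommandListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SQLCommandEvent(text) => println(s"[sql-command] $text")
    case _ => // ignore other events
  }
}

object SQLCommandEventDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sql-command-demo").getOrCreate()
    spark.sparkContext.addSparkListener(new SQLCommandListener)
    // With the proposed change to SparkSession.sql() in place, running any
    // statement here would post a SQLCommandEvent carrying its SQL text.
    spark.sql("select id from range(3)").show()
    spark.stop()
  }
}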

Contributor

spark-submit --master yarn-cluster --class com.ebay.SQLFramework -s biz.sql

How does com.ebay.SQLFramework process the SQL file? Does it just call spark.sql(xxxx).show, or something else?

Contributor Author

Your speculation is mostly right. It first calls val df = spark.sql(), then classifies the SQL text with pattern matching into three types: count, limit, and other. For count it invokes df.showString(2, 20); for limit it just invokes df.limit(1).foreach; the last type, other, does nothing.
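
Roughly, the dispatch looks like the following sketch (illustrative only; the real com.ebay.SQLFramework is not public, and df.show stands in for the internal showString(2, 20)):

import org.apache.spark.sql.SparkSession

object SqlFileRunner {
  // Classify a statement by a crude pattern match on its text and decide
  // which action, if any, to run on the resulting DataFrame.
  def runStatement(spark: SparkSession, statement: String): Unit = {
    val df = spark.sql(statement)
    val text = statement.trim.toLowerCase
    if (text.contains("count(")) {
      df.show(2, truncate = true)   // stand-in for the internal showString(2, 20)
    } else if (text.contains(" limit ")) {
      df.limit(1).collect()         // force execution; the original uses foreach
    } else {
      // "other": DDL and INSERT statements are already executed eagerly by spark.sql()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sql-file-runner").getOrCreate()
    // Statements would normally be read from a file such as biz.sql and split on ';'.
    runStatement(spark, "select count(*) from range(10)")
    spark.stop()
  }
}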

extends Serializable {

queryExecution.assertAnalyzed()

// Note for Spark contributors: if adding or updating any action in `Dataset`, please make sure
// you wrap it with `withNewExecutionId` if this actions doesn't call other action.

def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
def this(sparkSession: SparkSession, logicalPlan: LogicalPlan,
encoder: Encoder[T], sqlText: String = "") = {
this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder, sqlText)
}

def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
this(sqlContext.sparkSession, logicalPlan, encoder)
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan,
encoder: Encoder[T], sqlText: String = "") = {
this(sqlContext.sparkSession, logicalPlan, encoder, sqlText)
}

private var _sqlText: String = _

def setSqlText(sqlText: String): Dataset[T] = {
_sqlText = sqlText
this
}

def sqlText: String = _sqlText

@transient private[sql] val logicalPlan: LogicalPlan = {
// For various commands (like DDL) and queries with side effects, we force query execution
// to happen right away to let these side effects take place eagerly.
@@ -260,7 +256,7 @@ class Dataset[T] private[sql](
Column(col).cast(StringType)
}
}
val takeResult = newDf.select(castCols: _*).setSqlText(sqlText).take(numRows + 1)
val takeResult = newDf.select(castCols: _*).take(numRows + 1)
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)

@@ -399,7 +395,8 @@ class Dataset[T] private[sql](
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
def toDF(): DataFrame =
new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema), sqlText)

/**
* :: Experimental ::
@@ -631,7 +628,8 @@ class Dataset[T] private[sql](
outputPartitioning,
physicalPlan.outputOrdering,
isStreaming
)(sparkSession)).as[T]
)(sparkSession),
sqlText).as[T]
}

/**
@@ -1373,10 +1371,11 @@ class Dataset[T] private[sql](
planWithBarrier)

if (encoder.flat) {
new Dataset[U1](sparkSession, project, encoder)
new Dataset[U1](sparkSession, project, encoder, sqlText)
} else {
// Flattens inner fields of U1
new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1)
new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder), sqlText)
.map(_._1)
}
}

@@ -1390,7 +1389,7 @@ class Dataset[T] private[sql](
val namedColumns =
columns.map(_.withInputType(exprEnc, planWithBarrier.output).named)
val execution = new QueryExecution(sparkSession, Project(namedColumns, planWithBarrier))
new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders))
new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders), sqlText)
}

/**
@@ -1466,7 +1465,7 @@ class Dataset[T] private[sql](
*/
def filter(condition: Column): Dataset[T] = withTypedPlan {
Filter(condition.expr, planWithBarrier)
}.setSqlText(sqlText)
}

/**
* Filters rows using the given SQL expression.
@@ -2032,7 +2031,7 @@ class Dataset[T] private[sql](
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
new Dataset[T](
sparkSession, Sample(x(0), x(1), withReplacement = false, seed, plan), encoder)
sparkSession, Sample(x(0), x(1), withReplacement = false, seed, plan), encoder, sqlText)
}.toArray
}

@@ -2592,7 +2591,7 @@ class Dataset[T] private[sql](
new Dataset[U](
sparkSession,
MapPartitions[T, U](func, planWithBarrier),
implicitly[Encoder[U]])
implicitly[Encoder[U]], sqlText)
}

/**
@@ -2622,7 +2621,8 @@ class Dataset[T] private[sql](
val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]]
Dataset.ofRows(
sparkSession,
MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier))
MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier),
sqlText)
}

/**
@@ -3301,21 +3301,21 @@ class Dataset[T] private[sql](

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
Dataset.ofRows(sparkSession, logicalPlan)
Dataset.ofRows(sparkSession, logicalPlan, sqlText)
}

/** A convenient function to wrap a logical plan and produce a Dataset. */
@inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
Dataset(sparkSession, logicalPlan)
Dataset(sparkSession, logicalPlan, sqlText)
}

/** A convenient function to wrap a set based logical plan and produce a Dataset. */
@inline private def withSetOperator[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
// Set operators widen types (change the schema), so we cannot reuse the row encoder.
Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
Dataset.ofRows(sparkSession, logicalPlan, sqlText).asInstanceOf[Dataset[U]]
} else {
Dataset(sparkSession, logicalPlan)
Dataset(sparkSession, logicalPlan, sqlText)
}
}

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -637,8 +637,8 @@ class SparkSession private(
* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
val df = Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
df.setSqlText(substitutor.substitute(sqlText))
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText),
substitutor.substitute(sqlText))
Member

Hi, @LantaoJin.
Since what you need is just grabbing the initial SQL text here, you can use a Spark extension. Please refer to the Spark Atlas Connector for sample code.
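
For context, a minimal sketch of that extension approach, assuming Spark 2.2+ and the six-method ParserInterface of that era (the class and config below are illustrative, not taken from the Spark Atlas Connector):

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

// Injects a parser that records every SQL text it sees and then delegates to
// the session's default parser, so query semantics are untouched.
class SqlTextCaptureExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser { case (_, delegate) =>
      new ParserInterface {
        override def parsePlan(sqlText: String): LogicalPlan = {
          println(s"[captured-sql] $sqlText")  // or post an event / write to the event log
          delegate.parsePlan(sqlText)
        }
        override def parseExpression(sqlText: String): Expression =
          delegate.parseExpression(sqlText)
        override def parseTableIdentifier(sqlText: String): TableIdentifier =
          delegate.parseTableIdentifier(sqlText)
        override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
          delegate.parseFunctionIdentifier(sqlText)
        override def parseTableSchema(sqlText: String): StructType =
          delegate.parseTableSchema(sqlText)
        override def parseDataType(sqlText: String): DataType =
          delegate.parseDataType(sqlText)
      }
    }
  }
}

// Enabled without touching Spark itself, e.g.:
//   spark-submit --conf spark.sql.extensions=SqlTextCaptureExtension ...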

Member

You may want to refactor this PR into a ParserExtension part and a UI part. I think that will be less intrusive than the current implementation.

Member
@dongjoon-hyun Mar 22, 2018

BTW, in general, the initial SQL text easily becomes meaningless once other operations are added. In your example, the following case shows a misleading, wrong SQL statement instead of the SQL plan that is actually executed.

val df = spark.sql("xxxxx")
df.filter(...).collect() // shows sql text "xxxxx"

As another example, please try the following: it will show select a,b from t1 as the SQL text.

scala> spark.sql("select a,b from t1").select("a").show
+---+
|  a|
+---+
|  1|
+---+

Contributor Author

the following case shows a misleading, wrong SQL statement instead of the SQL plan that is actually executed.

Yes, we know this, so the current implementation, which binds the SQL text to the DataFrame, is not good.

}

/**