[SPARK-18120 ][SQL] Call QueryExecutionListener callback methods for … #16664
DataFrameWriter.scala
@@ -26,10 +26,12 @@ import org.apache.spark.sql.catalyst.TableIdentifier | |
| import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} | ||
| import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType} | ||
| import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable | ||
| import org.apache.spark.sql.execution.QueryExecution | ||
| import org.apache.spark.sql.execution.command.DDLUtils | ||
| import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} | ||
| import org.apache.spark.sql.sources.BaseRelation | ||
| import org.apache.spark.sql.types.StructType | ||
| import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener} | ||
|
|
||
| /** | ||
| * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems, | ||
|
|
@@ -189,6 +191,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| this | ||
| } | ||
|
|
||
| /** | ||
| * Executes the query and calls the {@link org.apache.spark.sql.util.QueryExecutionListener} | ||
| * methods. | ||
|
||
| * | ||
| * @param funcName An identifier for the method executing the query | ||
| * @param qe the @see [[QueryExecution]] object associated with the | ||
| * query | ||
|
||
| * @param outputParams The output parameters useful for query analysis | ||
| * @param action the function that executes the query after which the listener methods get | ||
| * called. | ||
| */ | ||
| private def executeAndCallQEListener( | ||
|
Member: How about renaming it to `withAction`?

Author: I believe you are saying rename the method executeAndCallQEListener to withAction?

Member: Yes.
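For readability, here is the helper from the diff below restated under the suggested name, as a sketch of the same logic (assuming the surrounding `DataFrameWriter` context with its `df` field and session listener manager):

```scala
// Sketch of the diffed helper renamed to withAction: time the action, then notify
// the session's listener manager of success or failure before rethrowing.
private def withAction(
    funcName: String,
    qe: QueryExecution,
    outputParams: OutputParams)(action: => Unit): Unit = {
  try {
    val start = System.nanoTime()
    action
    val end = System.nanoTime()
    df.sparkSession.listenerManager.onSuccess(funcName, qe, end - start, Some(outputParams))
  } catch {
    case e: Exception =>
      df.sparkSession.listenerManager.onFailure(funcName, qe, e, Some(outputParams))
      throw e
  }
}
```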
||
| funcName: String, | ||
|
||
| qe: QueryExecution, | ||
| outputParams: OutputParams)(action: => Unit) = { | ||
| try { | ||
| val start = System.nanoTime() | ||
|
|
||
| action | ||
| val end = System.nanoTime() | ||
| df.sparkSession.listenerManager.onSuccess(funcName, qe, end - start, Some(outputParams)) | ||
| } catch { | ||
| case e: Exception => | ||
| df.sparkSession.listenerManager.onFailure(funcName, qe, e, Some(outputParams)) | ||
| throw e | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Saves the content of the `DataFrame` at the specified path. | ||
| * | ||
|
|
@@ -218,7 +247,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| bucketSpec = getBucketSpec, | ||
| options = extraOptions.toMap) | ||
|
|
||
| dataSource.write(mode, df) | ||
| val destination = source match { | ||
| case "jdbc" => extraOptions.get("dbtable") | ||
|
||
| case _ => extraOptions.get("path") | ||
|
Member: For the external data source connectors, it might not have a `path` option.

Author: Yes, for methods like saveAsTable() there is no path. Do you see an issue here?

Member: It sounds like

Contributor: Being the person who requested this class instead of an opaque map, I think using an opaque map makes for a really bad user API. The listener now needs to know about "magic keys" that have special meaning, which can vary depending on the destination. So you end up making up some contract that certain keys have special meanings and all sources need to use them that way, so basically you end up encoding this class in a map. That being said, I'm not super happy with the way JDBC works, because there's still some information embedded in the map. I thought about it a little but didn't come up with a good solution; embedding the table name in the JDBC URI sounds hacky and brittle. Best one I got is a separate field in this class (e.g.

Member: I think we need to make it more general instead of introducing a class for the write path only.

Contributor: Yes, it is. e.g.

Contributor: Actually all the "magic keys" in the options used by

Contributor: That's good to know, but they only seem to be, at best, indirectly documented.

I agree that it needs a careful design and the current one doesn't cover all the options. But this PR is of very marginal value without this information being exposed in some way. If you guys feel strongly that it should be a map and that's it, I guess it will be hard to argue. Then we'll have to do that and document all the keys used internally by Spark and make them public, and promise ourselves that we'll never break them. My belief is that a more structured type would help here. Since the current code is obviously not enough, we could have something that's more future-proof, like the sketch below. Then listeners can easily handle future params by matching and handling the generic params. Anyway, my opinion is that a raw map is not a very good API, regardless of API stability; it's hard to use and easy to break. But I'll defer to you guys if you really don't like my suggestions.
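One possible shape for the "more future-proof" type mentioned above (an illustrative sketch only; the names are invented here and are not the commenter's actual proposal):

```scala
// Illustrative only: structured variants a listener can pattern-match on, plus a
// generic fallback so listeners keep working when new kinds of output appear.
sealed trait OutputInfo {
  def options: Map[String, String]
}
case class FileOutputInfo(format: String, path: String,
    options: Map[String, String]) extends OutputInfo
case class TableOutputInfo(format: String, table: String,
    options: Map[String, String]) extends OutputInfo
case class JdbcOutputInfo(url: String, table: String,
    options: Map[String, String]) extends OutputInfo
case class GenericOutputInfo(params: Map[String, String],
    options: Map[String, String]) extends OutputInfo
```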
Contributor: Yes, those are public APIs.
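To make the `destination` logic in this hunk concrete, a hedged usage sketch (the URL, table, and path values are made up; `df` is any DataFrame):

```scala
// JDBC write: destination comes from the "dbtable" option -> Some("public.orders").
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://db.example.com/sales")
  .option("dbtable", "public.orders")
  .option("user", "etl")
  .option("password", "secret")
  .save()

// File-based write: destination comes from the "path" option -> Some("/tmp/orders_parquet").
df.write
  .format("parquet")
  .option("path", "/tmp/orders_parquet")
  .save()
```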
||
| } | ||
|
|
||
| executeAndCallQEListener( | ||
| "save", | ||
| df.queryExecution, | ||
| OutputParams(source, destination, extraOptions.toMap)) { | ||
|
||
| dataSource.write(mode, df) | ||
| } | ||
|
Member: Nit: the style issue.

    withAction("save", df.queryExecution, OutputParams(source, destination, extraOptions.toMap)) {
      dataSource.write(mode, df)
    }
||
| } | ||
|
|
||
| /** | ||
|
|
@@ -244,6 +283,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| * | ||
| * Because it inserts data to an existing table, format or options will be ignored. | ||
| * | ||
| * Calls the callback methods of @see[[QueryExecutionListener]] after query execution with | ||
| * @see[[OutputParams]] having datasourceType set as the string parameter passed to the | ||
| * @see[[DataFrameWriter#format]] method and destination set as the name of the table into which | ||
| * data is being inserted. | ||
| * | ||
| * @since 1.4.0 | ||
| */ | ||
| def insertInto(tableName: String): Unit = { | ||
|
|
@@ -261,13 +305,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| ) | ||
| } | ||
|
|
||
| df.sparkSession.sessionState.executePlan( | ||
| val qe = df.sparkSession.sessionState.executePlan( | ||
| InsertIntoTable( | ||
| table = UnresolvedRelation(tableIdent), | ||
| partition = Map.empty[String, Option[String]], | ||
| child = df.logicalPlan, | ||
| overwrite = mode == SaveMode.Overwrite, | ||
| ifNotExists = false)).toRdd | ||
| ifNotExists = false)) | ||
| executeAndCallQEListener( | ||
| "insertInto", | ||
| qe, | ||
| new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) { | ||
| qe.toRdd | ||
| } | ||
|
||
| } | ||
|
|
||
| private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols => | ||
|
|
@@ -324,7 +374,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
|
|
||
| private def assertNotPartitioned(operation: String): Unit = { | ||
| if (partitioningColumns.isDefined) { | ||
| throw new AnalysisException( s"'$operation' does not support partitioning") | ||
| throw new AnalysisException(s"'$operation' does not support partitioning") | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -359,6 +409,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL | ||
| * specific format. | ||
| * | ||
| * Calls the callback methods of @see[[QueryExecutionListener]] after query execution with a | ||
| * @see[[OutputParams]] object having datasourceType set as the string parameter passed to the | ||
| * @see[[DataFrameWriter#format]] and destination set as the name of the table being | ||
| * written to | ||
| * @since 1.4.0 | ||
| */ | ||
| def saveAsTable(tableName: String): Unit = { | ||
|
|
@@ -428,8 +482,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| partitionColumnNames = partitioningColumns.getOrElse(Nil), | ||
| bucketSpec = getBucketSpec | ||
| ) | ||
| df.sparkSession.sessionState.executePlan( | ||
| CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd | ||
| val qe = df.sparkSession.sessionState.executePlan( | ||
| CreateTable(tableDesc, mode, Some(df.logicalPlan))) | ||
| executeAndCallQEListener( | ||
| "saveAsTable", | ||
| qe, | ||
| new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) { | ||
|
||
| qe.toRdd | ||
|
||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -493,6 +553,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| * indicates a timestamp format. Custom date formats follow the formats at | ||
| * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> | ||
| * </ul> | ||
| * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with | ||
| * @see[[OutputParams]] having datasourceType set as string constant "json" and | ||
| * destination set as the path to which the data is written | ||
| * | ||
| * @since 1.4.0 | ||
| */ | ||
|
|
@@ -514,6 +577,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| * shorten names(none, `snappy`, `gzip`, and `lzo`). This will override | ||
| * `spark.sql.parquet.compression.codec`.</li> | ||
| * </ul> | ||
| * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with | ||
| * @see[[OutputParams]] having datasourceType set as string constant "parquet" and | ||
| * destination set as the path to which the data is written | ||
|
||
| * | ||
| * @since 1.4.0 | ||
| */ | ||
|
|
@@ -534,6 +600,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`). | ||
| * This will override `orc.compress`.</li> | ||
| * </ul> | ||
| * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with | ||
| * @see[[OutputParams]] having datasourceType set as string constant "orc" and | ||
| * destination set as the path to which the data is written | ||
| * | ||
| * @since 1.5.0 | ||
| * @note Currently, this method can only be used after enabling Hive support | ||
|
|
@@ -560,6 +629,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`, | ||
| * `snappy` and `deflate`). </li> | ||
| * </ul> | ||
| * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution | ||
| * with @see[[OutputParams]] having datasourceType set as string constant "text" and | ||
| * destination set as the path to which the data is written | ||
| * | ||
| * @since 1.6.0 | ||
| */ | ||
|
|
@@ -599,6 +671,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |
| * indicates a timestamp format. Custom date formats follow the formats at | ||
| * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> | ||
| * </ul> | ||
| * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with | ||
| * @see[[OutputParams]] having datasourceType set as string constant "csv" and | ||
| * destination set as the path to which the data is written | ||
| * | ||
| * @since 2.0.0 | ||
| */ | ||
|
|
||
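Taken together, the scaladoc additions in this file describe the following behavior (a hedged summary; `df` is any DataFrame with a listener registered on its session, the table and path names are placeholders, and `<default source>` stands for whatever `spark.sql.sources.default` resolves to):

```scala
// What a registered listener would receive for each entry point, per the diff above.
df.write.parquet("/tmp/out")       // funcName = "save",        OutputParams("parquet", Some("/tmp/out"), options)
df.write.json("/tmp/out_json")     // funcName = "save",        OutputParams("json", Some("/tmp/out_json"), options)
df.write.insertInto("db.events")   // funcName = "insertInto",  OutputParams(<default source>, Some("db.events"), options)
df.write.saveAsTable("db.copy")    // funcName = "saveAsTable", OutputParams(<default source>, Some("db.copy"), options)
```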
SparkSession.scala
@@ -40,12 +40,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} | |
| import org.apache.spark.sql.execution._ | ||
| import org.apache.spark.sql.execution.datasources.LogicalRelation | ||
| import org.apache.spark.sql.execution.ui.SQLListener | ||
| import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} | ||
| import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf} | ||
| import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION | ||
| import org.apache.spark.sql.sources.BaseRelation | ||
| import org.apache.spark.sql.streaming._ | ||
| import org.apache.spark.sql.types.{DataType, LongType, StructType} | ||
| import org.apache.spark.sql.util.ExecutionListenerManager | ||
| import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener} | ||
| import org.apache.spark.util.Utils | ||
|
|
||
|
|
||
|
|
@@ -876,6 +876,9 @@ object SparkSession { | |
| } | ||
| session = new SparkSession(sparkContext) | ||
| options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } | ||
| for (qeListener <- createQueryExecutionListeners(session.sparkContext.getConf)) { | ||
| session.listenerManager.register(qeListener) | ||
| } | ||
| defaultSession.set(session) | ||
|
|
||
| // Register a successfully instantiated context to the singleton. This should be at the | ||
|
|
@@ -893,6 +896,12 @@ object SparkSession { | |
| } | ||
| } | ||
|
|
||
| private def createQueryExecutionListeners(conf: SparkConf): Seq[QueryExecutionListener] = { | ||
| conf.get(SQLConf.QUERY_EXECUTION_LISTENERS) | ||
| .map(Utils.classForName(_)) | ||
|
Member: Nit: ->
||
| .map(_.newInstance().asInstanceOf[QueryExecutionListener]) | ||
|
Member: Simply throwing the raw exception is not very helpful here. Could you use try and catch to issue a better error message when we are unable to create/initialize the class? Thanks!
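One possible way to address the request above (a sketch, not the PR's code; the error message wording is invented):

```scala
private def createQueryExecutionListeners(conf: SparkConf): Seq[QueryExecutionListener] = {
  conf.get(SQLConf.QUERY_EXECUTION_LISTENERS).map { className =>
    try {
      // Instantiate each configured listener via its zero-arg constructor.
      Utils.classForName(className).newInstance().asInstanceOf[QueryExecutionListener]
    } catch {
      case e: Exception =>
        throw new IllegalArgumentException(
          s"Error while instantiating QueryExecutionListener '$className' configured " +
            "through spark.sql.queryExecutionListeners", e)
    }
  }
}
```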
||
| } | ||
|
|
||
| /** | ||
| * Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]]. | ||
| * | ||
|
|
||
SQLConf.scala
@@ -655,6 +655,13 @@ object SQLConf { | |
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val QUERY_EXECUTION_LISTENERS = | ||
|
||
| ConfigBuilder("spark.sql.queryExecutionListeners") | ||
| .doc("QueryExecutionListeners to be attached to the SparkSession") | ||
|
||
| .stringConf | ||
| .toSequence | ||
| .createWithDefault(Nil) | ||
|
|
||
| object Deprecated { | ||
| val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" | ||
| } | ||
|
|
||
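A hedged usage sketch for the new key (the class `com.example.AuditListener` is made up; it must be a `QueryExecutionListener` with a zero-argument constructor available on the driver classpath):

```scala
import org.apache.spark.sql.SparkSession

// Set the config before the session/context is created so the builder change in
// SparkSession.scala above can instantiate and register the listener automatically.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("qe-listener-demo")
  .config("spark.sql.queryExecutionListeners", "com.example.AuditListener")
  .getOrCreate()
```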
QueryExecutionListener.scala
@@ -44,27 +44,49 @@ trait QueryExecutionListener { | |
| * @param qe the QueryExecution object that carries detail information like logical plan, | ||
| * physical plan, etc. | ||
| * @param durationNs the execution time for this query in nanoseconds. | ||
| * | ||
| * @note This can be invoked by multiple different threads. | ||
| * @param outputParams The output parameters in case the method is invoked as a result of a | ||
| * write operation. In case of a read it will be @see[[None]] | ||
| */ | ||
| @DeveloperApi | ||
| def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit | ||
|
|
||
| def onSuccess( | ||
| funcName: String, | ||
| qe: QueryExecution, | ||
| durationNs: Long, | ||
| outputParams: Option[OutputParams]): Unit | ||
| /** | ||
| * A callback function that will be called when a query execution failed. | ||
| * | ||
| * @param funcName the name of the action that triggered this query. | ||
| * @param qe the QueryExecution object that carries detail information like logical plan, | ||
| * physical plan, etc. | ||
| * @param exception the exception that failed this query. | ||
| * @param outputParams The output parameters in case the method is invoked as a result of a | ||
| * write operation. In case of a read it will be @see[[None]] | ||
| * | ||
| * @note This can be invoked by multiple different threads. | ||
| */ | ||
| @DeveloperApi | ||
| def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit | ||
| def onFailure( | ||
| funcName: String, | ||
| qe: QueryExecution, | ||
| exception: Exception, | ||
| outputParams: Option[OutputParams]): Unit | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Contains extra information useful for query analysis passed on from the methods in | ||
| * @see[[org.apache.spark.sql.DataFrameWriter]] while writing to a datasource | ||
| * @param datasourceType type of data source written to like csv, parquet, json, hive, jdbc etc. | ||
| * @param destination path or table name written to | ||
| * @param options the map containing the output options for the underlying datasource | ||
| * specified by using the @see [[org.apache.spark.sql.DataFrameWriter#option]] method | ||
| * @param writeParams will contain any extra information that the write method wants to provide | ||
| */ | ||
| case class OutputParams( | ||
|
Contributor: It looks reasonable to provide more information to the listeners for write operations. However, this will be public, so I think we should think about it more carefully to get a better design. Can we do it later?

Contributor: Sorry, the arguments to this class seem to have been picked pretty randomly. Can you explain why these parameters were picked?
||
| datasourceType: String, | ||
| destination: Option[String], | ||
| options: Map[String, String], | ||
| writeParams: Map[String, String] = Map.empty) | ||
| /** | ||
| * :: Experimental :: | ||
| * | ||
|
|
@@ -98,18 +120,26 @@ class ExecutionListenerManager private[sql] () extends Logging { | |
| listeners.clear() | ||
| } | ||
|
|
||
| private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { | ||
| private[sql] def onSuccess( | ||
| funcName: String, | ||
| qe: QueryExecution, | ||
| duration: Long, | ||
| outputParams: Option[OutputParams] = None): Unit = { | ||
| readLock { | ||
| withErrorHandling { listener => | ||
| listener.onSuccess(funcName, qe, duration) | ||
| listener.onSuccess(funcName, qe, duration, outputParams) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { | ||
| private[sql] def onFailure( | ||
| funcName: String, | ||
| qe: QueryExecution, | ||
| exception: Exception, | ||
| outputParams: Option[OutputParams] = None): Unit = { | ||
| readLock { | ||
| withErrorHandling { listener => | ||
| listener.onFailure(funcName, qe, exception) | ||
| listener.onFailure(funcName, qe, exception, outputParams) | ||
| } | ||
| } | ||
| } | ||
|
|
||
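To tie the API changes together, a hedged end-to-end sketch of a listener implementing the extended callbacks (the class name and log output are invented; the signatures follow the diff above):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}

class AuditListener extends QueryExecutionListener {

  override def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long,
      outputParams: Option[OutputParams]): Unit = {
    val fmt = outputParams.map(_.datasourceType).getOrElse("<none>")
    val dest = outputParams.flatMap(_.destination).getOrElse("<none>")
    println(s"$funcName succeeded in ${durationNs / 1e6} ms: format=$fmt, destination=$dest")
  }

  override def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception,
      outputParams: Option[OutputParams]): Unit = {
    val dest = outputParams.flatMap(_.destination).getOrElse("<none>")
    println(s"$funcName failed for destination=$dest: ${exception.getMessage}")
  }
}

// Programmatic registration, as an alternative to the new configuration key:
//   spark.listenerManager.register(new AuditListener)
```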
I don't think this new option belongs in this section. It has nothing to do with performance and this description now sounds weird. A separate section for it would be better, even if it's the only option there.