Committing to fix code review comments
Salil Surendran committed Feb 5, 2017
commit a0c7c22e4097adbdf12e52db37c26a2246d4eddd
@@ -29,9 +29,10 @@ import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}
import org.apache.spark.sql.util.{OutputParams}

/**
* Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -247,7 +248,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
options = extraOptions.toMap)

val destination = source match {
case "jdbc" => extraOptions.get("dbtable")
case "jdbc" => extraOptions.get(JDBCOptions.JDBC_TABLE_NAME)
Member

For JDBC, the source value might not be "jdbc". For example: jDbC, JDBC, org.apache.spark.sql.jdbc.DefaultSource, or org.apache.spark.sql.jdbc.

Author

Could you please give me some more info? Looking at the DataFrameWriter#jdbc method, it sets the source to "jdbc". Are there other places where this source is being set?

Member

For example,

    df.write.format("org.apache.spark.sql.jdbc")
    .options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST"))
    .save()
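
To make that concern concrete, here is a minimal sketch (not code from this PR) of normalizing the user-supplied source string before matching, so that spellings like jDbC and the fully qualified provider names all take the JDBC branch; the helper name isJdbcSource is hypothetical:

    import java.util.Locale

    // Hypothetical helper, not part of this PR: treat any spelling of the
    // built-in JDBC provider as JDBC before matching on the source.
    def isJdbcSource(source: String): Boolean = {
      val normalized = source.toLowerCase(Locale.ROOT)
      Set("jdbc",
          "org.apache.spark.sql.jdbc",
          "org.apache.spark.sql.jdbc.defaultsource").contains(normalized)
    }

    // The match above could then read:
    //   case s if isJdbcSource(s) => extraOptions.get(JDBCOptions.JDBC_TABLE_NAME)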

case _ => extraOptions.get("path")
Member

For external data source connectors, there might not be a path.

Author

Yes, for methods like saveAsTable() there is no path. Do you see an issue here?
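
As an illustration (the connector name below is made up), a write like the following sets no "path" option, so extraOptions.get("path") yields None and the destination would be empty:

    // Hypothetical third-party connector: nothing here populates the "path"
    // option, so the fallback case _ => extraOptions.get("path") produces None.
    df.write
      .format("com.example.kvstore")
      .option("keyspace", "events")
      .save()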

Member (@gatorsmile, Feb 8, 2017)

outputParams The output parameters in case the method is invoked as a result of a write operation.

It sounds like OutputParams is designed for the write path. Is it being used only for description? Could we make it more general? For example, using a Map[String, String]-like data structure? In the future, we could easily use/extend it for the other code paths.
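
For context, the map-only alternative being suggested would hand listeners just the raw options, something like the hypothetical shape below (not an actual Spark API); the listener would then need to know which keys are meaningful for each source:

    // Hypothetical listener hook, for illustration of the map-based design only.
    def onWriteSuccess(source: String, options: Map[String, String]): Unit = {
      val target = options.get("path").orElse(options.get("dbtable"))
      println(s"$source wrote to ${target.getOrElse("<unknown>")}")
    }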

Contributor

Could we make it more general? For example, using a Map[String, String]

Being the person who requested this class instead of an opaque map, I think using an opaque map makes for a really bad user API. The listener now needs to know about "magic keys" that have special meaning, which can vary depending on the destination. So you end up making up some contract that certain keys have special meanings and all sources need to use them that way, so basically you end up encoding this class in a map.

That being said, I'm not super happy with the way JDBC works, because there's still some information embedded in the map. I thought about it a little but didn't come up with a good solution; embedding the table name in the JDBC URI sounds hacky and brittle. The best one I've got is a separate field in this class (e.g. serverUri) that can be used to identify the server hosting the destination value (not needed for FS-based destinations since it's in the URI, but it could be useful in other cases - maybe other table-based systems like Kudu or HBase).
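
A rough sketch of that suggestion, assuming OutputParams keeps the three fields implied by the call sites in this diff (the field names below are guesses for illustration, not the PR's actual definition):

    // Sketch only: field names are inferred from OutputParams(source, destination,
    // extraOptions.toMap) elsewhere in this change; serverUri is the proposed addition.
    case class OutputParams(
        datasourceType: String,              // e.g. "jdbc", "parquet"
        destination: Option[String],         // table name or path, when known
        options: Map[String, String] = Map.empty,
        serverUri: Option[String] = None)    // e.g. the JDBC server hosting the table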

Member

I think we need to make it more general instead of introducing a class for the write path only.

Contributor

e.g. calling the save method adds a "path" key to the option map, but is that key name a public API?

Yes, it is. E.g. with df.write.format("parquet").option("path", some_path).save(), path is a "magic key" that we've exposed to users, so path is a public API; if we change it, we will break existing applications.

Contributor (@cloud-fan, Feb 9, 2017)

Actually, all the "magic keys" in the options used by DataFrameWriter are public APIs; they are not going to change, and users need to know about them if they want fine-grained control over the write operation.
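
For reference, these are the kinds of option keys being discussed; the values below are illustrative, but "url", "dbtable", and "path" are keys DataFrameWriter users set today:

    // JDBC write: "url" and "dbtable" identify the destination.
    df.write
      .format("jdbc")
      .option("url", "jdbc:h2:mem:testdb")
      .option("dbtable", "TEST.SAVETEST")
      .save()

    // File-based write: "path" identifies the destination.
    df.write
      .format("parquet")
      .option("path", "/tmp/output")
      .save()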

Contributor

Actually all the "magic keys" in the options used by DataFrameWriter are public APIs

That's good to know, but they only seem to be, at best, indirectly documented. The DataFrameWriter API doesn't say anything about the keys used by any of the methods, and sql-programming-guide.md only touches on a handful of them; for example, none of the JDBC keys are documented.

If you want to introduce an external public interface, we need a careful design. This should be done in a separate PR.

I agree that it needs a careful design and the current one doesn't cover all the options. But this PR is of very marginal value without this information being exposed in some way. If you guys feel strongly that it should be a map and that's it, I guess it will be hard to argue. Then we'll have to do that and document all the keys used internally by Spark and make them public, and promise ourselves that we'll never break them.

My belief is that a more structured type would help here. Since the current code is obviously not enough, we could have something that's more future-proof, like:

// Generic, just exposes the raw options, no stability guarantee past what the SQL API provides.
class QueryExecutionParams(val options: Map[String, String])

// For FS-based sources
class FsOutputParams(val dataSourceType: String, val path: String, options: Map[String, String])
  extends QueryExecutionParams(options)

// For JDBC
class JdbcOutputParams(val table: String, val url: String, options: Map[String, String])
  extends QueryExecutionParams(options)

// Add others that are interesting.

Then listeners can easily handle future params by matching and handling the generic params.
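
For instance, a listener built against that hierarchy could dispatch like this (a sketch that assumes the class and field names from the snippet above, which are not part of any released Spark API):

// Sketch assuming the hypothetical classes sketched above.
def describeWrite(params: QueryExecutionParams): String = params match {
  case j: JdbcOutputParams => s"JDBC table ${j.table} at ${j.url}"
  case f: FsOutputParams   => s"${f.dataSourceType} output at ${f.path}"
  case other               => s"write with options: ${other.options}"
}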

Anyway, my opinion is that a raw map is not a very good API, regardless of API stability; it's hard to use and easy to break. But I'll defer to you guys if you really don't like my suggestions.

Member (@gatorsmile, Feb 10, 2017)

Contributor

Yes those are public APIs.

}
val outputParams = OutputParams(source, destination, extraOptions.toMap)
@@ -467,7 +468,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
)
val qe = df.sparkSession.sessionState.executePlan(
CreateTable(tableDesc, mode, Some(df.logicalPlan)))
val outputParams = new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)
val outputParams = OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)
withAction("saveAsTable", qe, outputParams)(qe.toRdd)
}
