customDataFrameColumnTypes -> customSchema
wangyum committed Sep 10, 2017
commit 0b67f0f40b531d86d44e978140f85f68af1de65a
4 changes: 2 additions & 2 deletions docs/sql-programming-guide.md
@@ -1337,9 +1337,9 @@ the following case-insensitive options:
   </tr>
 
   <tr>
-    <td><code>customDataFrameColumnTypes</code></td>
+    <td><code>customSchema</code></td>
     <td>
-      The DataFrame column data types to use instead of the defaults when reading data from jdbc API. (e.g: <code>"id DECIMAL(38, 0), name STRING")</code>. The specified types should be valid spark sql data types. This option applies only to reading.
+      The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. The column names should be identical to the corresponding column names in the JDBC table. Users can specify the corresponding Spark SQL data types instead of using the defaults. This option applies only to reading.
     </td>
   </tr>
 </table>
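For reference, a minimal usage sketch (not part of this diff) showing the renamed option passed as a regular reader option; the JDBC URL, table name, and credentials below are placeholders.

  // Sketch only: read a JDBC table while overriding the default column types.
  val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")    // placeholder connection URL
    .option("dbtable", "schema.tablename")        // placeholder table name
    .option("user", "username")
    .option("password", "password")
    .option("customSchema", "id DECIMAL(38, 0), name STRING")
    .load()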
@@ -185,8 +185,8 @@ object SQLDataSourceExample {
     connectionProperties.put("password", "password")
     val jdbcDF2 = spark.read
       .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
-    // Specifying dataframe column data types on read
-    connectionProperties.put("customDataFrameColumnTypes", "id DECIMAL(38, 0), name STRING")
+    // Specifying the custom data types of the read schema
+    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
     val jdbcDF3 = spark.read
       .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

@@ -290,7 +290,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
 
     // custom schema can read data
     val props = new Properties()
-    props.put("customDataFrameColumnTypes",
+    props.put("customSchema",
       s"ID DECIMAL(${DecimalType.MAX_PRECISION}, 0), N1 INT, N2 BOOLEAN")
     val dfRead = spark.read.jdbc(jdbcUrl, "tableWithCustomSchema", props)

14 changes: 2 additions & 12 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -197,7 +197,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
-    assertJdbcAPISpecifiedDataFrameSchema()
+    assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
@@ -268,7 +268,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       table: String,
       predicates: Array[String],
       connectionProperties: Properties): DataFrame = {
Review comment (Member): Also need a test case for this.

-    assertJdbcAPISpecifiedDataFrameSchema()
+    assertNoSpecifiedSchema("jdbc")
     // connectionProperties should override settings in extraOptions.
     val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
     val options = new JDBCOptions(url, table, params)
@@ -678,16 +678,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }
   }
 
-  /**
-   * A convenient function for validate specified column types schema in jdbc API.
-   */
-  private def assertJdbcAPISpecifiedDataFrameSchema(): Unit = {
-    if (userSpecifiedSchema.nonEmpty) {
-      throw new AnalysisException("Please use customDataFrameColumnTypes option to " +
-        "specified column types.")
-    }
-  }
-
   /**
    * A convenient function for schema validation in datasources supporting
    * `columnNameOfCorruptRecord` as an option.
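The diff swaps the removed helper for assertNoSpecifiedSchema("jdbc"); a rough sketch of what such a guard might look like, assuming it simply rejects any user-specified schema for the named operation (the actual helper body and its exact error message are not shown in this diff):

  // Hypothetical sketch: refuse .schema(...) for sources that resolve their own schema.
  private def assertNoSpecifiedSchema(operation: String): Unit = {
    if (userSpecifiedSchema.nonEmpty) {
      // Illustrative message only; the real wording is defined elsewhere in DataFrameReader.
      throw new AnalysisException(s"User specified schema not supported with `$operation`")
    }
  }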
@@ -124,7 +124,7 @@ class JDBCOptions(
   // TODO: to reuse the existing partition parameters for those partition specific options
   val createTableOptions = parameters.getOrElse(JDBC_CREATE_TABLE_OPTIONS, "")
   val createTableColumnTypes = parameters.get(JDBC_CREATE_TABLE_COLUMN_TYPES)
-  val customDataFrameColumnTypes = parameters.get(JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES)
+  val customSchema = parameters.get(JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES)
 
   val batchSize = {
     val size = parameters.getOrElse(JDBC_BATCH_INSERT_SIZE, "1000").toInt
@@ -164,7 +164,7 @@ object JDBCOptions {
   val JDBC_TRUNCATE = newOption("truncate")
   val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
   val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
-  val JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES = newOption("customDataFrameColumnTypes")
+  val JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES = newOption("customSchema")
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
   val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
@@ -113,9 +113,9 @@ private[sql] case class JDBCRelation(
 
   override val schema: StructType = {
     val schema = JDBCRDD.resolveTable(jdbcOptions)
-    val customDataFrameColumnTypes = jdbcOptions.customDataFrameColumnTypes
-    if (customDataFrameColumnTypes.isDefined) {
-      JdbcUtils.parseUserSpecifiedColumnTypes(schema, customDataFrameColumnTypes.get,
+    val customSchema = jdbcOptions.customSchema
+    if (customSchema.isDefined) {
+      JdbcUtils.parseUserSpecifiedColumnTypes(schema, customSchema.get,
         sqlContext.sessionState.conf.resolver)
     } else {
       schema
@@ -769,7 +769,7 @@ object JdbcUtils extends Logging {
   }
 
   /**
-   * Parses the user specified customDataFrameColumnTypes option value to DataFrame schema,
+   * Parses the user specified customSchema option value to DataFrame schema,
    * and returns it if it's all columns are equals to default schema's.
    */
   def parseUserSpecifiedColumnTypes(
@@ -779,7 +779,7 @@
     val userSchema = CatalystSqlParser.parseTableSchema(columnTypes)
 
     SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the createTableColumnTypes option value", nameEquality)
+      userSchema.map(_.name), "in the customSchema option value", nameEquality)
 
     if (userSchema.size != schema.size) {
       throw new AnalysisException("Please provide all the columns, " +
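As a small illustration (not part of the diff) of what a customSchema value parses into, using the same CatalystSqlParser.parseTableSchema call as the code above:

  import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

  // "id DECIMAL(38, 0), name STRING" becomes a two-field StructType.
  val userSchema = CatalystSqlParser.parseTableSchema("id DECIMAL(38, 0), name STRING")
  // StructType(StructField(id, DecimalType(38,0), true), StructField(name, StringType, true))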
@@ -960,18 +960,18 @@ class JDBCSuite extends SparkFunSuite
     val e1 = intercept[AnalysisException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
     }.getMessage
-    assert(e1.contains("Please use customDataFrameColumnTypes option to specified column types."))
+    assert(e1.contains("Please use customSchema option to specified column types."))
 
     val e2 = intercept[AnalysisException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
     }.getMessage
-    assert(e2.contains("Please use customDataFrameColumnTypes option to specified column types."))
+    assert(e2.contains("Please use customSchema option to specified column types."))
   }
 
   test("jdbc API support custom schema") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
     val props = new Properties()
-    props.put("customDataFrameColumnTypes", "NAME STRING, THEID BIGINT")
+    props.put("customSchema", "NAME STRING, THEID BIGINT")
     val schema = StructType(Seq(
       StructField("NAME", StringType, true), StructField("THEID", LongType, true)))
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
@@ -987,7 +987,7 @@
       |CREATE TEMPORARY VIEW people_view
       |USING org.apache.spark.sql.jdbc
       |OPTIONS (uRl '$url', DbTaBlE 'TEST.PEOPLE', User 'testUser', PassWord 'testPass',
-      |customDataFrameColumnTypes 'NAME STRING, THEID INT')
+      |customSchema 'NAME STRING, THEID INT')
       """.stripMargin.replaceAll("\n", " "))
     val schema = StructType(
       Seq(StructField("NAME", StringType, true), StructField("THEID", IntegerType, true)))