Adding a new option to the JDBC data source to allow users to specify create table column types when the table is created on write
sureshthalamati committed Mar 23, 2017
commit 0380bbfc5f7d3f5c8e15446ad284b69f90bbdfff
7 changes: 7 additions & 0 deletions docs/sql-programming-guide.md
@@ -1223,6 +1223,13 @@ the following case-insensitive options:
This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., <code>CREATE TABLE t (name string) ENGINE=InnoDB</code>). This option applies only to writing.
</td>
</tr>

<tr>
<td><code>createTableColumnTypes</code></td>
<td>
The database column data types to use instead of the defaults when creating the table. Data type information should be specified as key-value pairs in JSON, where the key is the column name and the value is the data type (e.g., <code>{"name":"varchar(128)", "comments":"clob(20k)"}</code>). You can use <code>org.apache.spark.sql.types.MetadataBuilder</code> to build the metadata and generate the JSON string required for this option. This option applies only to writing.
</td>
</tr>
</table>

<div class="codetabs">
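As a quick illustration of the `createTableColumnTypes` option documented above, a minimal end-to-end Scala sketch (the JDBC URL, table name, and credentials are placeholders; the `MetadataBuilder` calls mirror the examples later in this diff):

```scala
import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.MetadataBuilder

object CreateTableColumnTypesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("createTableColumnTypes sketch").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "dave"), (2, "mary")).toDF("id", "name")

    // Build the JSON option value with MetadataBuilder instead of hand-writing
    // {"name":"VARCHAR(128)"} -- this avoids quoting mistakes.
    val colTypes = new MetadataBuilder().putString("name", "VARCHAR(128)").build().json

    val props = new Properties()
    props.put("user", "username")
    props.put("password", "password")

    df.write
      .option("createTableColumnTypes", colTypes) // consulted only when the table is created
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", props)
  }
}
```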
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -35,6 +35,9 @@
// $example off:schema_merging$
// $example off:basic_parquet_example$
import org.apache.spark.sql.SparkSession;
// $example on:jdbc_dataset$
import org.apache.spark.sql.types.MetadataBuilder;
// $example off:jdbc_dataset$

public class JavaSQLDataSourceExample {

@@ -258,6 +261,12 @@ private static void runJdbcDatasetExample(SparkSession spark) {

jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
String columnTypes = new MetadataBuilder().putString("name", "VARCHAR(128)").build().json();
jdbcDF.write()
.option("createTableColumnTypes", columnTypes)
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// $example off:jdbc_dataset$
}
}
6 changes: 6 additions & 0 deletions examples/src/main/python/sql/datasource.py
@@ -169,6 +169,12 @@ def jdbc_dataset_example(spark):
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
.option("createTableColumnTypes", "{\"name\":\"VARCHAR(128)\"}") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# $example off:jdbc_dataset$


examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -19,6 +19,7 @@ package org.apache.spark.examples.sql
import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.MetadataBuilder

object SQLDataSourceExample {

@@ -181,6 +182,12 @@ object SQLDataSourceExample {

jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
val createTableColTypes = new MetadataBuilder().putString("name", "VARCHAR(128)").build().json
jdbcDF.write
.option("createTableColumnTypes", createTableColTypes)
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// $example off:jdbc_dataset$
}
}
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -119,6 +119,7 @@ class JDBCOptions(
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
// TODO: to reuse the existing partition parameters for those partition specific options
val createTableOptions = parameters.getOrElse(JDBC_CREATE_TABLE_OPTIONS, "")
val createTableColumnTypes = parameters.get(JDBC_CREATE_TABLE_COLUMN_TYPES)
val batchSize = {
val size = parameters.getOrElse(JDBC_BATCH_INSERT_SIZE, "1000").toInt
require(size >= 1,
@@ -154,6 +155,7 @@ object JDBCOptions {
val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize")
val JDBC_TRUNCATE = newOption("truncate")
val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
}
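To trace where the option value lands, a small sketch against the (internal) JDBCOptions constructor shown above. Since the parameters map is case-insensitive, the option key's case does not matter; the constructor shape and the required url/dbtable keys are per Spark 2.2-era internals, so treat the details as an assumption:

```scala
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// url and dbtable are required by JDBCOptions; the values here are placeholders.
val opts = new JDBCOptions(Map(
  "url" -> "jdbc:h2:mem:testdb",
  "dbtable" -> "TEST.PEOPLE",
  "CREATETABLECOLUMNTYPES" -> """{"name":"VARCHAR(128)"}""")) // key case is irrelevant

// parameters.get(JDBC_CREATE_TABLE_COLUMN_TYPES) yields an Option[String]:
assert(opts.createTableColumnTypes == Some("""{"name":"VARCHAR(128)"}"""))
```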
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -680,12 +680,21 @@ object JdbcUtils extends Logging {
/**
* Compute the schema string for this RDD.
*/
def schemaString(schema: StructType, url: String): String = {
def schemaString(
schema: StructType,
url: String,
createTableColumnTypes: Option[String] = None): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
val dbColumnTypeMetadata =
createTableColumnTypes.map(Metadata.fromJson).getOrElse(Metadata.empty)
schema.fields foreach { field =>
val name = dialect.quoteIdentifier(field.name)
val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
val typ: String = if (dbColumnTypeMetadata.contains(field.name)) {
dbColumnTypeMetadata.getString(field.name)
} else {
getJdbcType(field.dataType, dialect).databaseTypeDefinition
}
val nullable = if (field.nullable) "" else "NOT NULL"
sb.append(s", $name $typ $nullable")
}
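To make the override logic concrete, a worked example calling the helper directly against an H2 URL, as the test suite below does. The lookup is by exact column name, and the leading ", " is stripped before schemaString returns, in code just past this hunk; the H2 mappings (StringType to TEXT, IntegerType to INTEGER) are taken from the tests below:

```scala
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types._

val schema = StructType(
  StructField("id", IntegerType) ::  // no entry in the metadata -> dialect default INTEGER
  StructField("name", StringType) :: // overridden               -> VARCHAR(128)
  Nil)

val ddl = JdbcUtils.schemaString(
  schema,
  url = "jdbc:h2:mem:testdb",
  createTableColumnTypes = Some("""{"name":"VARCHAR(128)"}"""))

// ddl: "id" INTEGER , "name" VARCHAR(128)
// (each column definition ends with a space because the nullable clause is empty)
```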
@@ -728,9 +737,10 @@ object JdbcUtils extends Logging {
conn: Connection,
schema: StructType,
options: JDBCOptions): Unit = {
val strSchema = schemaString(schema, options.url)
val table = options.table
val createTableOptions = options.createTableOptions
val createTableColumnTypes = options.createTableColumnTypes
val strSchema = schemaString(schema, options.url, createTableColumnTypes)
// Create the table if the table does not exist.
// To allow certain options to append when create a new table, which can be
// table_options or partition_options.
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -362,4 +362,55 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
assert(sql("select * from people_view").count() == 2)
}
}

test("SPARK-10849: create table using user specified column type.") {
val data = Seq[Row](
Row(1, "dave", "Boston", "electric cars"),
Row(2, "mary", "boston", "building planes")
)
val schema = StructType(
StructField("id", IntegerType) ::
StructField("name", StringType) ::
StructField("city", StringType) ::
StructField("descr", StringType) ::
Nil)
val df = spark.createDataFrame(sparkContext.parallelize(data), schema)

// Use MetadataBuilder to generate the metadata JSON string for the createTableColumnTypes option.
val mdb = new MetadataBuilder()
mdb.putString("name", "NVARCHAR(123)")
// Use the H2 VARCHAR_IGNORECASE type instead of TEXT to perform case-insensitive comparisons
mdb.putString("city", "VARCHAR_IGNORECASE(20)")
val createTableColTypes = mdb.build().json
assert(JdbcUtils.schemaString(df.schema, url1, Option(createTableColTypes)) ==
s""""id" INTEGER , "name" NVARCHAR(123) , "city" VARCHAR_IGNORECASE(20) , "descr" TEXT """)
// create the table with the user specified data types, and verify the data
df.write.option("createTableColumnTypes", createTableColTypes)
.jdbc(url1, "TEST.DBCOLTYPETEST", properties)
assert(spark.read.jdbc(url1,
"""(select * from TEST.DBCOLTYPETEST where "city"='Boston')""", properties).count == 2)
}

test("SPARK-10849: jdbcCreateTableColumnTypes option with invalid data type") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val invalidCreateTableColTypes =
new MetadataBuilder().putString("name", "INVALID(123)").build().json
val msg = intercept[org.h2.jdbc.JdbcSQLException] {
df.write.mode(SaveMode.Overwrite)
.option("createTableColumnTypes", invalidCreateTableColTypes)
.jdbc(url1, "TEST.USERDBTYPETEST", properties)
}.getMessage()
assert(msg.contains("Unknown data type: \"INVALID\""))
}

test("SPARK-10849: jdbcCreateTableColumnTypes option with invalid json string") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val msg = intercept[com.fasterxml.jackson.core.JsonParseException] {
df.write.mode(SaveMode.Overwrite)
.option("createTableColumnTypes", """{"name":"NVARCHAR(12)"""")
.jdbc(url1, "TEST.USERDBTYPETEST", properties)
}.getMessage()
assert(
msg.contains("expected close marker for OBJECT (from [Source: {\"name\":\"NVARCHAR(12)\";"))
}
}