Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added UUID type.
Added array type writes
  • Loading branch information
mariusvniekerk committed Oct 21, 2015
commit 5db8589f72da740f63970b3fa545751152c9826d
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.Properties
import scala.util.Try

import org.apache.spark.Logging
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}

Expand Down Expand Up @@ -92,7 +92,8 @@ object JdbcUtils extends Logging {
iterator: Iterator[Row],
rddSchema: StructType,
nullTypes: Array[Int],
batchSize: Int): Iterator[Byte] = {
batchSize: Int,
dialect: JdbcDialect): Iterator[Byte] = {
val conn = getConnection()
var committed = false
try {
Expand Down Expand Up @@ -121,6 +122,21 @@ object JdbcUtils extends Logging {
case TimestampType => stmt.setTimestamp(i + 1, row.getAs[java.sql.Timestamp](i))
case DateType => stmt.setDate(i + 1, row.getAs[java.sql.Date](i))
case t: DecimalType => stmt.setBigDecimal(i + 1, row.getDecimal(i))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ISTM we need to check if input types are valid for target databases in advance, e.g., in JavaUtils#saveTable.
JavaUtils#savePartition should simply put input data as given typed-data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the particular dialect does not support these types saveTable should toss an exception when building the nullTypes array

case ArrayType(elemType, _) =>
val elemDataBaseType = dialect.getJDBCType(elemType).map(_.databaseTypeDefinition).getOrElse(
dialect.getCommonJDBCType(elemType).map(_.databaseTypeDefinition).getOrElse(
throw new IllegalArgumentException(
s"Can't determine array element type for $elemType in field $i")
))
val array: Array[AnyRef] = elemType match {
case _: ArrayType =>
throw new IllegalArgumentException(s"Nested array writes to JDBC are not supported for field $i")
case BinaryType => row.getSeq[Array[Byte]](i).toArray
case TimestampType => row.getSeq[java.sql.Timestamp](i).toArray
case DateType => row.getSeq[java.sql.Date](i).toArray
case _ => row.getSeq[AnyRef](i).toArray
}
stmt.setArray(i + 1, conn.createArrayOf(elemDataBaseType, array))
case _ => throw new IllegalArgumentException(
s"Can't translate non-null value for field $i")
}
Expand Down Expand Up @@ -202,7 +218,7 @@ object JdbcUtils extends Logging {
val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
val batchSize = properties.getProperty("batchsize", "1000").toInt
df.foreachPartition { iterator =>
savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize)
savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ case object PostgresDialect extends JdbcDialect {
Some(StringType)
} else if (sqlType == Types.OTHER && typeName.equals("jsonb")) {
Some(StringType)
} else if (sqlType == Types.OTHER && typeName.equals("uuid")) {
Some(StringType)
} else if (sqlType == Types.ARRAY) {
typeName match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does we need underscores in the head of typeName?
I quickly checked actual strings returned by ResultSetMetaData#getColumnTypeName in postgresql-jdbc, and
I found that they have no underscore.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underscores are particularly for the array types. Postgres prepends them to all array types here https://github.com/pgjdbc/pgjdbc/blob/REL9_4_1204/org/postgresql/jdbc2/TypeInfoCache.java#L159

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood.

case "_bit" | "_bool" => Some(ArrayType(BooleanType))
Expand All @@ -235,7 +237,7 @@ case object PostgresDialect extends JdbcDialect {
case "_money" | "_float8" => Some(ArrayType(DoubleType))
case "_text" | "_varchar" | "_char" | "_bpchar" | "_name" => Some(ArrayType(StringType))
case "_bytea" => Some(ArrayType(BinaryType))
case "_timestamp" | "timestamptz" | "time" | "timetz" => Some(ArrayType(TimestampType))
case "_timestamp" | "_timestamptz" | "_time" | "_timetz" => Some(ArrayType(TimestampType))
case "_date" => Some(ArrayType(DateType))
case "_numeric"
if size != 0 || scale != 0 => Some(ArrayType(DecimalType(size, scale)))
Expand Down