Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[SPARK-4782] Added generic inferSchema method to allow schema infer f…
…or an RDD of a generic type T for which the user provides a mapping function from RDD[T] => RDD[Map[String,Any]]
  • Loading branch information
lucarosellini committed Jun 1, 2015
commit 65b44fa3219538265055285cee072267af0a7b85
13 changes: 11 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,24 @@ private[sql] object JsonRDD extends Logging {

private[sql] def inferSchema(
json: RDD[String],
samplingRatio: Double = 1.0,
samplingRatio: Double,
columnNameOfCorruptRecords: String): StructType = {

inferSchema(json, samplingRatio, columnNameOfCorruptRecords, parseJson)
}

private[sql] def inferSchema[T <: AnyRef](
json: RDD[T],
samplingRatio: Double = 1.0,
columnNameOfCorruptRecords: String,
parseData: (RDD[T], String) => RDD[Map[String, Any]]): StructType = {
require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
val allKeys =
if (schemaData.isEmpty()) {
Set.empty[(String, DataType)]
} else {
parseJson(schemaData, columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _)
parseData(json,columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _)
}
createSchema(allKeys)
}
Expand Down