Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Adding schema_of_csv and tests
  • Loading branch information
MaxGekk committed Sep 21, 2018
commit 22cfb46c7677fa02c2a6d8fce36d7a1f0732131b
25 changes: 25 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3843,6 +3843,31 @@ object functions {
withExpr(new CsvToStructs(e.expr, lit(schema).expr, options.asScala.toMap))
}

/**
* Parses a column containing a CSV string and infers its schema.
*
* @param e a string column containing CSV data.
* @return a column with string literal containing schema in DDL format.
*
* @group collection_funcs
* @since 2.5.0
*/
def schema_of_csv(e: Column): Column = withExpr(new SchemaOfCsv(e.expr))

/**
* Parses a column containing a CSV string and infers its schema using options.
*
* @param e a string column containing CSV data.
* @param options options to control how the CSV is parsed. Accepts the same options as the
* CSV data source. See [[DataFrameReader#csv]].
* @return a column with string literal containing schema in DDL format.
*
* @group collection_funcs
* @since 2.5.0
*/
def schema_of_csv(e: Column, options: java.util.Map[String, String]): Column = {
withExpr(SchemaOfCsv(e.expr, options.asScala.toMap))
}

// scalastyle:off line.size.limit
// scalastyle:off parameter.number

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import collection.JavaConverters._

import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -71,4 +73,10 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
df1.selectExpr("from_csv(value, 'a INT')"),
Row(Row(1)) :: Nil)
}

test("infers schemas using options") {
  // Inferring with a custom separator: "0.1 1" split on space yields a
  // double column and an int column, reported as a DDL-format string.
  val csvOptions = Map("sep" -> " ").asJava
  val actual = spark.range(1).select(schema_of_csv(lit("0.1 1"), csvOptions))
  checkAnswer(actual, Row("struct<_c0:double,_c1:int>") :: Nil)
}
}