22 changes: 21 additions & 1 deletion python/pyspark/sql/readwriter.py
@@ -336,6 +336,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
``inferSchema`` option or specify the schema explicitly using ``schema``.

:param path: string, or list of strings, for input path(s).
Member: nit: . -> ,

Contributor Author: ok thanks :)

or RDD of Strings storing CSV rows.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
:param sep: sets the single character as a separator for each field and value.
@@ -408,6 +409,10 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
[('_c0', 'string'), ('_c1', 'string')]
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
"""
self._set_opts(
schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
@@ -420,7 +425,22 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
if type(path) == list:
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
elif isinstance(path, RDD):
def func(iterator):
for x in iterator:
if not isinstance(x, basestring):
x = unicode(x)
if isinstance(x, unicode):
x = x.encode("utf-8")
yield x
keyed = path.mapPartitions(func)
keyed._bypass_serializer = True
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
Member: I tried a way within Python and this seems to work:

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1ed452d895b..0f54065b3ee 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -438,7 +438,10 @@ class DataFrameReader(OptionUtils):
             keyed = path.mapPartitions(func)
             keyed._bypass_serializer = True
             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
-            return self._df(self._jreader.csv(jrdd))
+            jdataset = self._spark._jsqlContext.createDataset(
+                jrdd.rdd(),
+                self._spark._sc._jvm.Encoders.STRING())
+            return self._df(self._jreader.csv(jdataset))
         else:
             raise TypeError("path can be only string, list or RDD")

Member: @goldmedal, it'd be great if you could double-check whether this really works and whether it can be shortened or made cleaner. This was just my rough try to reach the goal, so I am not sure it is the best way.

goldmedal (Contributor Author), Sep 26, 2017: OK, this way looks good. I'll try it. Thanks for your suggestion.

return self._df(self._jreader.csv(jrdd))
else:
raise TypeError("path can be only string, list or RDD")

@since(1.5)
def orc(self, path):
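To make the user-facing behavior of the Python change concrete, here is a minimal sketch of the new API (assuming a PySpark build that includes this patch; the app name and sample rows are placeholders):

    # Sketch: reading CSV rows from an RDD of strings via the API added above.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("csv-from-rdd-demo").getOrCreate()
    sc = spark.sparkContext

    # One CSV row per record, mirroring the ages.csv doctest in the diff.
    rdd = sc.parallelize(["Joe,20", "Tom,30"])

    # Without a schema, every column defaults to string, as in the doctest.
    df = spark.read.csv(rdd)
    print(df.dtypes)  # [('_c0', 'string'), ('_c1', 'string')]

    # The schema parameter also accepts a DDL-formatted string (see the
    # docstring above), so types can be applied directly.
    df2 = spark.read.csv(rdd, schema="name STRING, age INT")
    df2.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # | Joe| 20|
    # | Tom| 30|
    # +----+---+

    spark.stop()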
34 changes: 34 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -455,6 +455,40 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
csv(Seq(path): _*)
}

/**
* Loads a `JavaRDD[String]` storing CSV rows and returns the result as a `DataFrame`.
*
* If the schema is not specified using `schema` function and `inferSchema` option is enabled,
* this function goes through the input once to determine the input schema.
*
* If the schema is not specified using `schema` function and `inferSchema` option is disabled,
* it determines the columns as string types and it reads only the first line to determine the
* names and the number of fields.
*
* @param csvRDD input RDD with one CSV row per record
* @since 2.2.0
*/
@deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
def csv(csvRDD: JavaRDD[String]): DataFrame = csv(csvRDD.rdd)

/**
* Loads an `RDD[String]` storing CSV rows and returns the result as a `DataFrame`.
*
* If the schema is not specified using `schema` function and `inferSchema` option is enabled,
* this function goes through the input once to determine the input schema.
*
* If the schema is not specified using `schema` function and `inferSchema` option is disabled,
* it determines the columns as string types and it reads only the first line to determine the
* names and the number of fields.
*
* @param csvRDD input RDD with one CSV row per record
* @since 2.2.0
*/
@deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
def csv(csvRDD: RDD[String]): DataFrame = {
HyukjinKwon (Member), Sep 25, 2017: Wait ... I think we shouldn't introduce an RDD API on the Scala side. I was thinking of doing this on the Python side, or maybe adding a private wrapper on the Scala side if required. Will take a closer look tomorrow (KST).

Contributor Author: Thanks for your review :)
umm.. I followed spark.read.json's approach when adding these. Although json(jsonRDD: RDD[String]) has been deprecated, PySpark still uses it to create a DataFrame. I think adding a private wrapper in Scala may be better, because not only PySpark but also SparkR may need it.
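For context, the json precedent mentioned here is the existing PySpark path where spark.read.json accepts an RDD of JSON strings; a minimal sketch (assuming a live SparkSession bound to `spark` and its SparkContext as `sc`):

    # Sketch of the json precedent: spark.read.json already accepts an RDD
    # of strings, one JSON document per record.
    rdd = sc.parallelize(['{"name": "Joe", "age": 20}',
                          '{"name": "Tom", "age": 30}'])
    df = spark.read.json(rdd)
    df.printSchema()
    # root
    #  |-- age: long (nullable = true)
    #  |-- name: string (nullable = true)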

Member: Yep. +1 for @HyukjinKwon's advice. We cannot add a deprecated method that never existed in 2.2.0 at all.

Member: Yeah... it's weird to add an already-deprecated method. :) We should either add a special wrapper for this purpose or do this on the Python side, if that is possible and not too complicated.

csv(sparkSession.createDataset(csvRDD)(Encoders.STRING))
}

/**
* Loads a `Dataset[String]` storing CSV rows and returns the result as a `DataFrame`.
*
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -146,6 +146,22 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
verifyCars(carsWithoutHeader, withHeader = false, checkTypes = false)
}

test("simple csv test with string RDD") {
val csvRDD = spark.sparkContext.textFile(testFile(carsFile))
val cars = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(csvRDD)

verifyCars(cars, withHeader = true, checkTypes = true)

val carsWithoutHeader = spark.read
.option("header", "false")
.csv(csvRDD)

verifyCars(carsWithoutHeader, withHeader = false, checkTypes = false)
}
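A rough PySpark counterpart of this Scala test could look like the following sketch (the test-data path, `spark`, and `sc` are assumptions, not part of this patch's test suite):

    # Sketch: exercising the same header/inferSchema options from Python.
    csv_rdd = sc.textFile("sql/core/src/test/resources/test-data/cars.csv")

    cars = (spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv(csv_rdd))

    cars_without_header = (spark.read
                           .option("header", "false")
                           .csv(csv_rdd))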

test("test inferring booleans") {
val result = spark.read
.format("csv")