python/pyspark/sql.py: 14 changes (7 additions & 7 deletions)
@@ -498,10 +498,7 @@ def _infer_schema(row):
 
 def _create_converter(obj, dataType):
     """Create an converter to drop the names of fields in obj """
-    if not _has_struct(dataType):
-        return lambda x: x
-
-    elif isinstance(dataType, ArrayType):
+    if isinstance(dataType, ArrayType):
         conv = _create_converter(obj[0], dataType.elementType)
         return lambda row: map(conv, row)
 
@@ -510,6 +507,9 @@ def _create_converter(obj, dataType):
         conv = _create_converter(value, dataType.valueType)
         return lambda row: dict((k, conv(v)) for k, v in row.iteritems())
 
+    elif not isinstance(dataType, StructType):
+        return lambda x: x
+
     # dataType must be StructType
     names = [f.name for f in dataType.fields]
 
@@ -529,8 +529,7 @@ def _create_converter(obj, dataType):
     elif hasattr(obj, "__dict__"):  # object
         conv = lambda o: [o.__dict__.get(n, None) for n in names]
 
-    nested = any(_has_struct(f.dataType) for f in dataType.fields)
-    if not nested:
+    if all(isinstance(f.dataType, PrimitiveType) for f in dataType.fields):
         return conv
 
     row = conv(obj)
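The hunks above reorder _create_converter so that container types are dispatched first, non-struct types fall through unchanged, and only the StructType case reaches the name-stripping logic. For illustration, here is a minimal, self-contained sketch of that dispatch order; the type classes below are simplified stand-ins (not the real pyspark.sql types, which carry extra arguments such as nullability), and the converter is written in modern Python rather than the Python 2 idioms of the diff.

class ArrayType(object):
    def __init__(self, elementType):
        self.elementType = elementType

class MapType(object):
    def __init__(self, keyType, valueType):
        self.keyType, self.valueType = keyType, valueType

class StructField(object):
    def __init__(self, name, dataType):
        self.name, self.dataType = name, dataType

class StructType(object):
    def __init__(self, fields):
        self.fields = fields

def create_converter(obj, dataType):
    """Return a function that drops field names, keeping only the values."""
    if isinstance(dataType, ArrayType):
        # sample the first element to build a converter for the element type
        conv = create_converter(obj[0], dataType.elementType)
        return lambda row: [conv(v) for v in row]
    elif isinstance(dataType, MapType):
        # sample one value to build a converter for the value type
        value = next(iter(obj.values()))
        conv = create_converter(value, dataType.valueType)
        return lambda row: dict((k, conv(v)) for k, v in row.items())
    elif not isinstance(dataType, StructType):
        # primitives (and anything else non-struct) pass through unchanged
        return lambda x: x
    # dataType is a StructType: turn a dict-like record into a list of values
    names = [f.name for f in dataType.fields]
    return lambda rec: [rec.get(n) for n in names]

# usage
schema = StructType([StructField("name", str), StructField("age", int)])
conv = create_converter({"name": "Alice", "age": 1}, schema)
print(conv({"name": "Alice", "age": 1}))   # ['Alice', 1]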
@@ -1037,7 +1036,8 @@ def inferSchema(self, rdd):
             raise ValueError("The first row in RDD is empty, "
                              "can not infer schema")
         if type(first) is dict:
-            warnings.warn("Using RDD of dict to inferSchema is deprecated")
+            warnings.warn("Using RDD of dict to inferSchema is deprecated,"
+                          "please use pyspark.Row instead")
 
         schema = _infer_schema(first)
         rdd = rdd.mapPartitions(lambda rows: _drop_schema(rows, schema))
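The last hunk only sharpens the deprecation message: build the RDD from pyspark.sql.Row objects instead of plain dicts before calling inferSchema. A minimal usage sketch of that migration, assuming a live SparkContext named sc and the SQLContext.inferSchema API from this era of PySpark:

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

# deprecated by this change: an RDD of plain dicts
# people = sc.parallelize([{"name": "Alice", "age": 1}])

# preferred: an RDD of Row objects, which carry field names explicitly
people = sc.parallelize([Row(name="Alice", age=1), Row(name="Bob", age=2)])
schema_rdd = sqlContext.inferSchema(people)
print(schema_rdd.collect())

Note that the two string literals in the new warning concatenate without a separating space, so the message renders as "deprecated,please use pyspark.Row instead"; a trailing space in the first literal would read more cleanly.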