[SPARK-5775] [SQL] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table #4792
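This patch fixes a `ClassCastException` in the Parquet scan paths: when the requested schema contains nested (non-primitive) columns, the root converter produces an immutable `GenericRow` rather than a `SpecificMutableRow`, so the unconditional cast used when filling in partition-column values blows up at runtime. The fix checks whether every requested column is primitive and, when it is not, copies each converted row into a `GenericMutableRow` before writing the partition values. Below is a minimal sketch of the failure and the workaround, using simplified, hypothetical stand-ins for Catalyst's row classes (`GenericMutableRow` here stands in for the mutable row type; none of this is Spark's actual API):

```scala
// Simplified stand-ins for Catalyst's row classes, for illustration only.
trait Row {
  def size: Int
  def apply(i: Int): Any
}

class GenericRow(values: Array[Any]) extends Row {
  def size: Int = values.length
  def apply(i: Int): Any = values(i)
}

class GenericMutableRow(n: Int) extends Row {
  private val values = new Array[Any](n)
  def size: Int = n
  def apply(i: Int): Any = values(i)
  def update(i: Int, v: Any): Unit = values(i) = v
}

object CastDemo extends App {
  // With a nested schema the converter hands back an immutable GenericRow,
  // and the partition column slot is left empty (null).
  val converted: Row = new GenericRow(Array(1, null))

  // The pre-fix code path cast unconditionally and failed at runtime:
  // converted.asInstanceOf[GenericMutableRow]  // ClassCastException

  // The fix: copy into a mutable row, then fill in the partition value.
  val mutableRow = new GenericMutableRow(converted.size)
  var i = 0
  while (i < converted.size) {
    mutableRow(i) = converted(i)
    i += 1
  }
  mutableRow(1) = "2015-02-27" // fill in the partition value

  // Prints the row values with the partition column filled in.
  println(Seq.tabulate(mutableRow.size)(mutableRow(_)))
}
```

Copying costs one extra pass per row, but it only happens on the nested-schema path, where no mutable row exists to reuse.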
In `ParquetTableScan`:

```diff
@@ -126,6 +126,9 @@ private[sql] case class ParquetTableScan(
       conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
+      // This check if based on CatalystConverter.createRootConverter.
+      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -143,19 +146,47 @@ private[sql] case class ParquetTableScan(
         relation.partitioningAttributes
           .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
-        new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
-            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-            // Parquet will leave partitioning columns empty, so we fill them in here.
-            var i = 0
-            while (i < requestedPartitionOrdinals.size) {
-              row(requestedPartitionOrdinals(i)._2) =
-                partitionRowValues(requestedPartitionOrdinals(i)._1)
-              i += 1
-            }
-            row
-          }
-        }
+        if (primitiveRow) {
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+            }
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(output.size)
```
> **Contributor:** Seems it will be safer if we get …
```diff
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = iter.next()._2.asInstanceOf[Row]
+
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              mutableRow
+            }
+          }
+        }
```
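The `primitiveRow` test mirrors the converter choice made in `CatalystConverter.createRootConverter`: an all-primitive output schema is handled by `CatalystPrimitiveRowConverter`, which yields mutable rows, while any nested column routes through `CatalystGroupConverter`, which yields immutable ones. A small sketch of the shape of that predicate, with a made-up `DataType` hierarchy standing in for Catalyst's:

```scala
object PrimitiveCheckDemo extends App {
  // Made-up DataType ADT standing in for Catalyst's; only the shape of the check matters.
  sealed trait DataType
  case object IntegerType extends DataType
  case object StringType extends DataType
  final case class StructType(fields: Seq[DataType]) extends DataType
  final case class ArrayType(element: DataType) extends DataType

  def isPrimitiveType(dt: DataType): Boolean = dt match {
    case StructType(_) | ArrayType(_) => false // nested types go through the group converter
    case _                            => true
  }

  // The same forall used in the patch: a mutable row is only safe for flat schemas.
  def producesMutableRow(requested: Seq[DataType]): Boolean =
    requested.forall(isPrimitiveType)

  assert(producesMutableRow(Seq(IntegerType, StringType)))                      // flat -> mutable
  assert(!producesMutableRow(Seq(IntegerType, StructType(Seq(StringType)))))    // nested -> immutable
}
```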
The same change is applied to `ParquetRelation2`:

```diff
@@ -476,23 +476,53 @@ private[sql] case class ParquetRelation2(
       // When the data does not include the key and the key is requested then we must fill it in
       // based on information from the input split.
       if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
+        // This check if based on CatalystConverter.createRootConverter.
```
> **Contributor:** same here
```diff
+        val primitiveRow =
+          requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
         baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
           val partValues = selectedPartitions.collectFirst {
             case p if split.getPath.getParent.toString == p.path => p.values
           }.get
 
           val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
 
-          iterator.map { pair =>
-            val row = pair._2.asInstanceOf[SpecificMutableRow]
-            var i = 0
-            while (i < requiredPartOrdinal.size) {
-              // TODO Avoids boxing cost here!
-              val partOrdinal = requiredPartOrdinal(i)
-              row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
-              i += 1
-            }
-            row
-          }
+          if (primitiveRow) {
+            iterator.map { pair =>
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = pair._2.asInstanceOf[SpecificMutableRow]
+              var i = 0
+              while (i < requiredPartOrdinal.size) {
+                // TODO Avoids boxing cost here!
+                val partOrdinal = requiredPartOrdinal(i)
+                row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+                i += 1
+              }
+              row
+            }
+          } else {
+            // Create a mutable row since we need to fill in values from partition columns.
+            val mutableRow = new GenericMutableRow(requestedSchema.size)
```
> **Contributor:** Seems it will be safer if we get …
>
> **Contributor (Author):** This is not necessary. Unlike …
```diff
+            iterator.map { pair =>
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = pair._2.asInstanceOf[Row]
+              var i = 0
+              while (i < row.size) {
+                // TODO Avoids boxing cost here!
+                mutableRow(i) = row(i)
+                i += 1
+              }
+
+              i = 0
+              while (i < requiredPartOrdinal.size) {
+                // TODO Avoids boxing cost here!
+                val partOrdinal = requiredPartOrdinal(i)
+                mutableRow.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+                i += 1
+              }
+              mutableRow
+            }
+          }
       } else {
```
> I made a typo here… `if` => `is`.
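For reference, a reproduction along these lines should trigger the pre-fix crash (paths, table, and column names are made up; assumes a Spark 1.3-era `spark-shell` with `sqlContext` in scope):

```scala
// Hypothetical partitioned Parquet layout with a nested column:
//   /tmp/events/date=2015-02-27/part-00000.parquet
//   schema: id: int, payload: struct<a: int, b: string>
val df = sqlContext.parquetFile("/tmp/events")
df.registerTempTable("events")

// Before this patch, requesting the partition column together with nested data
// failed with: java.lang.ClassCastException: GenericRow cannot be cast to SpecificMutableRow
sqlContext.sql("SELECT id, payload, date FROM events").collect()
```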