Skip to content

Commit 88e6bdb

Browse files
Attempting to fix loss of schema
1 parent 37e0a0a commit 88e6bdb

File tree

3 files changed

+15
-11
lines changed

3 files changed

+15
-11
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ private[sql] case class ParquetRelation(val path: String)
5555
.getSchema
5656

5757
/** Attributes */
58+
// TODO: THIS POTENTIALLY LOSES TYPE INFORMATION!!!!
59+
// e.g. short <-> INT32 and byte <-> INT32
5860
override val output =
5961
ParquetTypesConverter
6062
.convertToAttributes(parquetSchema)
@@ -132,7 +134,9 @@ private[sql] object ParquetRelation {
132134
}
133135
ParquetRelation.enableLogForwarding()
134136
ParquetTypesConverter.writeMetaData(attributes, path, conf)
135-
new ParquetRelation(path.toString)
137+
new ParquetRelation(path.toString) {
138+
override val output = attributes
139+
}
136140
}
137141

138142
private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
126126

127127
override def init(configuration: Configuration): WriteSupport.WriteContext = {
128128
//attributes = DataType(configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA))
129-
attributes = if (attributes == null) {
130-
RowWriteSupport.getSchema(configuration) match {
131-
case s: StructType => s.toAttributes
132-
case other => sys.error(s"Can convert $attributes to row")
133-
}
134-
} else attributes
129+
attributes = if (attributes == null) RowWriteSupport.getSchema(configuration) else attributes
135130
schema = if (schema == null) ParquetTypesConverter.convertFromAttributes(attributes) else schema
136131
// ParquetTypesConverter.convertToAttributes(schema)
137132
log.debug(s"write support initialized for requested schema $attributes")
@@ -192,7 +187,9 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
192187
)
193188
)
194189
case IntegerType => writer.addInteger(value.asInstanceOf[Int])
190+
case ShortType => writer.addInteger(value.asInstanceOf[Int])
195191
case LongType => writer.addLong(value.asInstanceOf[Long])
192+
case ByteType => writer.addInteger(value.asInstanceOf[Int])
196193
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
197194
case FloatType => writer.addFloat(value.asInstanceOf[Float])
198195
case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
@@ -298,7 +295,9 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
298295
)
299296
)
300297
case IntegerType => writer.addInteger(record.getInt(index))
298+
case ShortType => writer.addInteger(record.getShort(index))
301299
case LongType => writer.addLong(record.getLong(index))
300+
case ByteType => writer.addInteger(record.getByte(index))
302301
case DoubleType => writer.addDouble(record.getDouble(index))
303302
case FloatType => writer.addFloat(record.getFloat(index))
304303
case BooleanType => writer.addBoolean(record.getBoolean(index))

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -292,13 +292,14 @@ private[parquet] object ParquetTypesConverter {
292292
}
293293

294294
def convertFromString(string: String): Seq[Attribute] = {
295-
val decoded: Array[Byte] = BaseEncoding.base64().decode(string)
296-
SparkSqlSerializer.deserialize(decoded)
295+
DataType(string) match {
296+
case s: StructType => s.toAttributes
297+
case other => sys.error(s"Can convert $string to row")
298+
}
297299
}
298300

299301
def convertToString(schema: Seq[Attribute]): String = {
300-
val serialized: Array[Byte] = SparkSqlSerializer.serialize(schema)
301-
BaseEncoding.base64().encode(serialized)
302+
StructType.fromAttributes(schema).toString
302303
}
303304

304305
def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration) {

0 commit comments

Comments
 (0)