Fixes SPARK-10136 and adds more tests
liancheng committed Aug 20, 2015
commit 34547d693001ce3bea3758c831f07e8ec33def81
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -61,7 +61,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
        |
        |Parquet form:
        |$parquetRequestedSchema
-       |
        |Catalyst form:
        |$catalystRequestedSchema
      """.stripMargin
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -25,11 +25,12 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.OriginalType.{LIST, INT_32, UTF8}
+import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
 import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
 
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -145,7 +146,16 @@ private[parquet] class CatalystRowConverter(
     parquetType: GroupType,
     catalystType: StructType,
     updater: ParentContainerUpdater)
-  extends CatalystGroupConverter(updater) {
+  extends CatalystGroupConverter(updater) with Logging {
+
+  logDebug(
+    s"""Building row converter for the following schema:
+       |
+       |Parquet form:
+       |$parquetType
+       |Catalyst form:
+       |${catalystType.prettyJson}
+     """.stripMargin)
 
   /**
    * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates
@@ -464,9 +474,15 @@ private[parquet] class CatalystRowConverter(
 
     override def getConverter(fieldIndex: Int): Converter = converter
 
-    override def end(): Unit = currentArray += currentElement
+    override def end(): Unit = {
+      converter.updater.end()
+      currentArray += currentElement
+    }
 
-    override def start(): Unit = currentElement = null
+    override def start(): Unit = {
+      converter.updater.start()
+      currentElement = null
+    }
   }
 }
 
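The substance of the fix is the pair of overrides above: the array's element converter now calls start() and end() on its own updater around every element instead of only tracking currentElement. A minimal, self-contained Scala sketch of why that propagation matters follows; Updater and BufferingUpdater are illustrative stand-ins, not Spark's actual ParentContainerUpdater machinery.

import scala.collection.mutable.ArrayBuffer

trait Updater {
  def start(): Unit = ()
  def end(): Unit = ()
  def set(value: Any): Unit = ()
}

// Buffers the values of one nested container and commits them to the parent
// only when end() is called.
class BufferingUpdater(commit: Seq[Int] => Unit) extends Updater {
  private val buffer = ArrayBuffer.empty[Int]
  override def start(): Unit = buffer.clear()
  override def set(value: Any): Unit = buffer += value.asInstanceOf[Int]
  override def end(): Unit = commit(buffer.toSeq)
}

object StartEndPropagation extends App {
  val outer = ArrayBuffer.empty[Seq[Int]]
  val element = new BufferingUpdater(outer += _)

  // Simulate reading [[1, 2], [3]]. If the enclosing array converter skipped
  // the element.start()/element.end() calls (the pre-fix behavior shown in
  // the diff), the inner buffer would never be cleared or committed between
  // elements.
  for (values <- Seq(Seq(1, 2), Seq(3))) {
    element.start()
    values.foreach(element.set)
    element.end()
  }
  println(outer) // ArrayBuffer(List(1, 2), List(3))
}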
sql/core/src/test/avro/parquet-compat.avdl (19 changes: 18 additions & 1 deletion)
@@ -34,23 +34,40 @@ protocol CompatibilityTest {
     string nested_string_column;
   }
 
-  record ParquetAvroCompat {
+  record AvroPrimitives {
     boolean bool_column;
     int int_column;
     long long_column;
     float float_column;
     double double_column;
     bytes binary_column;
     string string_column;
+  }
 
+  record AvroOptionalPrimitives {
     union { null, boolean } maybe_bool_column;
     union { null, int } maybe_int_column;
     union { null, long } maybe_long_column;
     union { null, float } maybe_float_column;
     union { null, double } maybe_double_column;
     union { null, bytes } maybe_binary_column;
     union { null, string } maybe_string_column;
+  }
 
+  record AvroNonNullableArrays {
+    array<string> strings_column;
+    union { null, array<int> } maybe_ints_column;
+  }
+
+  record AvroArrayOfArray {
+    array<array<int>> int_arrays_column;
+  }
+
+  record AvroMapOfArray {
+    map<array<int>> string_to_ints_column;
+  }
+
+  record ParquetAvroCompat {
     array<string> strings_column;
     map<int> string_to_int_column;
     map<array<Nested>> complex_column;
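Each new record isolates one schema shape exercised by the Parquet compatibility tests: required primitives, nullable primitives, non-nullable arrays, arrays of arrays, and maps of arrays. As a hedged sketch (not part of this commit), a matching record can be built with Avro's generic API; the schema literal mirrors the AvroNonNullableArrays definition above.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecordBuilder
import scala.collection.JavaConverters._

object BuildNonNullableArrays extends App {
  val schema = new Schema.Parser().parse(
    """{
      |  "type": "record",
      |  "name": "AvroNonNullableArrays",
      |  "fields": [
      |    {"name": "strings_column", "type": {"type": "array", "items": "string"}},
      |    {"name": "maybe_ints_column", "type": ["null", {"type": "array", "items": "int"}]}
      |  ]
      |}""".stripMargin)

  val record = new GenericRecordBuilder(schema)
    .set("strings_column", Seq("a", "b").asJava)
    .set("maybe_ints_column", null) // the union's null branch: the array is absent
    .build()

  println(record) // roughly {"strings_column": ["a", "b"], "maybe_ints_column": null}
}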
sql/core/src/test/avro/parquet-compat.avpr (54 changes: 52 additions & 2 deletions)
@@ -27,7 +27,7 @@
     } ]
   }, {
     "type" : "record",
-    "name" : "ParquetAvroCompat",
+    "name" : "AvroPrimitives",
     "fields" : [ {
       "name" : "bool_column",
       "type" : "boolean"
@@ -49,7 +49,11 @@
     }, {
       "name" : "string_column",
       "type" : "string"
-    }, {
+    } ]
+  }, {
+    "type" : "record",
+    "name" : "AvroOptionalPrimitives",
+    "fields" : [ {
       "name" : "maybe_bool_column",
       "type" : [ "null", "boolean" ]
     }, {
@@ -70,7 +74,53 @@
     }, {
       "name" : "maybe_string_column",
       "type" : [ "null", "string" ]
-    }, {
+    } ]
+  }, {
+    "type" : "record",
+    "name" : "AvroNonNullableArrays",
+    "fields" : [ {
+      "name" : "strings_column",
+      "type" : {
+        "type" : "array",
+        "items" : "string"
+      }
+    }, {
+      "name" : "maybe_ints_column",
+      "type" : [ "null", {
+        "type" : "array",
+        "items" : "int"
+      } ]
+    } ]
+  }, {
+    "type" : "record",
+    "name" : "AvroArrayOfArray",
+    "fields" : [ {
+      "name" : "int_arrays_column",
+      "type" : {
+        "type" : "array",
+        "items" : {
+          "type" : "array",
+          "items" : "int"
+        }
+      }
+    } ]
+  }, {
+    "type" : "record",
+    "name" : "AvroMapOfArray",
+    "fields" : [ {
+      "name" : "string_to_ints_column",
+      "type" : {
+        "type" : "map",
+        "values" : {
+          "type" : "array",
+          "items" : "int"
+        }
+      }
+    } ]
+  }, {
+    "type" : "record",
+    "name" : "ParquetAvroCompat",
+    "fields" : [ {
       "name" : "strings_column",
       "type" : {
         "type" : "array",
sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/AvroArrayOfArray.java (new file, 142 additions)
@@ -0,0 +1,142 @@
[Review thread on this file]

Contributor: We can merge this so it's included in 1.5, but why are we checking in auto-generated code? Can't we build it on demand?

liancheng (author): When adding the first set of Parquet Avro/Thrift/Hive compatibility suites, I considered building these files on demand, but that complicates the build process. In particular, it requires developers and Jenkins to install the Thrift binary executable for code generation (or is there a way to avoid this?). Besides that, it's quite tricky to compile the version of the Thrift executable that Parquet uses on a Mac, because OS X lacks some C++ header files (I always compile the parquet-thrift tests in an Ubuntu VM). That's why I decided to include the generated Java files: this way, only developers who update these test suites need to install the extra dependencies.

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class AvroArrayOfArray extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroArrayOfArray\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"int_arrays_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"array\",\"items\":\"int\"}}}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public java.util.List<java.util.List<java.lang.Integer>> int_arrays_column;

  /**
   * Default constructor. Note that this does not initialize fields
   * to their default values from the schema. If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public AvroArrayOfArray() {}

  /**
   * All-args constructor.
   */
  public AvroArrayOfArray(java.util.List<java.util.List<java.lang.Integer>> int_arrays_column) {
    this.int_arrays_column = int_arrays_column;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter. Applications should not call.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return int_arrays_column;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
  // Used by DatumReader. Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: int_arrays_column = (java.util.List<java.util.List<java.lang.Integer>>)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'int_arrays_column' field.
   */
  public java.util.List<java.util.List<java.lang.Integer>> getIntArraysColumn() {
    return int_arrays_column;
  }

  /**
   * Sets the value of the 'int_arrays_column' field.
   * @param value the value to set.
   */
  public void setIntArraysColumn(java.util.List<java.util.List<java.lang.Integer>> value) {
    this.int_arrays_column = value;
  }

  /** Creates a new AvroArrayOfArray RecordBuilder */
  public static org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder newBuilder() {
    return new org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder();
  }

  /** Creates a new AvroArrayOfArray RecordBuilder by copying an existing Builder */
  public static org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder other) {
    return new org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder(other);
  }

  /** Creates a new AvroArrayOfArray RecordBuilder by copying an existing AvroArrayOfArray instance */
  public static org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray other) {
    return new org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder(other);
  }

  /**
   * RecordBuilder for AvroArrayOfArray instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<AvroArrayOfArray>
    implements org.apache.avro.data.RecordBuilder<AvroArrayOfArray> {

    private java.util.List<java.util.List<java.lang.Integer>> int_arrays_column;

    /** Creates a new Builder */
    private Builder() {
      super(org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.SCHEMA$);
    }

    /** Creates a Builder by copying an existing Builder */
    private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.int_arrays_column)) {
        this.int_arrays_column = data().deepCopy(fields()[0].schema(), other.int_arrays_column);
        fieldSetFlags()[0] = true;
      }
    }

    /** Creates a Builder by copying an existing AvroArrayOfArray instance */
    private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray other) {
      super(org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.SCHEMA$);
      if (isValidValue(fields()[0], other.int_arrays_column)) {
        this.int_arrays_column = data().deepCopy(fields()[0].schema(), other.int_arrays_column);
        fieldSetFlags()[0] = true;
      }
    }

    /** Gets the value of the 'int_arrays_column' field */
    public java.util.List<java.util.List<java.lang.Integer>> getIntArraysColumn() {
      return int_arrays_column;
    }

    /** Sets the value of the 'int_arrays_column' field */
    public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder setIntArraysColumn(java.util.List<java.util.List<java.lang.Integer>> value) {
      validate(fields()[0], value);
      this.int_arrays_column = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /** Checks whether the 'int_arrays_column' field has been set */
    public boolean hasIntArraysColumn() {
      return fieldSetFlags()[0];
    }

    /** Clears the value of the 'int_arrays_column' field */
    public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray.Builder clearIntArraysColumn() {
      int_arrays_column = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    @Override
    public AvroArrayOfArray build() {
      try {
        AvroArrayOfArray record = new AvroArrayOfArray();
        record.int_arrays_column = fieldSetFlags()[0] ? this.int_arrays_column : (java.util.List<java.util.List<java.lang.Integer>>) defaultValue(fields()[0]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}
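For reference, a short hedged sketch of driving the generated builder from Scala, assuming the generated classes are on the test classpath:

import org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray
import scala.collection.JavaConverters._

object AvroArrayOfArrayUsage extends App {
  // Build a record holding [[1, 2], [3]] through the generated builder API.
  val record: AvroArrayOfArray = AvroArrayOfArray.newBuilder()
    .setIntArraysColumn(Seq(Seq[Integer](1, 2).asJava, Seq[Integer](3).asJava).asJava)
    .build()

  assert(record.getIntArraysColumn.size == 2)
  // A fresh builder has no field set until setIntArraysColumn is called.
  assert(!AvroArrayOfArray.newBuilder().hasIntArraysColumn)
}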