diff --git a/examples/src/main/python/avro_outputformat.py b/examples/src/main/python/avro_outputformat.py
new file mode 100644
index 000000000000..d64439ddab04
--- /dev/null
+++ b/examples/src/main/python/avro_outputformat.py
@@ -0,0 +1,63 @@
+from pyspark import SparkContext
+
+if __name__ == "__main__":
+    avroSchema = """{"type": "record", "name": "ProductFeed", "namespace": "com.rubikloud.entity",
+        "fields": [{"name": "A", "type": "string"},
+            {"name": "B", "type":
+                {"type": "record", "name": "B", "fields": [
+                    {"name": "B1", "type": "int"},
+                    {"name": "B2", "type": "string"},
+                    {"name": "B3", "type": ["null", "long"]}]
+                }
+            },
+            {"name": "C", "type": ["null", {"type": "array", "items": "string"}]}
+        ]}"""
+
+    # Plain Python dicts matching the schema above; fields whose type is a
+    # union with null (B3, C) may be omitted.
+    d = [{"A": "something1",
+          "B": {
+              "B1": 101,
+              "B2": "oneoone",
+              "B3": long(456)
+          }
+          },
+         {"A": "something2",
+          "B": {
+              "B1": 202,
+              "B2": "twootwo"
+          },
+          "C": ['asd', 'sdf']
+          }]
+
+    sc = SparkContext(appName="AvroKeyOutputFormat")
+
+    conf = {
+        "avro.schema.output.key": avroSchema
+    }
+
+    to_avro_rdd = sc.parallelize(d)
+
+    # Pair every record with its schema so the key converter can rebuild an
+    # Avro record on the JVM side, then write the records as Avro keys.
+    to_avro_rdd.map(lambda x: ((x, avroSchema), None)).saveAsNewAPIHadoopFile(
+        "test.out",
+        keyClass="org.apache.avro.mapred.AvroKey",
+        valueClass="org.apache.hadoop.io.NullWritable",
+        outputFormatClass="org.apache.avro.mapreduce.AvroKeyOutputFormat",
+        keyConverter="org.apache.spark.examples.pythonconverters.JavaToAvroWrapperConverter",
+        conf=conf)
+
+    # Read the file back to verify the round trip.
+    b = sc.newAPIHadoopFile(
+        "test.out",
+        "org.apache.avro.mapreduce.AvroKeyInputFormat",
+        "org.apache.avro.mapred.AvroKey",
+        "org.apache.hadoop.io.NullWritable",
+        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
+        conf=None).collect()
+
+    for k in b:
+        print 'read: %s' % str(k)
+
+    sc.stop()
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
index a11890d6f2b1..5b1caea70c98 100644
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.examples.pythonconverters
 
-import java.util.{Collection => JCollection, Map => JMap}
+import java.util.{ArrayList => JArrayList, Collection => JCollection,
+  HashMap => JHashMap, Map => JMap}
 
 import scala.collection.JavaConversions._
 
-import org.apache.avro.generic.{GenericFixed, IndexedRecord}
-import org.apache.avro.mapred.AvroWrapper
+import org.apache.avro.generic.{GenericData, GenericFixed, IndexedRecord}
+import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.mapred.{AvroKey, AvroWrapper}
 import org.apache.avro.Schema
+import org.apache.avro.Schema.Parser
 import org.apache.avro.Schema.Type._
 
@@ -110,6 +113,78 @@ object AvroConversionUtil extends Serializable {
         "Unions may only consist of a concrete type and null")
     }
   }
+
+  /**
+   * Converts a plain Java object (such as one unpickled from PySpark) into
+   * its Avro representation under the given schema.
+   */
+  def toAvro(value: Any, schema: Schema): Any = {
+    if (value == null) {
+      return null
+    }
+
+    schema.getType match {
+      case UNION => packUnion(value, schema)
+      case ARRAY => packArray(value, schema)
+      case MAP => packMap(value, schema)
+      case RECORD => packRecord(value, schema)
+      case STRING => value.toString
+      case ENUM => value.toString
+      case NULL => value
+      case BOOLEAN => value
+      case DOUBLE => value
+      case FLOAT => value
+      case INT => value
+      case LONG => value
+      // FIXED and BYTES are not supported yet and hit the error case below.
+      case other => throw new SparkException(
+        s"Unsupported Avro schema type ${other.getName}")
+    }
+  }
+
+  def packUnion(value: Any, schema: Schema): Any = {
+    schema.getTypes.toList match {
+      case List(s) => toAvro(value, s)
+      case List(n, s) if n.getType == NULL => toAvro(value, s)
+      case List(s, n) if n.getType == NULL => toAvro(value, s)
+      case _ => throw new SparkException(
+        "Unions may only consist of a concrete type and null")
+    }
+  }
+
+  def packMap(value: Any, schema: Schema): JMap[String, Any] = {
+    val result = new JHashMap[String, Any]
+    val map = value.asInstanceOf[JMap[String, Any]]
+    map.keySet().foreach { k =>
+      try {
+        result.put(k, toAvro(map.get(k), schema.getValueType))
+      } catch {
+        case e: Exception => throw new SparkException(
+          s"Cannot convert value of key ${k} in ${map} with schema ${schema}", e)
+      }
+    }
+    result
+  }
+
+  def packRecord(value: Any, schema: Schema): Record = {
+    val map = value.asInstanceOf[JMap[String, Any]]
+    val record = new Record(schema)
+    map.keySet().foreach { k =>
+      try {
+        val f = schema.getField(k)
+        record.put(f.pos(), toAvro(map.get(k), f.schema()))
+      } catch {
+        case e: Exception => throw new SparkException(
+          s"Unknown Avro record field ${k} in ${map} with schema ${schema}", e)
+      }
+    }
+    record
+  }
+
+  def packArray(value: Any, schema: Schema): GenericData.Array[Any] = {
+    val array = value.asInstanceOf[JArrayList[Any]]
+    new GenericData.Array[Any](schema, array.map(e => toAvro(e, schema.getElementType)))
+  }
 }
 
 /**
@@ -144,3 +219,18 @@ class AvroWrapperToJavaConverter extends Converter[Any, Any] {
     }
   }
 }
+
+class JavaToAvroWrapperConverter extends Converter[Any, AvroKey[_]] {
+  override def convert(obj: Any): AvroKey[_] = {
+    if (obj == null) {
+      return null
+    }
+
+    // PySpark hands over a two-element tuple: the value and its schema JSON.
+    val args: Array[Any] = obj.asInstanceOf[Array[Any]]
+    val schema: Schema = (new Parser).parse(args(1).asInstanceOf[String])
+    val value: Any = args(0)
+
+    new AvroKey[Any](AvroConversionUtil.toAvro(value, schema))
+  }
+}
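
A note on testing: the new toAvro path can be exercised without running Spark at all. The harness below is a minimal sketch, not part of the patch; the object name ToAvroSmokeTest and its cut-down schema are invented for illustration, and it assumes only that the Avro jar and the AvroConversionUtil above are on the classpath. It feeds toAvro a nested HashMap/ArrayList structure, the same shape Pyrolite produces when unpickling the Python example's dicts:

    import java.util.{ArrayList => JArrayList, HashMap => JHashMap}

    import org.apache.avro.Schema.Parser

    import org.apache.spark.examples.pythonconverters.AvroConversionUtil

    object ToAvroSmokeTest {
      def main(args: Array[String]): Unit = {
        // Hypothetical cut-down schema: one required string field plus one
        // optional (union-with-null) array field, mirroring "A" and "C" above.
        val schema = (new Parser).parse(
          """{"type": "record", "name": "Feed", "fields": [
            |  {"name": "A", "type": "string"},
            |  {"name": "C", "type": ["null", {"type": "array", "items": "string"}]}
            |]}""".stripMargin)

        // The shape the converter receives from PySpark: java.util collections.
        val value = new JHashMap[String, Any]
        value.put("A", "something1")
        val c = new JArrayList[Any]
        c.add("asd")
        c.add("sdf")
        value.put("C", c)

        // Prints a GenericData.Record honoring the schema,
        // e.g. {"A": "something1", "C": ["asd", "sdf"]}
        println(AvroConversionUtil.toAvro(value, schema))
      }
    }

Because packUnion only accepts a two-branch union of null and a concrete type, anything richer fails fast with a SparkException instead of writing an unreadable file.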