examples/src/main/python/avro_outputformat.py (61 additions, 0 deletions)
@@ -0,0 +1,61 @@
"""
Writes a handful of Python dicts out as Avro records with
AvroKeyOutputFormat, then reads the file back with AvroKeyInputFormat
and prints each record. Run with the Spark examples jar (which provides
the pythonconverters classes) on the driver classpath, e.g.:

    ./bin/spark-submit --driver-class-path /path/to/example/jar \
        examples/src/main/python/avro_outputformat.py
"""
from pyspark import SparkContext

if __name__ == "__main__":
    # Writer schema: passed to the output format via the job conf, and
    # alongside every record so the key converter can build Avro objects.
    avroSchema = """{"type": "record", "name": "ProductFeed", "namespace": "com.rubikloud.entity",
        "fields": [{"name": "A", "type": "string"},
                   {"name": "B", "type":
                       {"type": "record", "name": "B", "fields": [
                           {"name": "B1", "type": "int"},
                           {"name": "B2", "type": "string"},
                           {"name": "B3", "type": ["null", "long"]}]
                       }
                   },
                   {"name": "C", "type": ["null", {"type": "array", "items": "string"}]}
        ]}"""

    d = [{"A": "something1",
          "B": {
              "B1": 101,
              "B2": "oneoone",
              "B3": long(456)  # Python 2 long selects the "long" branch of the union
          }
         },
         {"A": "something2",
          "B": {
              "B1": 202,
              "B2": "twootwo"
          },
          "C": ['asd', 'sdf']  # nullable array field, left unset in the first record
         }]

    sc = SparkContext(appName="AvroKeyOutputFormat")

    # AvroKeyOutputFormat reads the writer schema from this job property.
    conf = {
        "avro.schema.output.key": avroSchema
    }

    to_avro_rdd = sc.parallelize(d)

    # Each element becomes a ((dict, schema), null) pair; JavaToAvroWrapperConverter
    # turns the (dict, schema) key into an AvroKey on the JVM side.
    to_avro_rdd.map(lambda x: ((x, avroSchema), None)).saveAsNewAPIHadoopFile(
        "test.out",
        keyClass="org.apache.avro.mapred.AvroKey",
        valueClass="org.apache.hadoop.io.NullWritable",
        outputFormatClass="org.apache.avro.mapreduce.AvroKeyOutputFormat",
        keyConverter="org.apache.spark.examples.pythonconverters.JavaToAvroWrapperConverter",
        conf=conf)

    # Read the file back to verify the round trip.
    b = sc.newAPIHadoopFile(
        "test.out",
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
        conf=None).collect()

    for k in b:
        print 'read: %s' % str(k)

    sc.stop()
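
(Aside, not part of this diff: the "avro.schema.output.key" entry in the conf
map above is the same job property that org.apache.avro.mapreduce.AvroJob sets
when a JVM job calls setOutputKeySchema. A minimal Scala sketch of the
equivalent wiring, assuming avro-mapred and Hadoop are on the classpath; the
object name is illustrative.)

import org.apache.avro.Schema
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.mapreduce.Job

object OutputSchemaConfSketch {
  def main(args: Array[String]): Unit = {
    // Trimmed copy of the example's writer schema.
    val schemaJson =
      """{"type": "record", "name": "ProductFeed", "namespace": "com.rubikloud.entity",
        |  "fields": [{"name": "A", "type": "string"}]}""".stripMargin
    val job = Job.getInstance()
    AvroJob.setOutputKeySchema(job, new Schema.Parser().parse(schemaJson))
    // The schema ends up under the property the Python example sets by hand.
    println(job.getConfiguration.get("avro.schema.output.key"))
  }
}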
@@ -17,12 +17,15 @@

package org.apache.spark.examples.pythonconverters

-import java.util.{Collection => JCollection, Map => JMap}
+import java.util.{Collection => JCollection, Map => JMap, HashMap => JHashMap, ArrayList => JArrayList}
+
+import org.apache.avro.Schema.Parser
+import org.apache.avro.generic.GenericData.Record

import scala.collection.JavaConversions._

-import org.apache.avro.generic.{GenericFixed, IndexedRecord}
-import org.apache.avro.mapred.AvroWrapper
+import org.apache.avro.generic.{GenericData, GenericFixed, IndexedRecord}
+import org.apache.avro.mapred.{AvroKey, AvroWrapper}
import org.apache.avro.Schema
import org.apache.avro.Schema.Type._

@@ -110,6 +113,78 @@ object AvroConversionUtil extends Serializable {
"Unions may only consist of a concrete type and null")
}
}

def toAvro(value: Any, schema: Schema): Any = {
if (value == null) {
return null
}

schema.getType match {
case UNION => packUnion(value, schema)
case ARRAY => remapArray(value, schema)
//case FIXED => unpackFixed(value, schema)
case MAP => packMap(value, schema)
//case BYTES => unpackBytes(value)
case RECORD => packRecord(value, schema)
case STRING => value.toString
case ENUM => value.toString
case NULL => value
case BOOLEAN => value
case DOUBLE => value
case FLOAT => value
case INT => value
case LONG => value
case other => throw new SparkException(
s"Unknown Avro schema type ${other.getName}")
}
}

def packUnion(value: Any, schema: Schema): Any = {
schema.getTypes.toList match {
case List(s) => toAvro(value, s)
case List(n, s) if n.getType == NULL => toAvro(value, s)
case List(s, n) if n.getType == NULL => toAvro(value, s)
case _ => throw new SparkException(
"Unions may only consist of a concrete type and null")
}
}

def packMap(value: Any, schema: Schema): Any = {
val result = new JHashMap[String, Any]
val map = value.asInstanceOf[JHashMap[String, Any]]
map.keySet().foreach(k => {
try {
val v = map.get(k)
result.put(k, toAvro(v, schema.getValueType))
} catch {
case e: Exception => throw new SparkException(
s"Unknown Avro map key ${k} in ${map} with schema ${schema}", e)
}
})
result
}

def packRecord(value: Any, schema: Schema): Record = {
val map = value.asInstanceOf[JHashMap[String, Any]]
val record = new Record(schema)

map.keySet().foreach(k => {
try {
val f = schema.getField(k)
val v = map.get(k)
record.put(f.pos(), toAvro(v, f.schema()))
} catch {
case e: Exception => throw new SparkException(
s"Unknown Avro record key ${k} in ${map} with schema ${schema}", e)
}
})
record
}

def remapArray(value: Any, schema: Schema): GenericData.Array[Any] = {
val array = value.asInstanceOf[JArrayList[Any]]
new GenericData.Array[Any](schema, array.map(e => toAvro(e, schema.getElementType)))
}
}

/**
@@ -144,3 +219,18 @@ class AvroWrapperToJavaConverter extends Converter[Any, Any] {
    }
  }
}

/**
 * Converts the (value, schema JSON) pair produced on the Python side into
 * an AvroKey wrapping the corresponding Avro datum.
 */
class JavaToAvroWrapperConverter extends Converter[Any, AvroKey[_]] {
  override def convert(obj: Any): AvroKey[_] = {
    if (obj == null) {
      return null
    }

    val args: Array[Any] = obj.asInstanceOf[Array[Any]]
    val schema: Schema = (new Parser).parse(args(1).asInstanceOf[String])
    val value: Any = args(0)

    new AvroKey[Any](AvroConversionUtil.toAvro(value, schema))
  }
}
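
(Aside, not part of this diff: a hypothetical smoke test for the new
conversion path, assuming the examples module and Avro are on the classpath.
It feeds toAvro and JavaToAvroWrapperConverter the same shapes Pyrolite
produces for Python dicts, lists and tuples: HashMap, ArrayList and Object[].
The object name and schema are illustrative.)

import java.util.{ArrayList => JArrayList, HashMap => JHashMap}

import org.apache.avro.Schema
import org.apache.avro.mapred.AvroKey

import org.apache.spark.examples.pythonconverters.{AvroConversionUtil, JavaToAvroWrapperConverter}

object ToAvroSmokeTest {
  def main(args: Array[String]): Unit = {
    val schemaJson =
      """{"type": "record", "name": "Pair", "fields": [
        |  {"name": "A", "type": "string"},
        |  {"name": "C", "type": ["null", {"type": "array", "items": "string"}]}]}""".stripMargin

    // A record arrives from Pyrolite as a HashMap, an array as an ArrayList.
    val items = new JArrayList[Any]()
    items.add("asd")
    items.add("sdf")
    val record = new JHashMap[String, Any]()
    record.put("A", "something1")
    record.put("C", items)

    val schema = new Schema.Parser().parse(schemaJson)
    // Direct conversion: prints the record as JSON, roughly
    // {"A": "something1", "C": ["asd", "sdf"]}.
    println(AvroConversionUtil.toAvro(record, schema))

    // The converter itself receives the Python (value, schema) tuple as an
    // Object[] and wraps the converted datum in an AvroKey.
    val key: AvroKey[_] = new JavaToAvroWrapperConverter().convert(Array[Any](record, schemaJson))
    println(key.datum())
  }
}

Sending the schema JSON with every record looks redundant, but a Python-side
Converter only ever sees individual keys, so pairing each value with its
schema is presumably the simplest way to get the writer schema to the JVM.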