 # limitations under the License.
 #

-
 from py4j.java_collections import MapConverter
 from py4j.java_gateway import java_import, Py4JError

 from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import PairDeserializer, UTF8Deserializer
+from pyspark.serializers import PairDeserializer, NoOpSerializer
 from pyspark.streaming import DStream

 __all__ = ['KafkaUtils']


+def utf8_decoder(s):
+    return s.decode('utf-8')
+
+
 class KafkaUtils(object):

     @staticmethod
     def createStream(ssc, zkQuorum, groupId, topics,
                      storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
-                     keyDecoder=None, valueDecoder=None):
+                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
         """
         Create an input stream that pulls messages from a Kafka Broker.

@@ -47,26 +50,32 @@ def createStream(ssc, zkQuorum, groupId, topics,
         """
         java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")

+        param = {
+            "zookeeper.connect": zkQuorum,
+            "group.id": groupId,
+            "zookeeper.connection.timeout.ms": "10000",
+        }
         if not isinstance(topics, dict):
             raise TypeError("topics should be dict")
         jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
+        jparam = MapConverter().convert(param, ssc.sparkContext._gateway._gateway_client)
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+
+        def getClassByName(name):
+            return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
+
         try:
-            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, zkQuorum, groupId, jtopics,
-                                                       jlevel)
+            array = getClassByName("[B")
+            decoder = getClassByName("kafka.serializer.DefaultDecoder")
+            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
+                                                       jparam, jtopics, jlevel)
         except Py4JError, e:
             if 'call a package' in e.message:
                 print "No kafka package, please build it and add it into classpath:"
                 print "  $ sbt/sbt streaming-kafka/package"
-                print "  $ bin/spark-submit --driver-class-path external/kafka/target/scala-2.10/" \
-                      "spark-streaming-kafka_2.10-1.3.0-SNAPSHOT.jar"
-                raise Exception("No kafka package")
+                print "  $ bin/spark-submit --driver-class-path lib_managed/jars/kafka_2.10-0.8.0.jar:" \
+                      "external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.3.0-SNAPSHOT.jar"
             raise e
-        ser = PairDeserializer(UTF8Deserializer(), UTF8Deserializer())
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
         stream = DStream(jstream, ssc, ser)
-
-        if keyDecoder is not None:
-            stream = stream.map(lambda (k, v): (keyDecoder(k), v))
-        if valueDecoder is not None:
-            stream = stream.mapValues(valueDecoder)
-        return stream
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
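After this change the JVM stream carries raw byte arrays ("[B" is the JVM class name for byte[]) decoded by Kafka's DefaultDecoder; the Python side deserializes them with NoOpSerializer and only then applies keyDecoder/valueDecoder per record (UTF-8 by default). Below is a minimal sketch of driving the new API from PySpark; the Zookeeper address, consumer group, topic name, and batch interval are hypothetical, not taken from the patch.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaWordCount")
ssc = StreamingContext(sc, 2)  # 2-second batches (hypothetical)

# topics maps topic name -> number of consumer threads; with the default
# decoders, keys and values arrive as UTF-8 strings.
stream = KafkaUtils.createStream(ssc, "localhost:2181", "my-group", {"test": 1})

# A custom valueDecoder replaces the default utf8_decoder, e.g. to keep
# the raw bytes handed over by NoOpSerializer:
# raw = KafkaUtils.createStream(ssc, "localhost:2181", "my-group", {"test": 1},
#                               valueDecoder=lambda v: v)

counts = stream.map(lambda (k, v): v).countByValue()
counts.pprint()

ssc.start()
ssc.awaitTermination()

Note that the Kafka and spark-streaming-kafka jars printed in the error message above must be on the driver classpath, otherwise the Py4J call into the JVM-side KafkaUtils fails with the "call a package" error this patch catches.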