Commit 2827bed
MapR [SPARK-619] Move absent commits from 2.4.3 branch to 2.4.4 (apache#574)
* Adding SQL API to write to Kafka from Spark (apache#567)
* Branch 2.4.3 extended Kafka and examples (apache#569)
  * The v2 API lives in its own, separate package; the old functionality remains available in the original package.
  * v2 API examples: all of the examples now use the newest API. The old examples were removed since they are no longer relevant, and the same functionality is shown in the new examples using the new API.
* MapR [SPARK-619] Move absent commits from 2.4.3 branch to 2.4.4
1 parent ca0c9c4 commit 2827bed
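
For orientation, the new v2 producer API added in this commit boils down to the pattern below: once the implicit conversions from org.apache.spark.streaming.kafka.v2.producer._ are in scope (together with an implicit SparkSession), an RDD[String], a DataFrame, or a DStream[String] gains a sendToKafka method. This is a minimal sketch based on the updated KafkaProducerExample further down; the topic name "sketch_topic" is a placeholder:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.streaming.kafka.v2.producer._ // implicit sendToKafka extensions

    object SendToKafkaSketch extends App {
      // the v2 extensions expect an implicit SparkSession, as in the example below
      implicit val spark: SparkSession = SparkSession.builder()
        .appName("SendToKafkaSketch")
        .getOrCreate()

      // RDD[String] -> Kafka
      val rdd: RDD[String] = spark.sparkContext.parallelize(Seq("a", "b", "c"))
      rdd.sendToKafka("sketch_topic")

      // any DataFrame -> Kafka
      val df: DataFrame = spark.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "value")
      df.sendToKafka("sketch_topic")
    }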

File tree: 11 files changed, +294 -51 lines changed


build/dev-build.sh

Lines changed: 1 addition & 1 deletion
@@ -12,5 +12,5 @@ fi
 
 if [ $? -ne 0 ]; then exit 1; fi
 
-scp -r assembly/target/scala-2.11/jars mapr@node1:/opt/mapr/spark/spark-2.0.1/jars
+scp -r assembly/target/scala-2.11/jars mapr@node1:/opt/mapr/spark/spark-2.4.4/jars
 if [ $? -ne 0 ]; then exit 1; fi

examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala

Lines changed: 51 additions & 46 deletions
@@ -17,78 +17,83 @@
 
 package org.apache.spark.examples.streaming
 
-import java.util.{ Map => JMap }
+import java.util.{Map => JMap}
 
 import org.apache.kafka.common.serialization.Serializer
-
 import org.apache.spark.SparkConf
+
+import scala.util.Random
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}
+import org.apache.spark.streaming.kafka.v2.producer._
+import org.apache.spark.sql.{DataFrame, SparkSession, Row}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, LongType}
 
-class ItemJsonSerializer extends Serializer[Item] {
-  override def configure(configs: JMap[String, _], isKey: Boolean): Unit = { /* NOP */ }
-
-  override def serialize(topic: String, data: Item): Array[Byte] = data.toString.getBytes
-
-  override def close(): Unit = { /* NOP */ }
-}
-
-case class Item(id: Int, value: Int) {
-  override def toString: String = s"""{"id":"$id","value":"$value"}"""
-}
-
-/**
- * Produces messages to Kafka.
- * Usage: KafkaProducerExample <kafkaBrokers> <topics> <numMessages>
- *   <kafkaBrokers> is a list of one or more kafka brokers
- *   <topics> is a list of one or more kafka topics
- *   <numMessages> is the number of messages that the kafka producer should send
- *
- * Example:
- *   `$ bin/run-example \
- *   org.apache.spark.examples.streaming.KafkaProducerExample broker1,broker2 \
- *   topic1,topic2 10`
- */
-
-// scalastyle:off println
-object KafkaProducerExample extends App {
-  import org.apache.spark.streaming.kafka.producer._
+object KafkaProducerExample extends App {
 
-  if (args.length < 3) {
+  if (args.length < 2) {
     System.err.println(s"""
-      |Usage: KafkaProducerExample <kafkaBrokers> <topics> <numMessages>
-      |  <kafkaBrokers> is a list of one or more kafka brokers
+      |Usage: KafkaProducerExample <topics> <numMessages>
       |  <topics> is a list of one or more kafka topics
       |  <numMessages> is the number of messages that the kafka producer
      |    should send
      """.stripMargin)
     System.exit(1)
   }
 
-  val Array(kafkaBrokers, topics, numMessages) = args
-
-  val batchTime = Seconds(2)
+  val Array(topics, numMessages) = args
 
   val sparkConf = new SparkConf()
     .set("spark.executor.memory", "1g")
     .set("spark.driver.memory", "1g")
     .setAppName(getClass.getCanonicalName)
+
+  implicit val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
+
+  val items = (0 until numMessages.toInt).map(_.toString)
+
+  val stringRDD: RDD[String] = sparkSession.sparkContext.parallelize(items)
+
+  // if we have RDD[String] we can write to kafka using the new API V2
+
+  stringRDD.sendToKafka(topics)
+
+  val rnd = new Random()
+
+  // create RDD of Rows
+  val anotherRDD = stringRDD.map(s => Row(s, s.length, rnd.nextLong()))
+
+  val schema = new StructType()
+    .add(StructField("value", StringType))
+    .add(StructField("length", IntegerType))
+    .add(StructField("some_long", LongType))
+
+  // create a dataframe with some schema
+  val dataFrame: DataFrame = sparkSession.createDataFrame(anotherRDD, schema)
+
+  // any data frame can be easily written to Kafka
+  dataFrame.sendToKafka(topics)
+
+  val intRDD: RDD[(Int, Int)] = sparkSession.sparkContext.parallelize(0 until numMessages.toInt).map(n => (n, n.toString.length))
+
+  val transformer = (v: (Int, Int)) => Row(v._1, v._2)
+
+  // given an RDD[A], a function A => Row and a schema, we can write to kafka easily
+  intRDD.sendToKafka(topics, transformer, new StructType().add(StructField("value", IntegerType)).add(StructField("length", IntegerType)))
+
+  val batchTime = Seconds(2)
   val ssc = new StreamingContext(sparkConf, batchTime)
 
-  val producerConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)
-    .withKeySerializer("org.apache.kafka.common.serialization.ByteArraySerializer")
-    .withValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
+  val stringStream: DStream[String] = new ConstantInputDStream[String](ssc, stringRDD)
+
+  stringStream.sendToKafka(topics)
 
-  val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
-  val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
-  val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)
+  val someStream = new ConstantInputDStream[(Int, Int)](ssc, intRDD)
 
-  dStream.foreachRDD(_.sendToKafka(topics, producerConf))
-  dStream.count().print()
+  someStream.sendToKafka(topics, transformer, new StructType().add(StructField("value", IntegerType)).add(StructField("length", IntegerType)))
 
   ssc.start()
   ssc.awaitTermination()
-
   ssc.stop(stopSparkContext = true, stopGracefully = true)
-}
+}

external/kafka-producer/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -64,6 +64,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>

external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/RDDFunctions.scala

Lines changed: 2 additions & 1 deletion
@@ -22,6 +22,7 @@ import scala.language.implicitConversions
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 class RDDFunctions[T](rdd: RDD[T]) {
   def sendToKafka(topic: String, conf: ProducerConf): Unit = {
@@ -43,4 +44,4 @@ class PairRDDFunctions[K, V](rdd: RDD[(K, V)]) {
       }
     })
   }
-}
+}
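
For context, the pre-existing (v1) sendToKafka extension shown in this hunk still takes an explicit ProducerConf. The following is only a rough sketch of the usual shape of such an RDD-to-Kafka helper (one producer per partition, one record per element), not this repository's exact implementation; the SimpleProducerConf stand-in, the String serializers, and the toString rendering are assumptions:

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.rdd.RDD

    // Hypothetical stand-in for the module's ProducerConf
    case class SimpleProducerConf(bootstrapServers: List[String]) {
      def asJMap(): java.util.Map[String, Object] = {
        val m = new java.util.HashMap[String, Object]()
        m.put("bootstrap.servers", bootstrapServers.mkString(","))
        m.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        m.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        m
      }
    }

    // One KafkaProducer per partition, one record per element, rendered with toString
    def sendToKafkaSketch[T](rdd: RDD[T], topic: String, conf: SimpleProducerConf): Unit =
      rdd.foreachPartition { partition =>
        val producer = new KafkaProducer[String, String](conf.asJMap())
        partition.foreach(elem => producer.send(new ProducerRecord[String, String](topic, elem.toString)))
        producer.close()
      }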

external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/package.scala

Lines changed: 3 additions & 3 deletions
@@ -18,14 +18,14 @@
 package org.apache.spark.streaming.kafka
 
 import scala.language.implicitConversions
-
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession, Row}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 
 package object producer {
   implicit def toRDDFunctions[T](rdd: RDD[T]): RDDFunctions[T] =
     new RDDFunctions[T](rdd)
 
   implicit def toPairRDDFunctions[K, V](rdd: RDD[(K, V)]):
-      PairRDDFunctions[K, V] = new PairRDDFunctions[K, V](rdd)
-
+    PairRDDFunctions[K, V] = new PairRDDFunctions[K, V](rdd)
 }

New file
Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+package org.apache.spark.streaming.kafka.producer.sql
+
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
+
+private case class CommittedIds(partitionId: Int, ids: Set[String]) extends WriterCommitMessage

New file
Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
+package org.apache.spark.streaming.kafka.producer.sql
+
+import java.util.concurrent.Future
+
+import org.apache.spark.streaming.kafka.producer.sql.CommittedIds
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.types.{DataType, StringType, StructType}
+import org.apache.spark.streaming.kafka.producer.ProducerConf
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+
+
+private class KafkaDataSourceWriter(topic: String, schema: StructType) extends DataSourceWriter with Logging {
+
+  private var globallyCommittedIds = List.empty[String]
+
+  override def createWriterFactory(): DataWriterFactory[InternalRow] = new KafkaDataWriterFactory(topic, schema)
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+
+    val ids = messages.foldLeft(Set.empty[String]) { case (acc, CommittedIds(partitionId, partitionIds)) =>
+      log.info(s"PARTITION $partitionId HAS BEEN CONFIRMED BY DRIVER")
+
+      acc ++ partitionIds
+    }
+
+    // Let's make sure this is thread-safe
+    globallyCommittedIds = this.synchronized {
+      globallyCommittedIds ++ ids
+    }
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    log.info("JOB BEING ABORTED")
+  }
+}

New file
Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+package org.apache.spark.streaming.kafka.producer.sql
+
+import java.util.concurrent.Future
+import java.util.concurrent.Future
+
+import org.apache.spark.streaming.kafka.producer.sql.CommittedIds
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.types.{DataType, StringType, StructType}
+import org.apache.spark.streaming.kafka.producer.ProducerConf
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+
+import scala.util.parsing.json.{JSONArray, JSONObject}
+
+private class KafkaDataWriterFactory(topic: String, schema: StructType) extends DataWriterFactory[InternalRow] {
+
+  @transient private lazy val producerConf = new ProducerConf(
+    bootstrapServers = "".split(",").toList)
+
+  @transient private lazy val producer = new KafkaProducer[String, String](producerConf.asJMap())
+
+  override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = new DataWriter[InternalRow] with Logging {
+
+    private val writtenIds = scala.collection.mutable.ListBuffer.empty[Future[RecordMetadata]]
+
+    log.info(s"PROCESSING PARTITION ID: $partitionId ; TASK ID: $taskId")
+
+    override def write(record: InternalRow): Unit = {
+      val data = record.toSeq(schema).toList
+
+      val map = schema.fields.zipWithIndex
+        .map { case (field, idx) => (field.name, data(idx)) }
+        .toMap
+
+      val json = toJson(map)
+
+      val task = producer.send(new ProducerRecord(topic, json.toString))
+
+      writtenIds.append(task)
+
+    }
+
+
+    override def commit(): WriterCommitMessage = {
+      val meta = writtenIds.map(_.get())
+
+      writtenIds.clear()
+      CommittedIds(partitionId, meta.map(_.offset().toString).toSet)
+    }
+
+    override def abort(): Unit = writtenIds.map(_.cancel(true))
+
+    private def toJson(arr: List[Any]): JSONArray = {
+      JSONArray(arr.map {
+        case (innerMap: Map[String, Any]) => toJson(innerMap)
+        case (innerArray: List[Any]) => toJson(innerArray)
+        case (other) => other
+      })
+    }
+
+    private def toJson(map: Map[String, Any]): JSONObject = {
+      JSONObject(map.map {
+        case (key, innerMap: Map[String, Any]) =>
+          (key, toJson(innerMap))
+        case (key, innerArray: List[Any]) =>
+          (key, toJson(innerArray))
+        case (key, other) =>
+          (key, other)
+      })
+    }
+  }
+}
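
To illustrate the toJson path above: each row is flattened into a field-name to value map and rendered with scala.util.parsing.json.JSONObject, so a record matching the example schema from KafkaProducerExample (value, length, some_long) reaches the topic roughly as shown below. This is a standalone sketch with made-up values, assuming scala-parser-combinators is on the classpath, as it already is for the writer above:

    import scala.util.parsing.json.JSONObject

    // Mirrors what the factory's toJson(Map) produces for a single row
    val row = Map("value" -> "hello", "length" -> 5, "some_long" -> 42L)
    println(JSONObject(row).toString())
    // prints: {"value" : "hello", "length" : 5, "some_long" : 42}

(In the writer itself the values come from InternalRow.toSeq, i.e. Spark's internal representations such as UTF8String, so the quoting of string fields on the actual topic may differ from this sketch.)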

New file
Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
+package org.apache.spark.streaming.kafka.producer.sql
+
+import java.util.Optional
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, WriteSupport}
+import org.apache.spark.sql.types.StructType
+
+class KafkaWriter extends WriteSupport with Logging {
+  override def createWriter(writeUUID: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = {
+
+    val stream = options.get("path").get()
+
+    java.util.Optional.of(new KafkaDataSourceWriter(stream, schema))
+  }
+}
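
Since createWriter takes the topic from the "path" option, a DataFrame can presumably also be routed through this WriteSupport with the standard DataFrameWriter API. A minimal sketch: the fully qualified format name is used because no DataSourceRegister short name appears in this commit, and "sketch_topic" is a placeholder:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().appName("KafkaWriterSketch").getOrCreate()
    val df: DataFrame = spark.range(10).toDF("value")

    df.write
      .format("org.apache.spark.streaming.kafka.producer.sql.KafkaWriter")
      .save("sketch_topic") // save(path) ends up in options.get("path"), i.e. the topic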
