Conversation

@fhan688

@fhan688 fhan688 commented Sep 19, 2017

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-22056

When Spark Streaming consumes data from Kafka in the direct way, Kafka partitions and KafkaRDDPartitions in Spark Streaming are in one-to-one correspondence. To increase the computing capacity of Spark Streaming, we usually increase the number of partitions in Kafka, but adding too many partitions can cause problems in Kafka such as leader election overhead.
So we introduce a new mechanism that changes the one-to-one mapping into one-to-many, controlled by a new parameter named "topic.partition.subconcurrency". On the Spark Streaming side, this mechanism divides one KafkaRDDPartition into several according to the parameter, which lets Spark Streaming use computing resources more efficiently and avoids the problems caused by increasing the number of Kafka partitions.
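The division described above can be sketched as splitting one Kafka offset range into contiguous sub-ranges. A minimal sketch under assumptions: `OffsetRange` below is a simplified stand-in for the Spark/Kafka class, not the actual implementation, and `split_offset_range` is a hypothetical helper.

```python
from dataclasses import dataclass, replace
from typing import List

# Simplified stand-in for Spark's OffsetRange; NOT the actual Spark class.
@dataclass(frozen=True)
class OffsetRange:
    topic: str
    partition: int
    from_offset: int      # inclusive
    until_offset: int     # exclusive

def split_offset_range(r: OffsetRange, subconcurrency: int) -> List[OffsetRange]:
    """Split one offset range [from_offset, until_offset) into up to
    `subconcurrency` contiguous sub-ranges of near-equal size."""
    total = r.until_offset - r.from_offset
    base, remainder = divmod(total, subconcurrency)
    subs, start = [], r.from_offset
    for i in range(subconcurrency):
        size = base + (1 if i < remainder else 0)  # spread leftover offsets evenly
        if size > 0:                               # drop empty sub-ranges
            subs.append(replace(r, from_offset=start, until_offset=start + size))
        start += size
    return subs

subs = split_offset_range(OffsetRange("logs", 0, 100, 110), 3)
# sub-ranges: [100, 104), [104, 107), [107, 110)
```

Each sub-range stays contiguous, so ordering is preserved within a sub-range; it is only the total order across the original Kafka partition that is given up when the sub-ranges are processed in parallel.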

We tested this in production; the processing capacity of Spark Streaming improved noticeably.

@bjkonglu

I tried this method. It worked well.

@fhan688 fhan688 changed the title [SPARK-22056] Add subconcurrency for KafkaRDDPartition [SPARK-22056][Streaming] Add subconcurrency for KafkaRDDPartition Sep 20, 2017
@jerryshao
Contributor

jerryshao commented Sep 21, 2017

Will this break the assumption that one Kafka partition maps to only one Spark partition?

@fhan688
Author

fhan688 commented Sep 21, 2017

Yes. One Kafka partition will map to many Spark partitions, thus more executors can be used.

@jerryshao
Contributor

Hi @loneknightpy, thinking a bit about your PR, I think this could also be done on the user side. Users could create several threads in one task (RDD#mapPartitions) to consume the records concurrently, so this feature may not be so necessary to land in Spark's code. What do you think?
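The user-side alternative suggested above can be sketched without Spark at all: a plain iterable stands in for the partition iterator, and `process_concurrently` is a hypothetical helper, not a Spark API.

```python
from concurrent.futures import ThreadPoolExecutor

def process_concurrently(records, num_threads, fn):
    """Hypothetical helper: inside one task (e.g. the function passed to
    RDD.mapPartitions), fan the partition's records out to a small thread
    pool instead of asking Spark for more partitions."""
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # ThreadPoolExecutor.map yields results in input order,
        # even though the records are processed concurrently.
        yield from pool.map(fn, records)

out = list(process_concurrently(range(1, 4), 2, lambda x: x * 10))
# out == [10, 20, 30]
```

Note this keeps the one-Kafka-partition-to-one-Spark-partition mapping intact; the extra parallelism comes from threads within a single task rather than from more executors.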

@fhan688
Author

fhan688 commented Sep 26, 2017

lonelytrooper... :P Will more executors be used in the RDD#mapPartitions way? I'll try that later to see if it works. I think if Spark provided a convenient way to do this, it would help users a lot and reduce their work, so it still makes sense. LOL
Besides, this feature achieves a very good performance improvement in our production environment.

@jerryshao
Contributor

Yes, I understand your scenario, but my concern is that your proposal is quite scenario-specific: it may serve your scenario well, but it somewhat breaks the design purpose of KafkaRDD. From my understanding, many users use repartition or coalesce to increase the parallelism, so your problem can be solved that way.

@fhan688
Author

fhan688 commented Sep 27, 2017

Hi Jerry, thank you so much for discussing! Actually, we tried 'repartition' before introducing this feature and gave it up for two reasons. First, it leads to a shuffle, which can hurt real-time applications considerably. Second, the performance improvement from 'repartition' is quite limited. You mentioned earlier the assumption that one Kafka partition maps to one Spark partition; I wonder why this assumption is so vital?

@jerryshao
Contributor

jerryshao commented Sep 27, 2017

This is because it is the only way to guarantee the ordering of data when mapping a Kafka partition to a Spark partition. Other users may have taken it as an assumption when writing their code.

Let's see others' feedback. Ping @zsxwing @koeninger, would you please weigh in on this PR? Thanks!

@fhan688
Author

fhan688 commented Sep 27, 2017

I guessed that... This is true: this feature cannot ensure the ordering of data within one Kafka partition, but quite a few applications (like log processing) do not need a strict ordering guarantee within one Kafka partition. If they do, they can simply not use this feature; otherwise, it achieves a good performance improvement. So I think this feature may not be so scenario-specific. :P

@fhan688
Author

fhan688 commented Sep 27, 2017

Thank you so much for inviting more discussions!

@koeninger
Contributor

Search JIRA and the mailing list; this idea has been brought up multiple times. I don't think breaking fundamental assumptions of Kafka (one consumer thread per group per partition) is a good idea.

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

ping @lonelytrooper for @koeninger's comment. Otherwise, let me propose to close this for now.

@asfgit asfgit closed this in 1a4fda8 Jul 19, 2018
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
Closes apache#17422
Closes apache#17619
Closes apache#18034
Closes apache#18229
Closes apache#18268
Closes apache#17973
Closes apache#18125
Closes apache#18918
Closes apache#19274
Closes apache#19456
Closes apache#19510
Closes apache#19420
Closes apache#20090
Closes apache#20177
Closes apache#20304
Closes apache#20319
Closes apache#20543
Closes apache#20437
Closes apache#21261
Closes apache#21726
Closes apache#14653
Closes apache#13143
Closes apache#17894
Closes apache#19758
Closes apache#12951
Closes apache#17092
Closes apache#21240
Closes apache#16910
Closes apache#12904
Closes apache#21731
Closes apache#21095

Added:
Closes apache#19233
Closes apache#20100
Closes apache#21453
Closes apache#21455
Closes apache#18477

Added:
Closes apache#21812
Closes apache#21787

Author: hyukjinkwon <[email protected]>

Closes apache#21781 from HyukjinKwon/closing-prs.