Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
[SPARK-17813][SQL][KAFKA] spacing
  • Loading branch information
koeninger committed Oct 27, 2016
commit 5e4b468111ec20f11fc352de4075637f12e3a499
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ private[kafka010] case class KafkaSource(
from: Map[TopicPartition, Long],
until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
val sizes = until.flatMap { case (tp, end) =>
val sizes = until.flatMap {
case (tp, end) =>
// If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use 2 spaces

from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
val size = end - begin
Expand All @@ -166,7 +167,8 @@ private[kafka010] case class KafkaSource(
if (total < 1) {
until
} else {
until.map { case (tp, end) =>
until.map {
case (tp, end) =>
tp -> sizes.get(tp).map { size =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use 2 spaces

val begin = from.get(tp).getOrElse(fromNew(tp))
val prorate = limit * (size / total)
Expand Down