@@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       val metadata = Map(
         "files" -> newFiles.toList,
         StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
-      val inputInfo = StreamInputInfo(id, 0, metadata)
+      val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
Contributor:

This will kick off a new Spark job that reads the files just to count them, which brings in obvious overhead, whereas count in DirectKafkaInputDStream only calculates offsets (see the sketch below).
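A minimal, self-contained sketch of the difference (plain Scala; this `OffsetRange` is modeled on the Kafka integration's offset ranges, not copied from the Spark sources):

```scala
// Kafka offsets are consecutive, so the record count of a batch is pure
// arithmetic over the consumed offset ranges -- no data is read at all.
final case class OffsetRange(fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

object CountComparison {
  def main(args: Array[String]): Unit = {
    val ranges = Seq(OffsetRange(0L, 120L), OffsetRange(40L, 95L))
    println(s"Kafka-style count: ${ranges.map(_.count).sum}") // O(#partitions), no I/O

    // By contrast, rdds.map(_.count).sum in this patch calls RDD.count(),
    // a Spark action: the scheduler launches a job that reads every new
    // file end to end just to produce the number.
  }
}
```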

Author:

So because of this little overhead, 'Records' is never recorded at all? That is an obvious bug.

Contributor:

This is not a small overhead. The change reads/scans all the new files, which is a big overhead for a streaming application (the data is unnecessarily read twice).

Author:

I see what you mean. I'll try to make it read the data only once. Can you give me some ideas?

Contributor:

I'm not sure there's a way to fix it here.

Author:

What about asynchronous processing? It would not block the main path of the streaming job, and we could still get the number of records. Something like the sketch below.
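A rough sketch of the idea (the helper `reportCountAsync`/`reportRecordCount` wiring is hypothetical, not a real Spark API):

```scala
object AsyncCountSketch {
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  import org.apache.spark.rdd.RDD

  // Hypothetical: count the batch on a background thread so the main
  // scheduling path is not blocked while the files are re-read.
  def reportCountAsync(rdds: Seq[RDD[_]], reportRecordCount: Long => Unit): Unit =
    Future {
      // Still launches Spark jobs that re-read every new file; the scan is
      // merely moved off the caller's thread, not eliminated.
      rdds.map(_.count()).sum
    }.foreach(reportRecordCount)
}
```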

Contributor:

I'm not in favor of such changes. Whether the processing is sync or async, reportInfo is invoked here, so you still have to wait for the count to finish.

Anyway, I think reading the data twice is unacceptable in a streaming scenario (even in a batch one). I guess the previous code set the count to 0 intentionally.

Author (@guoxiaolongzte, Feb 1, 2018):

What if we add a switch parameter whose default value is false? If it is set to true, the records are counted (that is, the files are read again) so that the count is reported correctly. Of course, the documentation would state that turning the switch on affects streaming performance. Something like the sketch below.
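A hypothetical sketch of that switch, patched into the method body above (the config key spark.streaming.fileStream.countRecords is invented for illustration):

```scala
// Hypothetical opt-in flag; defaults to the current behavior (count = 0).
val countRecords = ssc.sparkContext.getConf
  .getBoolean("spark.streaming.fileStream.countRecords", defaultValue = false)

val numRecords: Long =
  if (countRecords) rdds.map(_.count()).sum // opt-in: re-reads every new file
  else 0L                                   // current behavior: no extra job

val inputInfo = StreamInputInfo(id, numRecords, metadata)
```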

Contributor:

I don't think that's a good idea. Actually, I'm inclined to leave it as it is.

Author:

I am very sad. I'm still looking into whether there's a better way.

       ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
       rdds
     }