
Conversation

@tdas (Contributor) commented Jan 22, 2015

This is a refactored fix based on @jerryshao's PR #4037. It enables deletion of old WAL files containing the received block data.
Improvements over #4037:

  • Respects the rememberDuration of all receiver streams. In #4037, if there were two receiver streams with different remember durations, deletion would have been based on the shortest remember duration, thus deleting data prematurely for the receiver stream with the longer remember duration.
  • Adds a unit test covering creation of the receiver WAL, automatic deletion, and respect for the remember duration.
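The first bullet can be sketched roughly as follows. This is an illustrative sketch only, not the actual Spark internals; `Stream`, `rememberDurationMs`, and `cleanupThreshold` are invented names for exposition:

```scala
// Illustrative sketch, not the actual Spark code: with multiple receiver
// streams, cleanup must key off the LONGEST remember duration, or data that
// a longer-remembering stream still needs gets deleted prematurely.
case class Stream(id: Int, rememberDurationMs: Long)

object WalCleanupSketch {
  // Safe threshold: anything older than (now - max remember duration) is
  // no longer needed by any stream, so its WAL files may be deleted.
  def cleanupThreshold(nowMs: Long, streams: Seq[Stream]): Long =
    nowMs - streams.map(_.rememberDurationMs).max

  def main(args: Array[String]): Unit = {
    val streams = Seq(Stream(0, 60000L), Stream(1, 300000L))
    val now = 1000000L
    // Keying off the shortest duration (the pre-fix behavior) would allow
    // deleting files up to t = 940000, which stream 1 still needs; the
    // fixed threshold is t = 700000.
    println(cleanupThreshold(now, streams))
  }
}
```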

@jerryshao I am going to merge this ASAP to make it into 1.2.1. Thanks for the initial draft of this PR; it made my job much easier.

@SparkQA commented Jan 22, 2015

Test build #25932 has started for PR 4149 at commit c4cf067.

  • This patch merges cleanly.

@tdas (Contributor, Author) commented Jan 22, 2015

@pwendell

@tdas (Contributor, Author) commented on the diff:

Just moved the FakeReceiver out of the ReceiverSuite to avoid serialization issues. The diff is very confusing, so just compare directly.

A contributor commented on the diff:

I tried to dig around and understand why a ReceiverInfo might have a null actor reference. There seems to be only one case where this happens, and it's when an error is received. But in that case, doesn't the error report itself come from the actor, so you could get a reference to the actor via the sender?

At a minimum, should we indicate in ReceiverInfo that users should defend against the possibility of the actor ref being null? And would it be possible to avoid the possibility of a null reference altogether (not for this patch specifically, but in general)?
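A rough sketch of the defensive pattern being suggested here. The class and field names are illustrative assumptions; the real ReceiverInfo in Spark differs:

```scala
// Illustrative only: treat the actor reference as possibly null (e.g. when
// the ReceiverInfo was created from an error report) and guard every use.
case class ReceiverInfoSketch(streamId: Int, actor: AnyRef) // actor may be null

object ActorRefGuard {
  def describe(info: ReceiverInfoSketch): String =
    Option(info.actor) match {                 // Option(null) == None
      case Some(_) => s"receiver ${info.streamId}: actor ref available"
      case None    => s"receiver ${info.streamId}: no actor ref, skipping send"
    }

  def main(args: Array[String]): Unit = {
    println(describe(ReceiverInfoSketch(0, null)))
    println(describe(ReceiverInfoSketch(1, new Object)))
  }
}
```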

@pwendell (Contributor) commented:

Hey TD, I looked through this. The logic seems sound to me, though admittedly I'm newer to this part of the code. I added some comments around clarity, but they weren't really specific to the patch's logic.

A contributor commented on the diff:

I would add a doc comment here explaining that the max remember duration can be used as a conservative threshold for how long older data must be remembered. Otherwise it's not obvious why callers are using this particular time.
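A sketch of the kind of doc comment being requested. The method name `getMaxInputStreamRememberDuration`, the `Duration` type, and the surrounding object are assumptions for illustration, not necessarily the exact API touched by this patch:

```scala
// Illustrative only, not the actual Spark API touched by this patch.
case class Duration(ms: Long)

object DStreamGraphSketch {
  private val rememberDurations = Seq(Duration(60000L), Duration(240000L))

  /**
   * Maximum remember duration across all the input streams. This is a
   * conservative threshold: data older than (current time - this duration)
   * is no longer needed by any stream, so it is safe to clean it up.
   */
  def getMaxInputStreamRememberDuration(): Duration =
    rememberDurations.maxBy(_.ms)

  def main(args: Array[String]): Unit =
    println(getMaxInputStreamRememberDuration().ms)
}
```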

@SparkQA commented Jan 22, 2015

Test build #25932 has finished for PR 4149 at commit c4cf067.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins commented:

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25932/

@jerryshao (Contributor) commented:

Thanks @tdas for your refactoring. I think we should fix #4032 accordingly; otherwise we will still hit this kind of error when recovering from failure, based on my test of your branch:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 73.0 failed 1 times, most recent failure: Lost task 0.0 in stage 73.0 (TID 166, localhost): java.io.FileNotFoundException: file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421893625401-1421893685401
    at org.apache.hadoop.fs.RawLocalFileSystem.open(RawLocalFileSystem.java:175)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:427)
    at org.apache.spark.streaming.util.HdfsUtils$.getInputStream(HdfsUtils.scala:45)
    at org.apache.spark.streaming.util.WriteAheadLogRandomReader.<init>(WriteAheadLogRandomReader.scala:32)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:99)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

@tdas (Contributor, Author) commented Jan 22, 2015

@pwendell Since I made changes only to the comments, I am merging this.
@jerryshao I will take a look at that PR.

asfgit pushed a commit that referenced this pull request Jan 22, 2015
This is a refactored fix based on jerryshao's PR #4037. It enables deletion of old WAL files containing the received block data.
Improvements over #4037:
- Respects the rememberDuration of all receiver streams. In #4037, if there were two receiver streams with different remember durations, deletion would have been based on the shortest remember duration, thus deleting data prematurely for the receiver stream with the longer remember duration.
- Adds a unit test covering creation of the receiver WAL, automatic deletion, and respect for the remember duration.

jerryshao I am going to merge this ASAP to make it into 1.2.1. Thanks for the initial draft of this PR; it made my job much easier.

Author: Tathagata Das <[email protected]>
Author: jerryshao <[email protected]>

Closes #4149 from tdas/SPARK-5147 and squashes the following commits:

730798b [Tathagata Das] Added comments.
c4cf067 [Tathagata Das] Minor fixes
2579b27 [Tathagata Das] Refactored the fix to make sure that the cleanup respects the remember duration of all the receiver streams
2736fd1 [jerryshao] Delete the old WAL log periodically

(cherry picked from commit 3027f06)
Signed-off-by: Tathagata Das <[email protected]>
@SparkQA commented Jan 22, 2015

Test build #25952 has started for PR 4149 at commit 730798b.

  • This patch merges cleanly.

asfgit closed this in 3027f06 Jan 22, 2015
@SparkQA commented Jan 22, 2015

Test build #25952 has finished for PR 4149 at commit 730798b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins commented:

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25952/

5 participants