@@ -20,17 +20,17 @@ import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader}
import org.apache.spark._
Contributor:
import out of order


private[streaming]
class HDFSBackedBlockRDDPartition(
val blockId: BlockId, idx: Int, val segment: WriteAheadLogFileSegment) extends Partition {
val index = idx
}
val blockId: BlockId,
val index: Int,
val segment: WriteAheadLogFileSegment
) extends Partition

private[streaming]
class HDFSBackedBlockRDD[T: ClassTag](
Contributor:
I don't think it makes sense to tie this to WriteAheadLogFileSegment. On one hand, the naming HDFSBackedBlockRDD is supposed to be general; on the other, you tie it to recovery through the use of WriteAheadLogFileSegment.

Contributor:
It would be great to add javadoc explaining what this class is for. If it is used for recovery, why should we put the blocks in the block manager after using them? Shouldn't recovery data be used only once during recovery?
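
For illustration, class-level doc along these lines would answer the questions above (a sketch based only on this PR's discussion, not the merged wording):

```scala
/**
 * An RDD backed by blocks that may live either in the BlockManager or in
 * write-ahead log segments on HDFS. Each partition is first looked up in the
 * local BlockManager; if the block is missing (e.g. after driver recovery),
 * the data is read back from the corresponding WriteAheadLogFileSegment and
 * can optionally be re-inserted into the BlockManager so that later jobs over
 * the same RDD do not have to re-read the log.
 */
```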

Contributor Author:
Good point. How about renaming this class to the more specific WriteAheadLogBasedBackedBlockRDD (kind of long, though)?

Contributor:
I think we could still have to use the recovered data if the same RDD is used for multiple operations, correct? Maybe I am mistaken, but if I do something like

rdd1 = hdfsRdd.<blah>
rdd2 = hdfsRdd.<blah2>

wouldn't it be better if the recovered data is now in the BlockManager?
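
A minimal, self-contained sketch of the reuse pattern described here, with a plain cached RDD standing in for the recovered WAL-backed RDD (the transformations and names are illustrative only):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object RecoveredBlockReuseSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("recovered-block-reuse-sketch"))

    // Stand-in for the recovered RDD; in the real case this would be the
    // WAL-backed block RDD whose partitions were re-read from HDFS.
    val hdfsRdd = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)

    // Two independent computations derived from the same recovered RDD.
    val rdd1 = hdfsRdd.map(_ * 2)
    val rdd2 = hdfsRdd.filter(_ % 2 == 0)

    // The first action materializes the parent's blocks into the BlockManager;
    // the second action then reuses those cached blocks instead of recomputing
    // (or, in the WAL-backed case, re-reading the log segments from HDFS).
    println(rdd1.count())
    println(rdd2.count())

    sc.stop()
  }
}
```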

Contributor Author:
Changed to WriteAheadLogBasedBackedBlockRDD

@@ -42,13 +42,12 @@ class HDFSBackedBlockRDD[T: ClassTag](
val storageLevel: StorageLevel
) extends BlockRDD[T](sc, blockIds) {

if (blockIds.length != segments.length) {
throw new IllegalStateException("Number of block ids must be the same as number of segments!")
}
require(blockIds.length == segments.length,
"Number of block ids must be the same as number of segments!")

// Hadoop Configuration is not serializable, so broadcast it as a serializable.
val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
Contributor:
Over in #2935, @davies is planning to add some code to SerializableWritable to address the Hadoop Configuration constructor thread-safety issue, so you shouldn't have to do it here once we've merged that patch.

Contributor:
Does it make sense to take the SerializableWritable as the argument in the constructor (as being done in #2935) or should we just take the hadoopConf and wrap it in the SerializableWritable once that is merged? We don't want to change the interface later.

Contributor:
For now I am leaving this as is. Let's revisit this later if needed.
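
For reference, a self-contained sketch of the pattern as it currently stands (wrap the non-serializable Configuration in SerializableWritable, broadcast it, unwrap it inside tasks); the configuration key and app name are illustrative only:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkConf, SparkContext}

object BroadcastHadoopConfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("broadcast-conf-sketch"))

    // Driver side: Configuration is not serializable, so wrap it and broadcast it.
    val hadoopConf = new Configuration()
    hadoopConf.set("example.key", "example.value") // illustrative key only
    val broadcastedConf = sc.broadcast(new SerializableWritable(hadoopConf))

    // Executor side: unwrap the broadcast value to get a Configuration back.
    val values = sc.parallelize(1 to 3)
      .map(_ => broadcastedConf.value.value.get("example.key"))
      .collect()

    println(values.toSeq)
    sc.stop()
  }
}
```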

Contributor:
can this be private[this]?

Contributor Author:
Should definitely be private, @harishreedharan

.asInstanceOf[Broadcast[SerializableWritable[Configuration]]]

override def getPartitions: Array[Partition] = {
assertValid()
(0 until blockIds.size).map { i =>
Contributor:
Array.tabulate
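
That is, roughly (a method-body sketch against the fields shown above, not the merged code):

```scala
override def getPartitions: Array[Partition] = {
  assertValid()
  // Build the partition array in one step instead of mapping over a Range.
  Array.tabulate[Partition](blockIds.size) { i =>
    new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i))
  }
}
```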

Contributor Author:
Done.

@@ -19,17 +19,18 @@ package org.apache.spark.streaming.rdd
import java.io.File
Contributor:
imports out of order

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration

import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
import org.apache.spark.{SparkConf, SparkContext}

class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getSimpleName)
@@ -51,6 +52,13 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
dir.delete()
}

override def afterAll(): Unit = {
// Copied from LocalSparkContext which can't be imported since spark-core test-jar does not
// get imported properly by sbt even if it is created.
sparkContext.stop()
System.clearProperty("spark.driver.port")
}

test("Data available in BM and HDFS") {
doTestHDFSBackedRDD(5, 5, 20, 5)
}
@@ -70,8 +78,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
/**
* Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the
Contributor:
What do you mean by "a part of all of them"?

* BlockManager, so all reads need not happen from HDFS.
* @param total - Total number of Strings to write
* @param blockCount - Number of blocks to write (therefore, total # of events per block =
* @param total Total number of Strings to write
* @param blockCount Number of blocks to write (therefore, total # of events per block =
* total/blockCount
*/
private def doTestHDFSBackedRDD(
@@ -81,8 +89,7 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
blockCount: Int
) {
Contributor:
move this to the previous line?

val countPerBlock = total / blockCount
val blockIds = (0 until blockCount).map {
i =>
val blockIds = (0 until blockCount).map { i =>
Contributor:
Use Array.tabulate instead or Seq.tabulate
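
A minimal self-contained version of what that looks like for the block-id generation above (the blockCount value here is arbitrary):

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.storage.StreamBlockId

object TabulateBlockIdsSketch {
  def main(args: Array[String]): Unit = {
    val idGenerator = new AtomicInteger(0)
    val blockCount = 5
    // Seq.tabulate builds the indexed sequence in one step; the index is
    // unused here, just as in the original map over the Range.
    val blockIds = Seq.tabulate(blockCount) { _ =>
      StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
    }
    println(blockIds.mkString("\n"))
  }
}
```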

Contributor Author:
Done.

StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
Contributor:
indent off

}

@@ -95,16 +102,17 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
}
}

val segments = new ArrayBuffer[WriteAheadLogFileSegment]
if (writeToHDFSCount != 0) {
// Generate some fake segments for the blocks in BM so the RDD does not complain
segments ++= generateFakeSegments(writeToBMCount)
segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
blockIds.slice(writeToBMCount, blockCount))

} else {
segments ++= generateFakeSegments(blockCount)
val segments = {
if (writeToHDFSCount != 0) {
// Generate some fake segments for the blocks in BM so the RDD does not complain
generateFakeSegments(writeToBMCount) ++
writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
blockIds.slice(writeToBMCount, blockCount))
} else {
generateFakeSegments(blockCount)
}
}

val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
segments.toArray, false, StorageLevel.MEMORY_ONLY)

@@ -116,10 +124,9 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
/**
* Write data to HDFS and get a list of Seq of Seqs in which each Seq represents the data that
* went into one block.
* @param count - Number of Strings to write
* @param countPerBlock - Number of Strings per block
* @return - Tuple of (Seq of Seqs, each of these Seqs is one block, Seq of WriteAheadLogFileSegments,
* each representing the block being written to HDFS.
* @param count Number of Strings to write
* @param countPerBlock Number of Strings per block
* @return Seq of Seqs, each of these Seqs is one block
*/
private def generateData(
count: Int,
@@ -130,8 +137,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
}

private def writeDataToHDFS(
blockData: Seq[Seq[String]],
blockIds: Seq[BlockId]
blockData: Seq[Seq[String]],
blockIds: Seq[BlockId]
): Seq[WriteAheadLogFileSegment] = {
assert(blockData.size === blockIds.size)
val segments = new ArrayBuffer[WriteAheadLogFileSegment]()