Merged
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

private[streaming] case class FileSegment(path: String, offset: Long, length: Int)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

private[streaming] class HdfsSequentialReader(val path: String) {
Owner: Let's name it WriteAheadLogReader. And the other one WriteAheadLogWriter. It does not need to be specific to HDFS.

Author: And WriteAheadLogRandomReader?


  val instream = HdfsUtils.getInputStream(path)
  var closed = false

  def hasNext: Boolean = {
    assertOpen()
    synchronized {
      instream.available() != 0
    }
  }

  def readNext(): Array[Byte] = {
    assertOpen()
    // TODO: Possible error case where there are not enough bytes in the stream
Owner: This implementation has the not-so-rare corner case where hasNext returns true, but readNext() fails. It's better to actually read the record in hasNext and then return true. The record will be stored and returned when readNext() is called (or name it next()).

Additionally, you can implement a Scala iterator, which allows a nice iterator.foreach(array => ...) downstream. The hasNext and next() will do what I suggested above. (A sketch of this approach follows the class below.)

Author: I did notice the issue, but was not sure whether to make hasNext an expensive call where we read it (that is how it is usually done in C++ and Java, I think).

The iterator idea sounds good. I will do that.

Author: Done

    // TODO: How to handle that?
    synchronized {
      val length = instream.readInt()
      HdfsUtils.checkState(length <= instream.available(), "Not enough data found in file!")
      val buffer = new Array[Byte](length)
      instream.readFully(buffer)
      buffer
    }
  }

  def close() {
    closed = true
    instream.close()
  }

  def assertOpen() {
Owner: What's the point of checking assertOpen() before doing readNext()? There can very easily be a race condition where this asserts true, and the stream is closed in the middle of instream.readInt() on line 36.

Author: True. The close and assert should be synchronized too.

Author: Fixed.

    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the " +
      "file.")
  }
}
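
A minimal sketch of the read-ahead iterator suggested in the thread above (illustrative only: it assumes the HdfsUtils helper from this patch and the WriteAheadLogReader name settled on earlier, and is not the merged implementation):

// Reads each record eagerly in hasNext, so a true result guarantees that
// next() will succeed; close() shares the same lock as the reads, which
// also addresses the close/read race discussed above.
private[streaming] class WriteAheadLogReader(path: String)
  extends Iterator[Array[Byte]] {

  private val instream = HdfsUtils.getInputStream(path)
  private var closed = false
  private var nextItem: Option[Array[Byte]] = None

  override def hasNext: Boolean = synchronized {
    if (closed) {
      false
    } else if (nextItem.isDefined) {
      true
    } else {
      try {
        // Same record framing as readNext(): length prefix, then payload.
        val length = instream.readInt()
        val buffer = new Array[Byte](length)
        instream.readFully(buffer)
        nextItem = Some(buffer)
        true
      } catch {
        case _: java.io.EOFException =>
          close()
          false
      }
    }
  }

  override def next(): Array[Byte] = synchronized {
    val data = nextItem.getOrElse {
      throw new IllegalStateException("next() called on an exhausted or closed reader")
    }
    nextItem = None
    data
  }

  def close(): Unit = synchronized {
    if (!closed) {
      closed = true
      instream.close()
    }
  }
}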
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, FSDataOutputStream, Path}

private[streaming] object HdfsUtils {

  def getOutputStream(path: String): FSDataOutputStream = {
    // Getting a FileSystem via getFileSystem is not thread-safe, so synchronize on this object
    val dfsPath = new Path(path)
    val conf = new Configuration()
    val dfs =
      this.synchronized {
        dfsPath.getFileSystem(new Configuration())
Owner: Why not use the conf defined above?

Author: Fixed.

      }
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (conf.getBoolean("hdfs.append.support", false) && dfs.isFile(dfsPath)) {
Owner: Why do you ever have to append? Isn't it fine to create new files? We have to create new files at some not-so-frequent interval (say 1 minute) anyway, because we also need to delete old files containing data that is not needed any more.

Author: The idea is to be as generic as possible here. If the file already exists, the create call will fail. Since the file name is decided a layer above this one, whatever class is writing to HDFS can decide whether to append to a file or to create a new one. If in some case we create new instances of this class with the same file name, we can still write data, just appending to an existing file. I don't see anything wrong with having this supported - if it is used, great; else this code path is not exercised at all.

Owner: But what happens when the file exists and append support isn't present? It will throw an error, right? And that may very well happen on some deployments. So all modules that use this have to be written such that filenames are unique anyway, so that it does not fail on certain systems. Isn't it?

I don't mind the code, since it is a small piece of code. Just that it doesn't really help us in any way and is therefore redundant.

Author: Since Hadoop 0.20 or so, append is supported by default. I have never come across a situation where append has not been enabled since like 2010. So I don't see that as a major concern. If needed, we can resolve it later.

        dfs.append(dfsPath)
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }

  def getInputStream(path: String): FSDataInputStream = {
    val dfsPath = new Path(path)
    val conf = new Configuration()
Owner: conf is not used anywhere.

Author: Fixed

    val dfs = this.synchronized {
      dfsPath.getFileSystem(new Configuration())
    }
    val instream = dfs.open(dfsPath)
    instream
  }

  def checkState(state: Boolean, errorMsg: => String) {
Owner: Does this really need to be a separate function?

Author: Just to avoid the String concat cost. This accomplishes the same thing as Preconditions.checkArgument in Guava, but that, being Java, does not have pass-by-name. (A short illustration follows the object below.)

    if (!state) {
      throw new IllegalStateException(errorMsg)
    }
  }

}
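
A quick illustration of the pass-by-name point above (hypothetical snippet, not part of the patch): because errorMsg is declared as => String, the message expression is evaluated only when the check fails.

object CheckStateDemo {
  def main(args: Array[String]): Unit = {
    val length = 8
    val available = 16
    // The check passes, so the by-name message argument is never evaluated
    // and the string concatenation below costs nothing here.
    HdfsUtils.checkState(length <= available,
      "Expected at most " + available + " bytes but record claims " + length)
  }
}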
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

private[streaming] class HdfsWalRandomReader(val path: String) {

  val instream = HdfsUtils.getInputStream(path)
  var closed = false

  def read(segment: FileSegment): Array[Byte] = {
    assertOpen()
    synchronized {
      instream.seek(segment.offset)
      val nextLength = instream.readInt()
      HdfsUtils.checkState(nextLength == segment.length,
        "Expected message length to be " + segment.length + ", but was " + nextLength)
      val buffer = new Array[Byte](nextLength)
      instream.readFully(buffer)
      buffer
    }
  }

  def close() {
    closed = true
    instream.close()
  }

  def assertOpen() {
    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
  }
}

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

private[streaming] class HdfsWalWriter(val path: String) {
  val stream = HdfsUtils.getOutputStream(path)
  var nextOffset = stream.getPos
  var closed = false

  // Data is always written as:
  // - Length - Int
  // - Data - of length = Length
  def write(data: Array[Byte]): FileSegment = {
    assertOpen()
    synchronized {
      val segment = new FileSegment(path, nextOffset, data.length)
      stream.writeInt(data.length)
      stream.write(data)
      stream.hflush()
      nextOffset = stream.getPos
      segment
    }
  }

  def close(): Unit = {
    closed = true
    stream.close()
  }

  def assertOpen() {
Owner: Please make this private. Downstream modules should not be able to see this. Same for the other classes. Even if they are all private to Spark, it's best to be disciplined about method visibility. Who knows, in the future we may want to abstract out the WAL as a pure interface and have different implementations of it (HDFS, HBase, etc.).

Author: Done

    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
  }
}