Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
More conservative check against lastPurgeTimestamp
  • Loading branch information
petermaxlee committed Aug 23, 2016
commit a371f05843d1eae3ae09e21bbd1cedeffb19d0e2
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import org.apache.spark.sql.types.StructType

/**
* A very simple source that reads files from the given directory as they appear.
*
* TODO: Clean up the metadata log files periodically.
*/
class FileStreamSource(
sparkSession: SparkSession,
Expand Down Expand Up @@ -183,15 +185,17 @@ object FileStreamSource {
/** Mapping from file to its timestamp. */
private val map = new java.util.HashMap[String, Timestamp]

private var lastTimestamp: Timestamp = 0L
/** Timestamp of the latest file. */
private var latestTimestamp: Timestamp = 0L

private def ageThreshold: Timestamp = lastTimestamp - maxAgeMs
/** Timestamp for the last purge operation. */
private var lastPurgeTimestamp: Timestamp = 0L

/** Records a file in the seen-files map and keeps `latestTimestamp` current. */
def add(file: FileEntry): Unit = {
  map.put(file.path, file.timestamp)
  // Track the newest timestamp observed so far; purge() uses it to derive the age cutoff.
  latestTimestamp = math.max(latestTimestamp, file.timestamp)
}

Expand All @@ -200,16 +204,19 @@ object FileStreamSource {
* if it is new enough that we are still tracking, and we have not seen it before.
*/
def isNewFile(file: FileEntry): Boolean = {
  // A file is new when we have never recorded it and it is not older than the last purge
  // boundary. Comparing against lastPurgeTimestamp (rather than latestTimestamp - maxAgeMs)
  // is deliberately conservative: a file that has aged past the threshold but has not been
  // purged yet is still present in the map, so it must not be reported as new.
  !map.containsKey(file.path) && file.timestamp >= lastPurgeTimestamp
}

/** Removes aged entries and returns the number of files removed. */
def purge(): Int = {
lastPurgeTimestamp = latestTimestamp - maxAgeMs
val iter = map.entrySet().iterator()
var count = 0
while (iter.hasNext) {
val entry = iter.next()
if (entry.getValue < lastTimestamp - maxAgeMs) {
if (entry.getValue < lastPurgeTimestamp) {
count += 1
iter.remove()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,20 @@ class FileStreamSourceSuite extends SparkFunSuite {
assert(map.isNewFile(FileEntry("e", 20)))
}

test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10)

map.add(FileEntry("a", 20))
assert(map.size == 1)

// No purge has run yet, so lastPurgeTimestamp is still 0 and both timestamps (9 and 10)
// are considered new files even though they are older than latestTimestamp - maxAgeMs.
assert(map.isNewFile(FileEntry("b", 9)))
assert(map.isNewFile(FileEntry("b", 10)))

// After purge(), lastPurgeTimestamp = latestTimestamp (20) - maxAgeMs (10) = 10, so a file
// with timestamp < 10 is treated as old while timestamp >= 10 is still considered new.
map.purge()
assert(!map.isNewFile(FileEntry("b", 9)))
assert(map.isNewFile(FileEntry("b", 10)))
}

}