Address the comments
jerryshao committed Sep 18, 2016
commit bbf766308a3dca0266991179c0f8d378223664b0
CompactibleFileStreamLog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.streaming

import java.io.IOException
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.TimeUnit

import scala.reflect.ClassTag

@@ -29,15 +28,16 @@ import org.apache.spark.sql.SparkSession

/**
* An abstract class for compactible metadata logs. It will write one log file for each batch.
* The first line of the log file is the version number, and there are multiple JSON lines
* following.
* The first line of the log file is the version number, and there are multiple serialized
* metadata lines following.
*
* As reading from many small files is usually pretty slow, and too many small files in one
* folder will also mess up the FS, [[CompactibleFileStreamLog]] will compact log files into a
* big file every 10 batches by default. When doing a compaction, it will read all old log
* files and merge them with the new batch.
*/
abstract class CompactibleFileStreamLog[T: ClassTag](
metadataLogVersion: String,
sparkSession: SparkSession,
path: String)
extends HDFSMetadataLog[Array[T]](sparkSession, path) {
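For orientation (not part of this patch), a minimal sketch of what a concrete subclass looks like under the reworked contract: the log version is now threaded through the constructor instead of a shared VERSION constant, and the tuning knobs below become abstract defs. MyEntry, MyFileStreamLog and all constant values in the sketch are hypothetical.

// Sketch only -- MyEntry and MyFileStreamLog are made-up names for illustration;
// the overridden members mirror the abstract ones declared below in this file.
case class MyEntry(path: String, size: Long)

class MyFileStreamLog(
    metadataLogVersion: String,
    sparkSession: SparkSession,
    path: String)
  extends CompactibleFileStreamLog[MyEntry](metadataLogVersion, sparkSession, path) {

  protected override def fileCleanupDelayMs: Long = 10 * 60 * 1000L  // 10 minutes
  protected override def isDeletingExpiredLog: Boolean = true
  protected override def compactInterval: Int = 10

  // One entry per line; any line-oriented encoding works (the sink log uses JSON).
  protected override def serializeData(data: MyEntry): String = s"${data.path},${data.size}"
  protected override def deserializeData(encodedString: String): MyEntry = {
    val Array(p, s) = encodedString.split(",", 2)
    MyEntry(p, s.toLong)
  }

  // Keep everything; real subclasses drop obsolete entries during compaction.
  override def compactLogs(logs: Seq[MyEntry]): Seq[MyEntry] = logs
}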
@@ -51,11 +51,11 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
* a live lock may happen if the compaction happens too frequently: one process keeps deleting
* old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
*/
protected val fileCleanupDelayMs = TimeUnit.MINUTES.toMillis(10)
protected def fileCleanupDelayMs: Long

protected val isDeletingExpiredLog = true
protected def isDeletingExpiredLog: Boolean

protected val compactInterval = 10
protected def compactInterval: Int

/**
* Serialize the data into encoded string.
@@ -68,12 +68,9 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
protected def deserializeData(encodedString: String): T

/**
* Filter out the unwanted logs, by default it filters out nothing, inherited class could
* override this method to do filtering.
* Filter out the obsolete logs.
*/
protected def compactLogs(oldLogs: Seq[T], newLogs: Seq[T]): Seq[T] = {
oldLogs ++ newLogs
}
def compactLogs(logs: Seq[T]): Seq[T]

override def batchIdToPath(batchId: Long): Path = {
if (isCompactionBatch(batchId, compactInterval)) {
@@ -97,7 +94,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
}

override def serialize(logData: Array[T]): Array[Byte] = {
(VERSION +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
(metadataLogVersion +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
}

override def deserialize(bytes: Array[Byte]): Array[T] = {
@@ -106,7 +103,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
throw new IllegalStateException("Incomplete log file")
}
val version = lines(0)
if (version != VERSION) {
if (version != metadataLogVersion) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
lines.slice(1, lines.length).map(deserializeData)
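For illustration, a self-contained sketch of the on-disk layout that serialize and deserialize above agree on (the entry strings are hypothetical): the first line is the constructor-supplied version string, followed by one serialized entry per line.

// Illustration only -- hypothetical entries.
val version = "v1"
val entries = Seq("""{"path":"/out/part-00000","action":"add"}""",
                  """{"path":"/out/part-00001","action":"add"}""")
val fileText = (version +: entries).mkString("\n")                  // what serialize() writes
val lines = fileText.split("\n")
require(lines(0) == version, s"Unknown log version: ${lines(0)}")   // deserialize's fail-fast check
val payload = lines.slice(1, lines.length)                          // handed to deserializeData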
@@ -126,8 +123,8 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
*/
private def compact(batchId: Long, logs: Array[T]): Boolean = {
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten
if (super.add(batchId, compactLogs(allLogs, logs).toArray)) {
val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
if (super.add(batchId, compactLogs(allLogs).toArray)) {
if (isDeletingExpiredLog) {
deleteExpiredLog(batchId)
}
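To make the schedule concrete: the helpers used above (isCompactionBatch, getValidBatchesBeforeCompactionBatch) are not shown in this excerpt, so the modulo rule below is an assumption about their behaviour, stated as such. With the default compactInterval of 10, every tenth batch becomes a compaction batch that folds all valid earlier batches plus the new logs into one file.

// Assumed behaviour of the helper used above, for illustration only:
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  (batchId + 1) % compactInterval == 0

(0L to 30L).filter(isCompactionBatch(_, 10))   // -> 9, 19, 29
// Compacting batch 19 would read the previous compaction batch 9 and the deltas
// 10..18, append the new batch's logs, and write them all into a single
// compacted file for batch 19 (using the ".compact" suffix defined below).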
@@ -148,18 +145,18 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
// race condition.
while (true) {
if (latestId >= 0) {
val startId = getAllValidBatches(latestId, compactInterval)(0)
try {
val logs = super.get(Some(startId), Some(latestId)).flatMap(_._2)
return compactLogs(logs, Seq.empty).toArray
val logs =
getAllValidBatches(latestId, compactInterval).flatMap(id => super.get(id)).flatten
return compactLogs(logs).toArray
} catch {
case e: IOException =>
// Another process using `FileStreamSink` may delete the batch files when
// Another process using `CompactibleFileStreamLog` may delete the batch files when
// `StreamFileCatalog` is reading. However, it only happens when a compaction is
// deleting old files. If so, let's try the next compaction batch and we should find it.
// Otherwise, this is a real IO issue and we should throw it.
latestId = nextCompactionBatchId(latestId, compactInterval)
get(latestId).getOrElse {
super.get(latestId).getOrElse {
throw e
}
}
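The retry in the catch block above works together with the cleanup delay defined earlier: deleteExpiredLog (not shown in this hunk) is expected to keep old batch files around for at least fileCleanupDelayMs, so a reader that loses the race can still jump to the next compaction batch before the files it needs disappear. A rough sketch of that expectation, with the details assumed rather than taken from this diff:

// Assumption, for illustration: expired batch files only become deletable once
// they are older than fileCleanupDelayMs.
val expirationTimeMs = System.currentTimeMillis() - fileCleanupDelayMs
// Files with a modification time at or before expirationTimeMs may be deleted;
// newer ones stay visible to readers still iterating over old batch ids.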
@@ -197,7 +194,6 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
}

object CompactibleFileStreamLog {
val VERSION = "v1"
val COMPACT_FILE_SUFFIX = ".compact"

def getBatchIdFromFileName(fileName: String): Long = {
FileStreamSink.scala
@@ -56,7 +56,8 @@ class FileStreamSink(

private val basePath = new Path(path)
private val logPath = new Path(basePath, FileStreamSink.metadataDir)
private val fileLog = new FileStreamSinkLog(sparkSession, logPath.toUri.toString)
private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
private val hadoopConf = sparkSession.sessionState.newHadoopConf()
private val fs = basePath.getFileSystem(hadoopConf)

FileStreamSinkLog.scala
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.streaming

import java.io.IOException

import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
@@ -78,8 +76,11 @@ object SinkFileStatus {
* When the reader uses `allFiles` to list all files, this method only returns the visible files
* (drops the deleted files).
*/
class FileStreamSinkLog(sparkSession: SparkSession, path: String)
extends CompactibleFileStreamLog[SinkFileStatus](sparkSession, path) {
class FileStreamSinkLog(
metadataLogVersion: String,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {

private implicit val formats = Serialization.formats(NoTypeHints)

@@ -103,26 +104,18 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
read[SinkFileStatus](encodedString)
}

protected override def compactLogs(
oldLogs: Seq[SinkFileStatus], newLogs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
FileStreamSinkLog.compactLogs(oldLogs ++ newLogs)
}
}

object FileStreamSinkLog {
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"

/**
* Removes all deleted files from logs. It assumes that once a file is deleted, it won't be
* added back to the log in the future.
*/
def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
val deletedFiles = logs.filter(_.action == DELETE_ACTION).map(_.path).toSet
override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
if (deletedFiles.isEmpty) {
logs
} else {
logs.filter(f => !deletedFiles.contains(f.path))
}
}
}

object FileStreamSinkLog {
val VERSION = "v1"
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"
}
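To make the new compactLogs contract concrete, a small usage sketch (sinkLog is an already-constructed FileStreamSinkLog, and mkStatus is a hypothetical helper that fills the remaining SinkFileStatus fields with dummy values): once a path shows up with a delete action, every record for that path is dropped from the compacted output.

// Illustration only -- sinkLog and mkStatus are hypothetical.
val logs = Seq(
  mkStatus("/out/part-00000", FileStreamSinkLog.ADD_ACTION),
  mkStatus("/out/part-00001", FileStreamSinkLog.ADD_ACTION),
  mkStatus("/out/part-00000", FileStreamSinkLog.DELETE_ACTION))
sinkLog.compactLogs(logs).map(_.path)   // Seq("/out/part-00001")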
FileStreamSource.scala
@@ -20,14 +20,11 @@ package org.apache.spark.sql.execution.streaming
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
Expand All @@ -50,7 +47,8 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contain glob patterns
}

private val metadataLog = new FileStreamSourceLog(sparkSession, metadataPath)
private val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

/** Maximum number of new files to be considered in each batch */
@@ -97,7 +95,7 @@ class FileStreamSource(

if (batchFiles.nonEmpty) {
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles.toArray)
metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}
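For clarity on the changed add() call above, a small sketch with made-up values: entries picked up for a batch are stamped with that batch's id before being persisted, replacing the old action-string bookkeeping.

// Illustration only -- paths, timestamps and the batch id are hypothetical.
val picked = Seq(
  FileEntry("hdfs:///input/a.json", 1474156800000L),   // batchId defaults to NOT_SET
  FileEntry("hdfs:///input/b.json", 1474156801000L))
val stamped = picked.map(_.copy(batchId = 6L))          // what metadataLog.add persists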

@@ -173,12 +171,9 @@ object FileStreamSource {
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

// Default action when `FileEntry` is persisted into log.
val ADD_ACTION = "add"
// Action when `FileEntry` is compacted.
val COMPACT_ACTION = "compact"
val NOT_SET = -1L

case class FileEntry(path: String, timestamp: Timestamp, action: String = ADD_ACTION)
case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET)
extends Serializable

/**
@@ -238,48 +233,4 @@ object FileStreamSource {
map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
}
}

class FileStreamSourceLog(sparkSession: SparkSession, path: String)
extends CompactibleFileStreamLog[FileEntry](sparkSession, path) {

// Configurations about metadata compaction
protected override val compactInterval =
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
s"positive value.")

protected override val fileCleanupDelayMs =
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)

protected override val isDeletingExpiredLog =
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)

private implicit val formats = Serialization.formats(NoTypeHints)

protected override def serializeData(data: FileEntry): String = {
Serialization.write(data)
}

def deserializeData(encodedString: String): FileEntry = {
Serialization.read[FileEntry](encodedString)
}

protected override def compactLogs(
oldLogs: Seq[FileEntry], newLogs: Seq[FileEntry]): Seq[FileEntry] = {
// Change the action of old file entry into COMPACT, so when fetching these out, they will
// be filtered out to avoid processing again.
oldLogs.map(e => FileEntry(e.path, e.timestamp, COMPACT_ACTION)) ++ newLogs
}

override def get(
startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = {
super.get(startId, endId).map { case (id, entries) =>
// Keep only the file entries in which the action is ADD, this will keep the consistency
// while retrieving again after compaction.
val addedEntries = entries.filter(_.action == ADD_ACTION)
(id, addedEntries)
}
}
}
}
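The replacement for the FileStreamSourceLog removed above lives in its own file that is not part of this excerpt. As an orientation aid only, here is a sketch of what such a subclass plausibly looks like under the new contract; the SQLConf keys and serialization calls are taken from the deleted overrides above, everything else (including the VERSION value and the compactLogs body) is an assumption.

// Sketch only -- the real FileStreamSourceLog added by this patch is defined
// outside this diff; treat this as an approximation, not the actual code.
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
import org.apache.spark.sql.internal.SQLConf

class FileStreamSourceLog(
    metadataLogVersion: String,
    sparkSession: SparkSession,
    path: String)
  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {

  private implicit val formats = Serialization.formats(NoTypeHints)

  protected override def fileCleanupDelayMs: Long =
    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)

  protected override def isDeletingExpiredLog: Boolean =
    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)

  protected override def compactInterval: Int =
    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)

  protected override def serializeData(data: FileEntry): String = Serialization.write(data)

  protected override def deserializeData(encodedString: String): FileEntry =
    Serialization.read[FileEntry](encodedString)

  // With batchId recorded on each entry there is nothing to rewrite at compaction
  // time, so entries can simply be kept as-is (assumption).
  override def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = logs
}

object FileStreamSourceLog {
  val VERSION = "v1"   // assumed value; the constant itself is referenced earlier in this diff
}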