@@ -135,8 +135,11 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
private[this] val defaultSeed: Int = 0x9747b28c // LZ4BlockOutputStream.DEFAULT_SEED

override def compressedOutputStream(s: OutputStream): OutputStream = {
compressedOutputStream(s, syncFlush = false)
}

def compressedOutputStream(s: OutputStream, syncFlush: Boolean): OutputStream = {
val blockSize = conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
new LZ4BlockOutputStream(
s,
blockSize,
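A minimal usage sketch of the new overload (the in-memory sink and sample payload below are illustrative, not part of the patch):

import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.spark.SparkConf
import org.apache.spark.io.LZ4CompressionCodec

val codec = new LZ4CompressionCodec(new SparkConf())
val sink = new ByteArrayOutputStream()

// With syncFlush = true, flush() forces the current LZ4 block out to the
// underlying stream, so a reader can consume everything written so far
// without the codec stream being closed.
val out = codec.compressedOutputStream(sink, syncFlush = true)
out.write("hello".getBytes(UTF_8))
out.flush() // the compressed bytes for "hello" are now visible in `sink`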
@@ -1464,6 +1464,21 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes

val FILE_SINK_LOG_WRITE_METADATA_VERSION =
buildConf("spark.sql.streaming.fileSink.log.writeMetadataVersion")
.doc("The version of file stream sink log metadata. By default the version is set to " +
"the highest version current Spark handles, as higher version tends to be better in " +
"some aspects. You may want to set this to lower value when the outputs should be " +
"readable from lower version of Spark. " +
"Note that it doesn't 'rewrite' the old batch files: to ensure the metadata to be " +
"read by lower version of Spark, the metadata log should be written from the scratch, " +
"or at least one compact batch should be written with configured version. " +
"Available metadata versions: 1 (all versions) 2 (3.1.0+)")
.version("3.1.0")
.intConf
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createOptional

val FILE_SOURCE_LOG_DELETION = buildConf("spark.sql.streaming.fileSource.log.deletion")
.internal()
.doc("Whether to delete the expired log files in file stream source.")
@@ -2808,6 +2823,8 @@ class SQLConf extends Serializable with Logging {

def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)

def fileSinkWriteMetadataLogVersion: Option[Int] = getConf(FILE_SINK_LOG_WRITE_METADATA_VERSION)

def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)

def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)
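As a usage sketch (the session setup and app name are hypothetical), a streaming job whose output directory must remain readable by Spark 3.0.x could pin the write version to 1:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sink-log-compat-writer") // hypothetical app name
  // Write file sink metadata in the v1 text format so that older readers keep working.
  .config("spark.sql.streaming.fileSink.log.writeMetadataVersion", "1")
  .getOrCreate()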
@@ -17,16 +17,19 @@

package org.apache.spark.sql.execution.streaming

import java.io.{DataInputStream, DataOutputStream, InputStream, IOException, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.mutable
import scala.io.{Source => IOSource}
import scala.reflect.ClassTag

import com.google.common.io.ByteStreams
import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.io.LZ4CompressionCodec
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.{SizeEstimator, Utils}

@@ -68,6 +71,19 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](

protected def defaultCompactInterval: Int

/**
* In some cases, log files written by application A should be readable by application B,
* even though the two may run different Spark versions. To support writing log files that
* are readable by a lower version of Spark, this method supplies an additional metadata
* log version which is used only for writing.
*
* Note that this class doesn't "rewrite" old batch files: to ensure the metadata can be
* read by a lower version of Spark, the metadata log should be written with the proper
* version from scratch, or at least one compact batch should be written with the proper
* version (so that readers will ignore earlier batch logs which may have been written
* with a higher version).
*/
protected def writeMetadataLogVersion: Option[Int] = None

protected final lazy val compactInterval: Int = {
// SPARK-18187: "compactInterval" can be set by user via defaultCompactInterval.
// If there are existing log entries, then we should ensure a compatible compactInterval
@@ -106,6 +122,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
interval
}

private val sparkConf = sparkSession.sparkContext.getConf

/**
* Filter out the obsolete logs.
*/
@@ -132,24 +150,88 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
}
}

protected def serializeEntryToV2(data: T): Array[Byte]
protected def deserializeEntryFromV2(serialized: Array[Byte]): T

override def serialize(logData: Array[T], out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
val version = writeMetadataLogVersion.getOrElse(metadataLogVersion)
out.write(("v" + version).getBytes(UTF_8))
version match {
case 1 => serializeToV1(out, logData)
case 2 => serializeToV2(out, logData)
case _ =>
throw new IllegalStateException(s"UnsupportedLogVersion: unknown log version v$version " +
"is configured for writing.")
}
}

private def serializeToV1(out: OutputStream, logData: Array[T]): Unit = {
logData.foreach { data =>
out.write('\n')
out.write(Serialization.write(data).getBytes(UTF_8))
}
}

private def serializeToV2(out: OutputStream, logData: Array[T]): Unit = {
out.write('\n')
val dos = compressStream(out)
if (logData.nonEmpty) {
logData.foreach { data =>
val serialized = serializeEntryToV2(data)
dos.writeInt(serialized.length)
dos.write(serialized)
}
}
dos.writeInt(-1)
dos.flush()
}

override def deserialize(in: InputStream): Array[T] = {
val line = readLine(in)
if (line == null || line.isEmpty) {
throw new IllegalStateException("Incomplete log file")
}

val version = parseVersion(line)
version match {
case 1 if version <= metadataLogVersion => deserializeFromV1(in)
case 2 if version <= metadataLogVersion => deserializeFromV2(in)
case version =>
throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
s"is v${metadataLogVersion}, but encountered v$version. The log file was produced " +
s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
}
}

private def deserializeFromV1(in: InputStream): Array[T] = {
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
lines.map(Serialization.read[T]).toArray
}

private def deserializeFromV2(in: InputStream): Array[T] = {
val list = new mutable.ArrayBuffer[T]()

val dis = decompressStream(in)
var eof = false

while (!eof) {
val size = dis.readInt()
if (size == -1) {
eof = true
} else if (size < 0) {
throw new IOException(s"Failed to deserialize log file: entry size cannot be $size")
} else {
val buffer = new Array[Byte](size)
ByteStreams.readFully(dis, buffer, 0, size)
list += deserializeEntryFromV2(buffer)
}
}

list.toArray
}

override def add(batchId: Long, logs: Array[T]): Boolean = {
val batchAdded =
if (isCompactionBatch(batchId, compactInterval)) {
@@ -284,6 +366,33 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
}
}
}

private def readLine(in: InputStream): String = {
val line = new mutable.ArrayBuffer[Byte]()
var eol = false
while (!eol) {
val b = in.read()
if (b == -1 || b == '\n') {
eol = true
} else {
line += b.toByte
}
}

new String(line.toArray, UTF_8)
}

private def compressStream(outputStream: OutputStream): DataOutputStream = {
// Set syncFlush to true since we never close the compressed stream; we call flush() instead.
val compressed = new LZ4CompressionCodec(sparkConf)
.compressedOutputStream(outputStream, syncFlush = true)
new DataOutputStream(compressed)
}

private def decompressStream(inputStream: InputStream): DataInputStream = {
val compressed = new LZ4CompressionCodec(sparkConf).compressedInputStream(inputStream)
new DataInputStream(compressed)
}
}

object CompactibleFileStreamLog {
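To make the v2 framing concrete, the following self-contained sketch round-trips the length-prefixed layout used by serializeToV2 and deserializeFromV2 — [int length][entry bytes] repeated, terminated by -1 — with compression omitted for brevity; the String entry type stands in for the real log entries:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

def writeEntries(entries: Seq[String]): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val dos = new DataOutputStream(baos)
  entries.foreach { e =>
    val bytes = e.getBytes(UTF_8)
    dos.writeInt(bytes.length) // length prefix for the next entry
    dos.write(bytes)
  }
  dos.writeInt(-1) // end-of-stream sentinel, mirroring serializeToV2
  dos.flush()
  baos.toByteArray
}

def readEntries(serialized: Array[Byte]): Seq[String] = {
  val dis = new DataInputStream(new ByteArrayInputStream(serialized))
  val entries = scala.collection.mutable.ArrayBuffer[String]()
  var size = dis.readInt()
  while (size != -1) {
    val bytes = new Array[Byte](size)
    dis.readFully(bytes) // mirrors ByteStreams.readFully in deserializeFromV2
    entries += new String(bytes, UTF_8)
    size = dis.readInt()
  }
  entries.toSeq
}

assert(readEntries(writeEntries(Seq("add", "delete"))) == Seq("add", "delete"))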
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.streaming

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.net.URI

import org.apache.hadoop.fs.{FileStatus, Path}
@@ -97,6 +98,10 @@ class FileStreamSinkLog(
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")

// The validation of version is done in SQLConf.
protected override val writeMetadataLogVersion: Option[Int] =
sparkSession.sessionState.conf.fileSinkWriteMetadataLogVersion

override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
if (deletedFiles.isEmpty) {
@@ -105,10 +110,45 @@ class FileStreamSinkLog(
logs.filter(f => !deletedFiles.contains(f.path))
}
}

override protected def serializeEntryToV2(data: SinkFileStatus): Array[Byte] = {
val baos = new ByteArrayOutputStream()
val dos = new DataOutputStream(baos)

dos.writeUTF(data.path)
dos.writeLong(data.size)
dos.writeBoolean(data.isDir)
dos.writeLong(data.modificationTime)
dos.writeInt(data.blockReplication)
dos.writeLong(data.blockSize)
dos.writeUTF(data.action)
dos.close()

baos.toByteArray
}

override protected def deserializeEntryFromV2(serialized: Array[Byte]): SinkFileStatus = {
val bais = new ByteArrayInputStream(serialized)
val dis = new DataInputStream(bais)

val status = SinkFileStatus(
dis.readUTF(),
dis.readLong(),
dis.readBoolean(),
dis.readLong(),
dis.readInt(),
dis.readLong(),
dis.readUTF())

dis.close()

status
}
}

object FileStreamSinkLog {
val VERSION = 2
val SUPPORTED_VERSIONS = Seq(1, 2)
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"
}
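For intuition, the v2 entry codec above is plain java.io data-stream encoding. A round trip with a simplified stand-in record (same field order as SinkFileStatus) might look like the sketch below; note that writeUTF uses modified UTF-8 and caps an encoded string at 64 KB, which is ample for file paths in practice:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Simplified stand-in for SinkFileStatus, with the same field order as the codec above.
case class Status(path: String, size: Long, isDir: Boolean, modificationTime: Long,
    blockReplication: Int, blockSize: Long, action: String)

def encode(s: Status): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val dos = new DataOutputStream(baos)
  dos.writeUTF(s.path)
  dos.writeLong(s.size)
  dos.writeBoolean(s.isDir)
  dos.writeLong(s.modificationTime)
  dos.writeInt(s.blockReplication)
  dos.writeLong(s.blockSize)
  dos.writeUTF(s.action)
  dos.close()
  baos.toByteArray
}

def decode(bytes: Array[Byte]): Status = {
  val dis = new DataInputStream(new ByteArrayInputStream(bytes))
  Status(dis.readUTF(), dis.readLong(), dis.readBoolean(), dis.readLong(),
    dis.readInt(), dis.readLong(), dis.readUTF())
}

val status = Status("/output/part-00000", 1024L, false, 1600000000000L, 1, 134217728L, "add")
assert(decode(encode(status)) == status)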
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.streaming

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

@@ -122,8 +123,34 @@ class FileStreamSourceLog(
}
batches
}

override protected def serializeEntryToV2(data: FileEntry): Array[Byte] = {
val baos = new ByteArrayOutputStream()
val dos = new DataOutputStream(baos)

dos.writeUTF(data.path)
dos.writeLong(data.timestamp)
dos.writeLong(data.batchId)
dos.close()

baos.toByteArray
}

override protected def deserializeEntryFromV2(serialized: Array[Byte]): FileEntry = {
val bais = new ByteArrayInputStream(serialized)
val dis = new DataInputStream(bais)

val entry = FileEntry(
dis.readUTF(),
dis.readLong(),
dis.readLong())

dis.close()

entry
}
}

object FileStreamSourceLog {
val VERSION = 2
}
Expand Up @@ -249,6 +249,16 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
* "v123xyz" etc.)
*/
private[sql] def validateVersion(text: String, maxSupportedVersion: Int): Int = {
val version = parseVersion(text)
if (version > maxSupportedVersion) {
throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
}
version
}

private[sql] def parseVersion(text: String): Int = {
if (text.length > 0 && text(0) == 'v') {
val version =
try {
@@ -258,15 +268,8 @@
throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
s"version from $text.")
}
if (version > 0) return version
}

// reaching here means we failed to read the correct log version
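A standalone sketch (not the patch's code) of the "vN" header convention that parseVersion and validateVersion implement:

// Parses a header like "v2" into 2; rejects anything else as malformed.
def parseVersion(text: String): Int = {
  require(text.nonEmpty && text(0) == 'v', s"malformed version header: $text")
  val version = text.substring(1).toInt // NumberFormatException on non-numeric input
  require(version > 0, s"malformed version header: $text")
  version
}

// Parses and additionally enforces the reader's maximum supported version.
def validateVersion(text: String, maxSupportedVersion: Int): Int = {
  val version = parseVersion(text)
  if (version > maxSupportedVersion) {
    throw new IllegalStateException(
      s"UnsupportedLogVersion: v$version exceeds max supported v$maxSupportedVersion")
  }
  version
}

assert(parseVersion("v2") == 2)
assert(validateVersion("v1", maxSupportedVersion = 2) == 1)
// validateVersion("v3", maxSupportedVersion = 2) would throw IllegalStateException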