hdfsmetadatalog
ericm-db committed Jul 9, 2024
commit 77ffe9531c082ee6a47d2a62f86779ff4d957690
@@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
@@ -47,10 +48,25 @@ import org.apache.spark.util.ArrayImplicits._
  *
  * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
  * files in a directory always shows the latest files.
+ * @param hadoopConf Hadoop configuration that is used to read / write metadata files.
+ * @param path Path to the directory that will be used for writing metadata.
+ * @param metadataCacheEnabled Whether to cache the batches' metadata in memory.
  */
-class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
+class HDFSMetadataLog[T <: AnyRef : ClassTag](
+    hadoopConf: Configuration,
+    path: String,
+    val metadataCacheEnabled: Boolean = false)
   extends MetadataLog[T] with Logging {
 
+  def this(sparkSession: SparkSession, path: String) = {
+    this(
+      sparkSession.sessionState.newHadoopConf(),
+      path,
+      metadataCacheEnabled = sparkSession.sessionState.conf.getConf(
+        SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+    )
+  }
+
   private implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
   /** Needed to serialize type T into JSON when using Jackson */
@@ -64,15 +80,12 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   val metadataPath = new Path(path)
 
   protected val fileManager =
-    CheckpointFileManager.create(metadataPath, sparkSession.sessionState.newHadoopConf())
+    CheckpointFileManager.create(metadataPath, hadoopConf)
 
   if (!fileManager.exists(metadataPath)) {
     fileManager.mkdirs(metadataPath)
   }
 
-  protected val metadataCacheEnabled: Boolean
-    = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
-
   /**
    * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
    * when committing offsets, this cache will save some file system operations.
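For reference, a minimal sketch (not part of the commit) of how the class can be constructed after this change. The checkpoint path and session setup here are hypothetical, and HDFSMetadataLog lives in Spark's internal org.apache.spark.sql.execution.streaming package, so this is illustrative rather than a supported public API:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

// Hypothetical checkpoint location for illustration only.
val path = "/tmp/checkpoints/offsets"

// New primary constructor: only a Hadoop Configuration is required, so the
// log can be created without an active SparkSession. The cache flag is now
// passed in explicitly instead of being read from SQLConf inside the class.
val hadoopConf = new Configuration()
val log = new HDFSMetadataLog[String](hadoopConf, path, metadataCacheEnabled = true)
log.add(0, "batch-0 metadata")

// Existing call sites keep compiling through the secondary constructor, which
// derives both the Configuration and the cache flag from the session state.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sessionLog = new HDFSMetadataLog[String](spark, path)

Taking the Configuration directly also decouples the log from the session, so a bare Configuration can stand in for a full SparkSession in tests.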