Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Reflect review comment, but a bit differently
  • Loading branch information
HeartSaVioR committed Oct 3, 2019
commit e0abbaecade12252d65b9c95de04146fd2741b97
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,17 @@ abstract class EventLogFileWriter(
CompressionCodec.getShortName(c.getClass.getName)
}

// Only defined if the file system scheme is not local
protected var hadoopDataStream: Option[FSDataOutputStream] = None
protected var writer: Option[PrintWriter] = None

/**
 * Validates that the configured event log base directory exists and is a directory
 * on the target file system.
 *
 * @throws IllegalArgumentException if `logBaseDir` resolves to something that is
 *                                  not a directory (e.g. a regular file).
 */
protected def requireLogBaseDirAsDirectory(): Unit = {
  // Resolve the status first so the check reads as a single, named condition.
  val baseDirStatus = fileSystem.getFileStatus(new Path(logBaseDir))
  if (!baseDirStatus.isDirectory) {
    throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
  }
}

protected def initLogFile(path: Path): (Option[FSDataOutputStream], OutputStream) = {
protected def initLogFile(path: Path, fnSetupWriter: OutputStream => PrintWriter): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks nicer with currying.

initLogFile(path) { stream =>
  // wrap the stream as needed
}

if (shouldOverwrite && fileSystem.delete(path, true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comes from the old code but seems to be somewhat incorrect. If the file exists and shouldOverwrite is false, this will not delete the file, as expected.

But later, when you do e.g. new FileOutputStream(uri.getPath), the existing file will be overwritten on most file systems (NTFS being the exception, I think).

No need to fix that here, though. Also because it's unlikely to happen given how we name files.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. Maybe we haven't defined proper behavior of this case: would we want to fail the application?

There might be a similar case when shouldOverwrite is true and fileSystem.delete() returns false. In practice fileSystem.delete() will mostly throw IOException when it fails to delete, but according to the javadoc, a 'false' return value does not necessarily mean only that the file doesn't exist — the javadoc doesn't guarantee that.

logWarning(s"Event log $path already exists. Overwriting...")
}
Expand All @@ -85,7 +89,6 @@ abstract class EventLogFileWriter(
val isDefaultLocal = defaultFs == null || defaultFs == "file"
val uri = path.toUri

var hadoopDataStream: Option[FSDataOutputStream] = None
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comes from the old code, but use // for these kind of comments.

* Therefore, for local files, use FileOutputStream instead. */
val dstream =
Expand All @@ -103,14 +106,28 @@ abstract class EventLogFileWriter(
val bstream = new BufferedOutputStream(cstream, outputBufferSize)
fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS)
logInfo(s"Logging events to $path")
(hadoopDataStream, bstream)
writer = Some(fnSetupWriter(bstream))
} catch {
case e: Exception =>
dstream.close()
throw e
}
}

/**
 * Appends one JSON event as a single line to the current event log writer.
 *
 * @param json        the serialized event; written verbatim followed by a newline
 * @param flushLogger when true, flush the writer and, if the log lives on a Hadoop
 *                    file system, hflush the underlying stream so the data is
 *                    visible to readers
 */
protected def writeJson(json: String, flushLogger: Boolean = false): Unit = {
  // scalastyle:off println
  writer.foreach { out => out.println(json) }
  // scalastyle:on println
  if (flushLogger) {
    writer.foreach { out => out.flush() }
    // hflush pushes buffered bytes to the datanodes; only defined for non-local FS.
    hadoopDataStream.foreach { stream => stream.hflush() }
  }
}

/** Closes the current event log writer, if one is open; no-op otherwise. */
protected def closeWriter(): Unit = {
  writer.foreach { out => out.close() }
}

protected def renameFile(src: Path, dest: Path, overwrite: Boolean): Unit = {
if (fileSystem.exists(dest)) {
if (overwrite) {
Expand Down Expand Up @@ -197,35 +214,23 @@ class SingleEventLogFileWriter(

private val inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS

// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None

private var writer: Option[PrintWriter] = None

override def start(): Unit = {
requireLogBaseDirAsDirectory()

val (hadoopStream, outputStream) = initLogFile(new Path(inProgressPath))
hadoopDataStream = hadoopStream
writer = Some(new PrintWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)))
initLogFile(new Path(inProgressPath),
ostream => new PrintWriter(new OutputStreamWriter(ostream, StandardCharsets.UTF_8)))
}

override def writeEvent(eventJson: String, flushLogger: Boolean = false): Unit = {
// scalastyle:off println
writer.foreach(_.println(eventJson))
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
hadoopDataStream.foreach(_.hflush())
}
writeJson(eventJson, flushLogger)
}

/**
* Stop logging events. The event log file will be renamed so that it loses the
* ".inprogress" suffix.
*/
override def stop(): Unit = {
writer.foreach(_.close())
closeWriter()
renameFile(new Path(inProgressPath), new Path(logPath), shouldOverwrite)
}
}
Expand Down Expand Up @@ -292,10 +297,7 @@ class RollingEventLogFilesWriter(

private val logDirForAppPath = getAppEventLogDirPath(logBaseDir, appId, appAttemptId)

// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None
private var countingOutputStream: Option[CountingOutputStream] = None
private var writer: Option[PrintWriter] = None

// seq and event log path will be updated soon in rollEventLogFile, which `start` will call
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seq ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sigh. Now I'm regretting it — I should have either avoided the abbreviation or changed the term. Nice finding. Will change.

private var index: Long = 0L
Expand Down Expand Up @@ -325,31 +327,25 @@ class RollingEventLogFilesWriter(
}
}

// scalastyle:off println
writer.foreach(_.println(eventJson))
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
hadoopDataStream.foreach(_.hflush())
}
writeJson(eventJson, flushLogger)
}

private def rollEventLogFile(): Unit = {
writer.foreach(_.close())
closeWriter()

index += 1
currentEventLogFilePath = getEventLogFilePath(logDirForAppPath, appId, appAttemptId, index,
compressionCodecName)

val (hadoopStream, outputStream) = initLogFile(currentEventLogFilePath)
hadoopDataStream = hadoopStream
countingOutputStream = Some(new CountingOutputStream(outputStream))
writer = Some(new PrintWriter(
new OutputStreamWriter(countingOutputStream.get, StandardCharsets.UTF_8)))
initLogFile(currentEventLogFilePath, ostream => {
countingOutputStream = Some(new CountingOutputStream(ostream))
new PrintWriter(
new OutputStreamWriter(countingOutputStream.get, StandardCharsets.UTF_8))
})
}

override def stop(): Unit = {
writer.foreach(_.close())
closeWriter()
val appStatusPathIncomplete = getAppStatusFilePath(logDirForAppPath, appId, appAttemptId,
inProgress = true)
val appStatusPathComplete = getAppStatusFilePath(logDirForAppPath, appId, appAttemptId,
Expand Down