
Conversation

@HeartSaVioR
Contributor

What changes were proposed in this pull request?

This patch proposes adding an option to the file stream sink to retain only the last batch in the file log (metadata). This helps when a query outputs plenty of files in each batch, where compacting the metadata files into one brings non-trivial overhead.

Please refer to the comment in the JIRA issue for more details on the overhead that the current file stream sink metadata and file stream source metadata file index can bring to high-volume, long-running queries.

Because this patch purges old batches and retains only the last batch in the metadata, the metadata file index fails to construct the list of files when the option is enabled, and as a result the file (stream) source cannot read the output directory. To re-enable reading from the output directory, this patch also proposes adding an option to the file (stream) source that ignores metadata information when reading the directory. With this option, end users can also choose the faster of the in-memory file index and the metadata file index when the metadata file gets much bigger.
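
For illustration, here is a minimal usage sketch of the two proposed options (option names as proposed in this patch; df, spark, outputSchema, and the paths are placeholders):

// Sink side: keep only the last batch in the file log (proposed option).
val query = df.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("retainOnlyLastBatchInMetadata", "true")
  .start("/tmp/output")

// Source side: ignore the sink metadata and always use the in-memory file index (proposed option).
val input = spark.readStream
  .format("parquet")
  .schema(outputSchema)
  .option("ignoreFileStreamSinkMetadata", "true")
  .load("/tmp/output")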

How was this patch tested?

Added unit tests.

href="api/R/read.stream.html">R</a>).
E.g. for "parquet" format options see <code>DataStreamReader.parquet()</code>.
<br/><br/>
<code>ignoreFileStreamSinkMetadata</code>: whether to ignore metadata information left behind by a file stream sink, which leads to always using the in-memory file index. (default: false)
Contributor Author


I couldn't find a place to document this for the (batch) file source. Please let me know if we have a proper place to put it. At least we document the need to enable this option when the "retainOnlyLastBatchInMetadata" option is enabled, so it might be safe for the batch case as well.

<code>retainOnlyLastBatchInMetadata</code>: whether to retain metadata information only for the last succeeded batch.
<br/><br/>
This option greatly reduces the overhead of compacting metadata files, which can be non-trivial when a query processes lots of files in each batch.<br/>
NOTE: As only the last batch is retained in the metadata, the metadata is not readable from the file source: you must set the "ignoreFileStreamSinkMetadata" option.
Contributor Author

@HeartSaVioR Feb 20, 2019


I feel this is not ideal, but given that the file stream sink itself also leverages the file log, it cannot be made entirely optional.

If we would like to not leave a file log in this case, we may need another kind of metadata (storing minimized information, like the last succeeded batch id) and write it instead when the option is turned on. It should be stored in another directory (like _spark_metadata), and file listing should ignore that directory too.
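
To make the idea concrete, a rough sketch of such a minimized log (illustrative class and file names only, not an actual Spark API; assumes Hadoop FileSystem access):

import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only: a tiny marker file holding just the last succeeded batch id,
// instead of the full per-batch file listing kept in the sink metadata today.
class LastCommittedBatchLog(fs: FileSystem, dir: Path) {
  private val marker = new Path(dir, "LAST_COMMITTED_BATCH")

  def write(batchId: Long): Unit = {
    val out = fs.create(marker, true)  // overwrite the previous marker
    try out.write(batchId.toString.getBytes(StandardCharsets.UTF_8))
    finally out.close()
  }

  def read(): Option[Long] = {
    if (!fs.exists(marker)) None
    else {
      val in = fs.open(marker)
      try Some(scala.io.Source.fromInputStream(in, "UTF-8").mkString.trim.toLong)
      finally in.close()
    }
  }
}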

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 20, 2019

In practice, end users would have a data retention policy, and output files could be removed based on that policy. So it would be ideal if the metadata could reflect changes to the output files, but from Spark's point of view that doesn't look easy to do. For example, if we go on checking the existence of files in the metadata list periodically (maybe every X batches to avoid concurrent modification), it will be another huge overhead that slows things down. Specifying the retention policy in the Spark query (while files are removed outside of Spark) is also really odd, so neither option is pretty.

If it's OK for the file stream sink to periodically check the existence of files and remove deleted files from the file log (fewer side effects, but I'm not sure about the performance), I'll apply the change.
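
For reference, a sketch of what that periodic check could look like (illustrative helper, not part of this patch; entries stands for the paths currently tracked in the sink's file log):

import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only: every N batches, drop entries whose files were removed outside the query.
// The existence check is the expensive part, so doing it only every N batches bounds the overhead.
def purgeMissingFiles(
    fs: FileSystem,
    entries: Seq[String],
    batchId: Long,
    everyNBatches: Int): Seq[String] = {
  if (everyNBatches > 0 && batchId % everyNBatches == 0) {
    entries.filter(path => fs.exists(new Path(path)))
  } else {
    entries
  }
}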

@SparkQA

SparkQA commented Feb 20, 2019

Test build #102529 has finished for PR 23840 at commit 644b8ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 20, 2019

Test build #102531 has finished for PR 23840 at commit fa88d9e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Now this approach conflicts with #23850, so I would rather have us choose one of the alternatives.
I'll mark this as [WIP][DISCUSSION_NEEDED] so the status is easy to see.

HeartSaVioR changed the title from "[SPARK-24295][SS] Add option to retain only last batch in file stream sink metadata" to "[WIP][DISCUSSION_NEEDED][SPARK-24295][SS] Add option to retain only last batch in file stream sink metadata" on Feb 25, 2019
@alfredo-gimenez

alfredo-gimenez commented Feb 25, 2019

Taking a look at this, as it could fix a longstanding critical performance bug in our ingestion pipeline. A couple of comments:

  1. When retaining only the last metadata log, we should probably also disable log file compaction, since it would just copy the last log (see the conf note after this list).

  2. Can this be used in conjunction with checkpoints? If checkpoints are present, they depend on the metadata log contents. If they are guaranteed to only use the last metadata log, this is fine, otherwise this case will have to be handled separately.
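
(On point 1: for reference, the sink log compaction cadence is already governed by an existing conf, shown below; whether disabling compaction means raising this interval or skipping the compact path entirely would need to be decided as part of this change.)

// Existing knob for how often the file sink log is compacted into a single file.
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "10")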

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 25, 2019

FileStreamSink only reads the last batch of metadata, to determine which batch the sink wrote successfully. The huge metadata on FileStreamSink is actually not for FileStreamSink itself (it effectively just takes the last successful batch ID); it's for the File(Stream)Source of another query.

Now I think the ideal way to address this is to incorporate the last successful batch ID into the query checkpoint and only write sink metadata when an option is enabled (though maybe we should default that option to true, since turning it off breaks backward compatibility).

@HeartSaVioR
Contributor Author

I'm now seeing that the metadata path (within the checkpoint root) is injected only into Sources, which requires a DSv2 change on the Sink side if we really want to incorporate Sink metadata into the query checkpoint. I guess this wouldn't happen unless we have a concrete and compelling use case, and even if it happens we can only get the change into Spark 3.0.0 and upwards.

Let's see how another sink (KafkaSink) is implemented:

private[kafka010] class KafkaSink(
    sqlContext: SQLContext,
    executorKafkaParams: ju.Map[String, Object],
    topic: Option[String]) extends Sink with Logging {
  @volatile private var latestBatchId = -1L

  override def toString(): String = "KafkaSink"

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) {
      logInfo(s"Skipping already committed batch $batchId")
    } else {
      KafkaWriter.write(sqlContext.sparkSession,
        data.queryExecution, executorKafkaParams, topic)
      latestBatchId = batchId
    }
  }
}

It only leverages latestBatchId in memory, which means the rows could be requested to be written again when the query restarts - that's OK because the Kafka sink supports at-least-once.

I guess we couldn't take the same approach to achieve exactly-once in the file stream sink. At best it would be a weak exactly-once via at-least-once plus idempotence, assuming rewriting a batch is idempotent, but I'm not 100% sure about that.

Might it be better to start a discussion on the dev mailing list?

@HeartSaVioR
Contributor Author

Posted a discussion thread on the dev mailing list for the necessary DSv2 API change.

https://lists.apache.org/thread.html/5a4b394e81cfdd8057c7b07b168df6e2d8814c040279b52f1c203854@%3Cdev.spark.apache.org%3E

@NamanMahor

I was facing the issue SPARK-24295, so I was looking into this commit. I have two questions.

  1. If we ignore sink metadata, then we may face a correct-files/duplicate-files issue, as mentioned by @tdas in this mail thread: http://mail-archives.apache.org/mod_mbox/spark-user/201706.mbox/%3CCA+AHuKmDDRCr4ZjvXOaaX+9QjLfMhF7Xy1qyUL3VH2Q3ziAQwg@mail.gmail.com%3E
  2. Will CompactibleFileStreamLog.compact fail because there will not be any batch id to compact?

@HeartSaVioR
Contributor Author

If we ignore sink metadata, then we may face a correct-files/duplicate-files issue, as mentioned by @tdas in this mail thread

Yes. Actually there's no way to achieve both. It looks like the only way is to let the sink check and purge file entries when files are removed outside of the query via some retention policy, but that will only work while the query is running (which might be OK, since the metadata grows only while the query is running).

Will CompactibleFileStreamLog.compact fail because there will not be any batch id to compact?

I guess so, so the proposed patch is not a complete solution as of now. We may need to focus on the alternatives I've suggested as well, or raise a new idea.

@HeartSaVioR
Contributor Author

@alfredo-gimenez @NamanMahor
Could we vote on the original issue SPARK-24295 to see who, and how many users, really want this fixed?

@HeartSaVioR
Contributor Author

HeartSaVioR commented Mar 18, 2019

I've raised a patch for one of the alternatives - #24128 - which applies retention to FileStreamSink.

@HeartSaVioR
Contributor Author

Once we have the new, less intrusive patch, I'll close this one. Please follow up on #24128 and review. Thanks all!

HeartSaVioR deleted the SPARK-24295 branch on March 19, 2019 00:43
